/*
 * Sharing QEMU devices via vhost-user protocol
 *
 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/vhost-user-server.h"
#include "block/aio-wait.h"

/*
 * Theory of operation:
 *
 * VuServer is started and stopped by vhost_user_server_start() and
 * vhost_user_server_stop() from the main loop thread. Starting the server
 * opens a vhost-user UNIX domain socket and listens for incoming connections.
 * Only one connection is allowed at a time.
 *
 * The connection is handled by the vu_client_trip() coroutine in the
 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
 * where libvhost-user calls vu_message_read() to receive the next vhost-user
 * protocol messages over the UNIX domain socket.
 *
 * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
 * fds. These fds are also handled in the VuServer->ctx AioContext.
 *
 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
 * the socket connection. Shutting down the socket connection causes
 * vu_message_read() to fail since no more data can be received from the socket.
 * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
 * libvhost-user before terminating the coroutine. vu_deinit() calls
 * remove_watch() to stop monitoring kick fds and this stops virtqueue
 * processing.
 *
 * When vu_client_trip() has finished cleaning up it schedules a BH in the main
 * loop thread to accept the next client connection.
 *
 * When libvhost-user detects an error it calls panic_cb() and sets the
 * dev->broken flag.
Both vu_client_trip() and kick fd processing stop when 45 * the dev->broken flag is set. 46 * 47 * It is possible to switch AioContexts using 48 * vhost_user_server_detach_aio_context() and 49 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old 50 * AioContext and resume monitoring in the new AioContext. The vu_client_trip() 51 * coroutine remains in a yielded state during the switch. This is made 52 * possible by QIOChannel's support for spurious coroutine re-entry in 53 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the 54 * new AioContext. 55 */ 56 57 static void vmsg_close_fds(VhostUserMsg *vmsg) 58 { 59 int i; 60 for (i = 0; i < vmsg->fd_num; i++) { 61 close(vmsg->fds[i]); 62 } 63 } 64 65 static void vmsg_unblock_fds(VhostUserMsg *vmsg) 66 { 67 int i; 68 for (i = 0; i < vmsg->fd_num; i++) { 69 qemu_socket_set_nonblock(vmsg->fds[i]); 70 } 71 } 72 73 static void panic_cb(VuDev *vu_dev, const char *buf) 74 { 75 error_report("vu_panic: %s", buf); 76 } 77 78 void vhost_user_server_inc_in_flight(VuServer *server) 79 { 80 assert(!server->wait_idle); 81 qatomic_inc(&server->in_flight); 82 } 83 84 void vhost_user_server_dec_in_flight(VuServer *server) 85 { 86 if (qatomic_fetch_dec(&server->in_flight) == 1) { 87 if (server->wait_idle) { 88 aio_co_wake(server->co_trip); 89 } 90 } 91 } 92 93 bool vhost_user_server_has_in_flight(VuServer *server) 94 { 95 return qatomic_load_acquire(&server->in_flight) > 0; 96 } 97 98 static bool coroutine_fn 99 vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg) 100 { 101 struct iovec iov = { 102 .iov_base = (char *)vmsg, 103 .iov_len = VHOST_USER_HDR_SIZE, 104 }; 105 int rc, read_bytes = 0; 106 Error *local_err = NULL; 107 const size_t max_fds = G_N_ELEMENTS(vmsg->fds); 108 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 109 QIOChannel *ioc = server->ioc; 110 111 vmsg->fd_num = 0; 112 if (!ioc) { 113 error_report_err(local_err); 114 goto fail; 115 } 116 117 
assert(qemu_in_coroutine()); 118 do { 119 size_t nfds = 0; 120 int *fds = NULL; 121 122 /* 123 * qio_channel_readv_full may have short reads, keeping calling it 124 * until getting VHOST_USER_HDR_SIZE or 0 bytes in total 125 */ 126 rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err); 127 if (rc < 0) { 128 if (rc == QIO_CHANNEL_ERR_BLOCK) { 129 assert(local_err == NULL); 130 if (server->ctx) { 131 server->in_qio_channel_yield = true; 132 qio_channel_yield(ioc, G_IO_IN); 133 server->in_qio_channel_yield = false; 134 } else { 135 return false; 136 } 137 continue; 138 } else { 139 error_report_err(local_err); 140 goto fail; 141 } 142 } 143 144 if (nfds > 0) { 145 if (vmsg->fd_num + nfds > max_fds) { 146 error_report("A maximum of %zu fds are allowed, " 147 "however got %zu fds now", 148 max_fds, vmsg->fd_num + nfds); 149 g_free(fds); 150 goto fail; 151 } 152 memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0])); 153 vmsg->fd_num += nfds; 154 g_free(fds); 155 } 156 157 if (rc == 0) { /* socket closed */ 158 goto fail; 159 } 160 161 iov.iov_base += rc; 162 iov.iov_len -= rc; 163 read_bytes += rc; 164 } while (read_bytes != VHOST_USER_HDR_SIZE); 165 166 /* qio_channel_readv_full will make socket fds blocking, unblock them */ 167 vmsg_unblock_fds(vmsg); 168 if (vmsg->size > sizeof(vmsg->payload)) { 169 error_report("Error: too big message request: %d, " 170 "size: vmsg->size: %u, " 171 "while sizeof(vmsg->payload) = %zu", 172 vmsg->request, vmsg->size, sizeof(vmsg->payload)); 173 goto fail; 174 } 175 176 struct iovec iov_payload = { 177 .iov_base = (char *)&vmsg->payload, 178 .iov_len = vmsg->size, 179 }; 180 if (vmsg->size) { 181 rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err); 182 if (rc != 1) { 183 if (local_err) { 184 error_report_err(local_err); 185 } 186 goto fail; 187 } 188 } 189 190 return true; 191 192 fail: 193 vmsg_close_fds(vmsg); 194 195 return false; 196 } 197 198 static coroutine_fn void vu_client_trip(void 
*opaque) 199 { 200 VuServer *server = opaque; 201 VuDev *vu_dev = &server->vu_dev; 202 203 while (!vu_dev->broken) { 204 if (server->quiescing) { 205 server->co_trip = NULL; 206 aio_wait_kick(); 207 return; 208 } 209 /* vu_dispatch() returns false if server->ctx went away */ 210 if (!vu_dispatch(vu_dev) && server->ctx) { 211 break; 212 } 213 } 214 215 if (vhost_user_server_has_in_flight(server)) { 216 /* Wait for requests to complete before we can unmap the memory */ 217 server->wait_idle = true; 218 qemu_coroutine_yield(); 219 server->wait_idle = false; 220 } 221 assert(!vhost_user_server_has_in_flight(server)); 222 223 vu_deinit(vu_dev); 224 225 /* vu_deinit() should have called remove_watch() */ 226 assert(QTAILQ_EMPTY(&server->vu_fd_watches)); 227 228 object_unref(OBJECT(server->sioc)); 229 server->sioc = NULL; 230 231 object_unref(OBJECT(server->ioc)); 232 server->ioc = NULL; 233 234 server->co_trip = NULL; 235 if (server->restart_listener_bh) { 236 qemu_bh_schedule(server->restart_listener_bh); 237 } 238 aio_wait_kick(); 239 } 240 241 /* 242 * a wrapper for vu_kick_cb 243 * 244 * since aio_dispatch can only pass one user data pointer to the 245 * callback function, pack VuDev and pvt into a struct. 
Then unpack it 246 * and pass them to vu_kick_cb 247 */ 248 static void kick_handler(void *opaque) 249 { 250 VuFdWatch *vu_fd_watch = opaque; 251 VuDev *vu_dev = vu_fd_watch->vu_dev; 252 253 vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt); 254 255 /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */ 256 if (vu_dev->broken) { 257 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 258 259 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 260 } 261 } 262 263 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd) 264 { 265 266 VuFdWatch *vu_fd_watch, *next; 267 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) { 268 if (vu_fd_watch->fd == fd) { 269 return vu_fd_watch; 270 } 271 } 272 return NULL; 273 } 274 275 static void 276 set_watch(VuDev *vu_dev, int fd, int vu_evt, 277 vu_watch_cb cb, void *pvt) 278 { 279 280 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 281 g_assert(vu_dev); 282 g_assert(fd >= 0); 283 g_assert(cb); 284 285 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 286 287 if (!vu_fd_watch) { 288 vu_fd_watch = g_new0(VuFdWatch, 1); 289 290 QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next); 291 292 vu_fd_watch->fd = fd; 293 vu_fd_watch->cb = cb; 294 qemu_socket_set_nonblock(fd); 295 aio_set_fd_handler(server->ctx, fd, kick_handler, 296 NULL, NULL, NULL, vu_fd_watch); 297 vu_fd_watch->vu_dev = vu_dev; 298 vu_fd_watch->pvt = pvt; 299 } 300 } 301 302 303 static void remove_watch(VuDev *vu_dev, int fd) 304 { 305 VuServer *server; 306 g_assert(vu_dev); 307 g_assert(fd >= 0); 308 309 server = container_of(vu_dev, VuServer, vu_dev); 310 311 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 312 313 if (!vu_fd_watch) { 314 return; 315 } 316 aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL); 317 318 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next); 319 g_free(vu_fd_watch); 320 } 321 322 323 static void vu_accept(QIONetListener *listener, 
QIOChannelSocket *sioc, 324 gpointer opaque) 325 { 326 VuServer *server = opaque; 327 328 if (server->sioc) { 329 warn_report("Only one vhost-user client is allowed to " 330 "connect the server one time"); 331 return; 332 } 333 334 if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb, 335 vu_message_read, set_watch, remove_watch, server->vu_iface)) { 336 error_report("Failed to initialize libvhost-user"); 337 return; 338 } 339 340 /* 341 * Unset the callback function for network listener to make another 342 * vhost-user client keeping waiting until this client disconnects 343 */ 344 qio_net_listener_set_client_func(server->listener, 345 NULL, 346 NULL, 347 NULL); 348 server->sioc = sioc; 349 /* 350 * Increase the object reference, so sioc will not freed by 351 * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc)) 352 */ 353 object_ref(OBJECT(server->sioc)); 354 qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client"); 355 server->ioc = QIO_CHANNEL(sioc); 356 object_ref(OBJECT(server->ioc)); 357 358 /* TODO vu_message_write() spins if non-blocking! 
*/ 359 qio_channel_set_blocking(server->ioc, false, NULL); 360 361 qio_channel_set_follow_coroutine_ctx(server->ioc, true); 362 363 vhost_user_server_attach_aio_context(server, server->ctx); 364 } 365 366 /* server->ctx acquired by caller */ 367 void vhost_user_server_stop(VuServer *server) 368 { 369 qemu_bh_delete(server->restart_listener_bh); 370 server->restart_listener_bh = NULL; 371 372 if (server->sioc) { 373 VuFdWatch *vu_fd_watch; 374 375 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 376 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, 377 NULL, NULL, NULL, NULL, vu_fd_watch); 378 } 379 380 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 381 382 AIO_WAIT_WHILE(server->ctx, server->co_trip); 383 } 384 385 if (server->listener) { 386 qio_net_listener_disconnect(server->listener); 387 object_unref(OBJECT(server->listener)); 388 } 389 } 390 391 /* 392 * Allow the next client to connect to the server. Called from a BH in the main 393 * loop. 394 */ 395 static void restart_listener_bh(void *opaque) 396 { 397 VuServer *server = opaque; 398 399 qio_net_listener_set_client_func(server->listener, vu_accept, server, 400 NULL); 401 } 402 403 /* Called with ctx acquired */ 404 void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx) 405 { 406 VuFdWatch *vu_fd_watch; 407 408 server->ctx = ctx; 409 410 if (!server->sioc) { 411 return; 412 } 413 414 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 415 aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL, 416 NULL, NULL, vu_fd_watch); 417 } 418 419 if (server->co_trip) { 420 /* 421 * The caller didn't fully shut down co_trip (this can happen on 422 * non-polling drains like in bdrv_graph_wrlock()). This is okay as long 423 * as it no longer tries to shut it down and we're guaranteed to still 424 * be in the same AioContext as before. 
425 * 426 * co_ctx can still be NULL if we get multiple calls and only just 427 * scheduled a new coroutine in the else branch. 428 */ 429 AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip); 430 431 assert(!server->quiescing); 432 assert(!co_ctx || co_ctx == ctx); 433 } else { 434 server->co_trip = qemu_coroutine_create(vu_client_trip, server); 435 assert(!server->in_qio_channel_yield); 436 aio_co_schedule(ctx, server->co_trip); 437 } 438 } 439 440 /* Called with server->ctx acquired */ 441 void vhost_user_server_detach_aio_context(VuServer *server) 442 { 443 if (server->sioc) { 444 VuFdWatch *vu_fd_watch; 445 446 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 447 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, 448 NULL, NULL, NULL, NULL, vu_fd_watch); 449 } 450 } 451 452 server->ctx = NULL; 453 454 if (server->ioc) { 455 if (server->in_qio_channel_yield) { 456 /* Stop receiving the next vhost-user message */ 457 qio_channel_wake_read(server->ioc); 458 } 459 } 460 } 461 462 bool vhost_user_server_start(VuServer *server, 463 SocketAddress *socket_addr, 464 AioContext *ctx, 465 uint16_t max_queues, 466 const VuDevIface *vu_iface, 467 Error **errp) 468 { 469 QEMUBH *bh; 470 QIONetListener *listener; 471 472 if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX && 473 socket_addr->type != SOCKET_ADDRESS_TYPE_FD) { 474 error_setg(errp, "Only socket address types 'unix' and 'fd' are supported"); 475 return false; 476 } 477 478 listener = qio_net_listener_new(); 479 if (qio_net_listener_open_sync(listener, socket_addr, 1, 480 errp) < 0) { 481 object_unref(OBJECT(listener)); 482 return false; 483 } 484 485 bh = qemu_bh_new(restart_listener_bh, server); 486 487 /* zero out unspecified fields */ 488 *server = (VuServer) { 489 .listener = listener, 490 .restart_listener_bh = bh, 491 .vu_iface = vu_iface, 492 .max_queues = max_queues, 493 .ctx = ctx, 494 }; 495 496 qio_net_listener_set_name(server->listener, "vhost-user-backend-listener"); 497 498 
qio_net_listener_set_client_func(server->listener, 499 vu_accept, 500 server, 501 NULL); 502 503 QTAILQ_INIT(&server->vu_fd_watches); 504 return true; 505 } 506