/*
 * Sharing QEMU devices via vhost-user protocol
 *
 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu/main-loop.h"
#include "qemu/vhost-user-server.h"
#include "block/aio-wait.h"

/*
 * Theory of operation:
 *
 * VuServer is started and stopped by vhost_user_server_start() and
 * vhost_user_server_stop() from the main loop thread. Starting the server
 * opens a vhost-user UNIX domain socket and listens for incoming connections.
 * Only one connection is allowed at a time.
 *
 * The connection is handled by the vu_client_trip() coroutine in the
 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
 * where libvhost-user calls vu_message_read() to receive the next vhost-user
 * protocol message over the UNIX domain socket.
 *
 * When virtqueues are set up, libvhost-user calls set_watch() to monitor kick
 * fds. These fds are also handled in the VuServer->ctx AioContext.
 *
 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
 * the socket connection. Shutting down the socket connection causes
 * vu_message_read() to fail since no more data can be received from the socket.
 * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
 * libvhost-user before terminating the coroutine. vu_deinit() calls
 * remove_watch() to stop monitoring kick fds and this stops virtqueue
 * processing.
 *
 * When vu_client_trip() has finished cleaning up it schedules a BH in the main
 * loop thread to accept the next client connection.
 *
 * When libvhost-user detects an error it calls panic_cb() and sets the
 * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
 * the dev->broken flag is set.
45 * 46 * It is possible to switch AioContexts using 47 * vhost_user_server_detach_aio_context() and 48 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old 49 * AioContext and resume monitoring in the new AioContext. The vu_client_trip() 50 * coroutine remains in a yielded state during the switch. This is made 51 * possible by QIOChannel's support for spurious coroutine re-entry in 52 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the 53 * new AioContext. 54 */ 55 56 static void vmsg_close_fds(VhostUserMsg *vmsg) 57 { 58 int i; 59 for (i = 0; i < vmsg->fd_num; i++) { 60 close(vmsg->fds[i]); 61 } 62 } 63 64 static void vmsg_unblock_fds(VhostUserMsg *vmsg) 65 { 66 int i; 67 for (i = 0; i < vmsg->fd_num; i++) { 68 qemu_set_nonblock(vmsg->fds[i]); 69 } 70 } 71 72 static void panic_cb(VuDev *vu_dev, const char *buf) 73 { 74 error_report("vu_panic: %s", buf); 75 } 76 77 static bool coroutine_fn 78 vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg) 79 { 80 struct iovec iov = { 81 .iov_base = (char *)vmsg, 82 .iov_len = VHOST_USER_HDR_SIZE, 83 }; 84 int rc, read_bytes = 0; 85 Error *local_err = NULL; 86 const size_t max_fds = G_N_ELEMENTS(vmsg->fds); 87 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 88 QIOChannel *ioc = server->ioc; 89 90 vmsg->fd_num = 0; 91 if (!ioc) { 92 error_report_err(local_err); 93 goto fail; 94 } 95 96 assert(qemu_in_coroutine()); 97 do { 98 size_t nfds = 0; 99 int *fds = NULL; 100 101 /* 102 * qio_channel_readv_full may have short reads, keeping calling it 103 * until getting VHOST_USER_HDR_SIZE or 0 bytes in total 104 */ 105 rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, &local_err); 106 if (rc < 0) { 107 if (rc == QIO_CHANNEL_ERR_BLOCK) { 108 assert(local_err == NULL); 109 qio_channel_yield(ioc, G_IO_IN); 110 continue; 111 } else { 112 error_report_err(local_err); 113 goto fail; 114 } 115 } 116 117 if (nfds > 0) { 118 if (vmsg->fd_num + nfds > max_fds) { 119 
error_report("A maximum of %zu fds are allowed, " 120 "however got %zu fds now", 121 max_fds, vmsg->fd_num + nfds); 122 g_free(fds); 123 goto fail; 124 } 125 memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0])); 126 vmsg->fd_num += nfds; 127 g_free(fds); 128 } 129 130 if (rc == 0) { /* socket closed */ 131 goto fail; 132 } 133 134 iov.iov_base += rc; 135 iov.iov_len -= rc; 136 read_bytes += rc; 137 } while (read_bytes != VHOST_USER_HDR_SIZE); 138 139 /* qio_channel_readv_full will make socket fds blocking, unblock them */ 140 vmsg_unblock_fds(vmsg); 141 if (vmsg->size > sizeof(vmsg->payload)) { 142 error_report("Error: too big message request: %d, " 143 "size: vmsg->size: %u, " 144 "while sizeof(vmsg->payload) = %zu", 145 vmsg->request, vmsg->size, sizeof(vmsg->payload)); 146 goto fail; 147 } 148 149 struct iovec iov_payload = { 150 .iov_base = (char *)&vmsg->payload, 151 .iov_len = vmsg->size, 152 }; 153 if (vmsg->size) { 154 rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err); 155 if (rc != 1) { 156 if (local_err) { 157 error_report_err(local_err); 158 } 159 goto fail; 160 } 161 } 162 163 return true; 164 165 fail: 166 vmsg_close_fds(vmsg); 167 168 return false; 169 } 170 171 static coroutine_fn void vu_client_trip(void *opaque) 172 { 173 VuServer *server = opaque; 174 VuDev *vu_dev = &server->vu_dev; 175 176 while (!vu_dev->broken && vu_dispatch(vu_dev)) { 177 /* Keep running */ 178 } 179 180 vu_deinit(vu_dev); 181 182 /* vu_deinit() should have called remove_watch() */ 183 assert(QTAILQ_EMPTY(&server->vu_fd_watches)); 184 185 object_unref(OBJECT(server->sioc)); 186 server->sioc = NULL; 187 188 object_unref(OBJECT(server->ioc)); 189 server->ioc = NULL; 190 191 server->co_trip = NULL; 192 if (server->restart_listener_bh) { 193 qemu_bh_schedule(server->restart_listener_bh); 194 } 195 aio_wait_kick(); 196 } 197 198 /* 199 * a wrapper for vu_kick_cb 200 * 201 * since aio_dispatch can only pass one user data pointer to the 202 * callback 
function, pack VuDev and pvt into a struct. Then unpack it 203 * and pass them to vu_kick_cb 204 */ 205 static void kick_handler(void *opaque) 206 { 207 VuFdWatch *vu_fd_watch = opaque; 208 VuDev *vu_dev = vu_fd_watch->vu_dev; 209 210 vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt); 211 212 /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */ 213 if (vu_dev->broken) { 214 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 215 216 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 217 } 218 } 219 220 static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd) 221 { 222 223 VuFdWatch *vu_fd_watch, *next; 224 QTAILQ_FOREACH_SAFE(vu_fd_watch, &server->vu_fd_watches, next, next) { 225 if (vu_fd_watch->fd == fd) { 226 return vu_fd_watch; 227 } 228 } 229 return NULL; 230 } 231 232 static void 233 set_watch(VuDev *vu_dev, int fd, int vu_evt, 234 vu_watch_cb cb, void *pvt) 235 { 236 237 VuServer *server = container_of(vu_dev, VuServer, vu_dev); 238 g_assert(vu_dev); 239 g_assert(fd >= 0); 240 g_assert(cb); 241 242 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 243 244 if (!vu_fd_watch) { 245 VuFdWatch *vu_fd_watch = g_new0(VuFdWatch, 1); 246 247 QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next); 248 249 vu_fd_watch->fd = fd; 250 vu_fd_watch->cb = cb; 251 qemu_set_nonblock(fd); 252 aio_set_fd_handler(server->ioc->ctx, fd, true, kick_handler, 253 NULL, NULL, NULL, vu_fd_watch); 254 vu_fd_watch->vu_dev = vu_dev; 255 vu_fd_watch->pvt = pvt; 256 } 257 } 258 259 260 static void remove_watch(VuDev *vu_dev, int fd) 261 { 262 VuServer *server; 263 g_assert(vu_dev); 264 g_assert(fd >= 0); 265 266 server = container_of(vu_dev, VuServer, vu_dev); 267 268 VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd); 269 270 if (!vu_fd_watch) { 271 return; 272 } 273 aio_set_fd_handler(server->ioc->ctx, fd, true, 274 NULL, NULL, NULL, NULL, NULL); 275 276 QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next); 277 g_free(vu_fd_watch); 
278 } 279 280 281 static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc, 282 gpointer opaque) 283 { 284 VuServer *server = opaque; 285 286 if (server->sioc) { 287 warn_report("Only one vhost-user client is allowed to " 288 "connect the server one time"); 289 return; 290 } 291 292 if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb, 293 vu_message_read, set_watch, remove_watch, server->vu_iface)) { 294 error_report("Failed to initialize libvhost-user"); 295 return; 296 } 297 298 /* 299 * Unset the callback function for network listener to make another 300 * vhost-user client keeping waiting until this client disconnects 301 */ 302 qio_net_listener_set_client_func(server->listener, 303 NULL, 304 NULL, 305 NULL); 306 server->sioc = sioc; 307 /* 308 * Increase the object reference, so sioc will not freed by 309 * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc)) 310 */ 311 object_ref(OBJECT(server->sioc)); 312 qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client"); 313 server->ioc = QIO_CHANNEL(sioc); 314 object_ref(OBJECT(server->ioc)); 315 316 /* TODO vu_message_write() spins if non-blocking! 
*/ 317 qio_channel_set_blocking(server->ioc, false, NULL); 318 319 server->co_trip = qemu_coroutine_create(vu_client_trip, server); 320 321 aio_context_acquire(server->ctx); 322 vhost_user_server_attach_aio_context(server, server->ctx); 323 aio_context_release(server->ctx); 324 } 325 326 void vhost_user_server_stop(VuServer *server) 327 { 328 aio_context_acquire(server->ctx); 329 330 qemu_bh_delete(server->restart_listener_bh); 331 server->restart_listener_bh = NULL; 332 333 if (server->sioc) { 334 VuFdWatch *vu_fd_watch; 335 336 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 337 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, 338 NULL, NULL, NULL, NULL, vu_fd_watch); 339 } 340 341 qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 342 343 AIO_WAIT_WHILE(server->ctx, server->co_trip); 344 } 345 346 aio_context_release(server->ctx); 347 348 if (server->listener) { 349 qio_net_listener_disconnect(server->listener); 350 object_unref(OBJECT(server->listener)); 351 } 352 } 353 354 /* 355 * Allow the next client to connect to the server. Called from a BH in the main 356 * loop. 
357 */ 358 static void restart_listener_bh(void *opaque) 359 { 360 VuServer *server = opaque; 361 362 qio_net_listener_set_client_func(server->listener, vu_accept, server, 363 NULL); 364 } 365 366 /* Called with ctx acquired */ 367 void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx) 368 { 369 VuFdWatch *vu_fd_watch; 370 371 server->ctx = ctx; 372 373 if (!server->sioc) { 374 return; 375 } 376 377 qio_channel_attach_aio_context(server->ioc, ctx); 378 379 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 380 aio_set_fd_handler(ctx, vu_fd_watch->fd, true, kick_handler, NULL, 381 NULL, NULL, vu_fd_watch); 382 } 383 384 aio_co_schedule(ctx, server->co_trip); 385 } 386 387 /* Called with server->ctx acquired */ 388 void vhost_user_server_detach_aio_context(VuServer *server) 389 { 390 if (server->sioc) { 391 VuFdWatch *vu_fd_watch; 392 393 QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) { 394 aio_set_fd_handler(server->ctx, vu_fd_watch->fd, true, 395 NULL, NULL, NULL, NULL, vu_fd_watch); 396 } 397 398 qio_channel_detach_aio_context(server->ioc); 399 } 400 401 server->ctx = NULL; 402 } 403 404 bool vhost_user_server_start(VuServer *server, 405 SocketAddress *socket_addr, 406 AioContext *ctx, 407 uint16_t max_queues, 408 const VuDevIface *vu_iface, 409 Error **errp) 410 { 411 QEMUBH *bh; 412 QIONetListener *listener; 413 414 if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX && 415 socket_addr->type != SOCKET_ADDRESS_TYPE_FD) { 416 error_setg(errp, "Only socket address types 'unix' and 'fd' are supported"); 417 return false; 418 } 419 420 listener = qio_net_listener_new(); 421 if (qio_net_listener_open_sync(listener, socket_addr, 1, 422 errp) < 0) { 423 object_unref(OBJECT(listener)); 424 return false; 425 } 426 427 bh = qemu_bh_new(restart_listener_bh, server); 428 429 /* zero out unspecified fields */ 430 *server = (VuServer) { 431 .listener = listener, 432 .restart_listener_bh = bh, 433 .vu_iface = vu_iface, 434 .max_queues = 
max_queues, 435 .ctx = ctx, 436 }; 437 438 qio_net_listener_set_name(server->listener, "vhost-user-backend-listener"); 439 440 qio_net_listener_set_client_func(server->listener, 441 vu_accept, 442 server, 443 NULL); 444 445 QTAILQ_INIT(&server->vu_fd_watches); 446 return true; 447 } 448