/*
 * Sharing QEMU devices via vhost-user protocol
 *
 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
 * Copyright (c) 2020 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later.  See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/vhost-user-server.h"
#include "block/aio-wait.h"

/*
 * Theory of operation:
 *
 * VuServer is started and stopped by vhost_user_server_start() and
 * vhost_user_server_stop() from the main loop thread. Starting the server
 * opens a vhost-user UNIX domain socket and listens for incoming connections.
 * Only one connection is allowed at a time.
 *
 * The connection is handled by the vu_client_trip() coroutine in the
 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
 * where libvhost-user calls vu_message_read() to receive the next vhost-user
 * protocol message over the UNIX domain socket.
 *
 * When virtqueues are set up, libvhost-user calls set_watch() to monitor kick
 * fds. These fds are also handled in the VuServer->ctx AioContext.
 *
 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
 * the socket connection. Shutting down the socket connection causes
 * vu_message_read() to fail since no more data can be received from the socket.
 * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
 * libvhost-user before terminating the coroutine. vu_deinit() calls
 * remove_watch() to stop monitoring kick fds and this stops virtqueue
 * processing.
 *
 * When vu_client_trip() has finished cleaning up, it schedules a BH in the
 * main loop thread to accept the next client connection.
 *
 * When libvhost-user detects an error, it calls panic_cb() and sets the
 * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
 * the dev->broken flag is set.
 *
 * It is possible to switch AioContexts using
 * vhost_user_server_detach_aio_context() and
 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
 * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
 * coroutine remains in a yielded state during the switch. This is made
 * possible by QIOChannel's support for spurious coroutine re-entry in
 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
 * new AioContext.
 */
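
/*
 * Example: a minimal sketch of how a caller drives this lifecycle
 * (illustrative only, not part of this file; my_addr, my_iface and
 * MY_MAX_QUEUES are hypothetical names, and failure cleanup is elided).
 * In the main loop thread:
 *
 *   VuServer server;
 *   Error *err = NULL;
 *
 *   if (!vhost_user_server_start(&server, my_addr, qemu_get_aio_context(),
 *                                MY_MAX_QUEUES, &my_iface, &err)) {
 *       error_report_err(err);
 *       return;
 *   }
 *
 *   vu_client_trip() then serves the connection in server.ctx. Later, again
 *   from the main loop thread:
 *
 *   vhost_user_server_stop(&server);
 */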

static void vmsg_close_fds(VhostUserMsg *vmsg)
{
    int i;
    for (i = 0; i < vmsg->fd_num; i++) {
        close(vmsg->fds[i]);
    }
}

static bool vmsg_unblock_fds(VhostUserMsg *vmsg, Error **errp)
{
    int i;

    /*
     * These messages carry fds used to map memory, not fds used to
     * send/receive messages, so making them non-blocking serves no purpose.
     * In addition, on some systems the operation can fail (e.g. on macOS,
     * setting an fd returned by shm_open() non-blocking fails with
     * errno = ENOTTY).
     */
    if (vmsg->request == VHOST_USER_ADD_MEM_REG ||
        vmsg->request == VHOST_USER_SET_MEM_TABLE) {
        return true;
    }

    for (i = 0; i < vmsg->fd_num; i++) {
        if (!qemu_set_blocking(vmsg->fds[i], false, errp)) {
            return false;
        }
    }

    return true;
}

static void panic_cb(VuDev *vu_dev, const char *buf)
{
    error_report("vu_panic: %s", buf);
}

void vhost_user_server_inc_in_flight(VuServer *server)
{
    assert(!server->wait_idle);
    qatomic_inc(&server->in_flight);
}

void vhost_user_server_dec_in_flight(VuServer *server)
{
    if (qatomic_fetch_dec(&server->in_flight) == 1) {
        if (server->wait_idle) {
            aio_co_wake(server->co_trip);
        }
    }
}

bool vhost_user_server_has_in_flight(VuServer *server)
{
    return qatomic_load_acquire(&server->in_flight) > 0;
}
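
/*
 * Illustrative sketch (not part of this file): a device implementation is
 * expected to bracket each request it processes with these counters so that
 * vu_client_trip() can wait for the device to go idle before calling
 * vu_deinit(). The function name and elided details are hypothetical:
 *
 *   static void my_process_request(VuServer *server, ...)
 *   {
 *       vhost_user_server_inc_in_flight(server);
 *       ...handle the virtqueue element, possibly yielding...
 *       vhost_user_server_dec_in_flight(server);
 *   }
 */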

static bool coroutine_fn
vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
{
    struct iovec iov = {
        .iov_base = (char *)vmsg,
        .iov_len = VHOST_USER_HDR_SIZE,
    };
    int rc, read_bytes = 0;
    Error *local_err = NULL;
    const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    QIOChannel *ioc = server->ioc;

    vmsg->fd_num = 0;
    if (!ioc) {
        goto fail;
    }

    assert(qemu_in_coroutine());
    do {
        size_t nfds = 0;
        int *fds = NULL;

        /*
         * qio_channel_readv_full() may return a short read, so keep calling
         * it until VHOST_USER_HDR_SIZE bytes have been received in total or
         * it returns 0 (socket closed).
         */
        rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err);
        if (rc < 0) {
            if (rc == QIO_CHANNEL_ERR_BLOCK) {
                assert(local_err == NULL);
                if (server->ctx) {
                    server->in_qio_channel_yield = true;
                    qio_channel_yield(ioc, G_IO_IN);
                    server->in_qio_channel_yield = false;
                } else {
                    return false;
                }
                continue;
            } else {
                error_report_err(local_err);
                goto fail;
            }
        }

        if (nfds > 0) {
            if (vmsg->fd_num + nfds > max_fds) {
                error_report("At most %zu fds are allowed, "
                             "but %zu fds were received",
                             max_fds, vmsg->fd_num + nfds);
                g_free(fds);
                goto fail;
            }
            memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0]));
            vmsg->fd_num += nfds;
            g_free(fds);
        }

        if (rc == 0) { /* socket closed */
            goto fail;
        }

        iov.iov_base += rc;
        iov.iov_len -= rc;
        read_bytes += rc;
    } while (read_bytes != VHOST_USER_HDR_SIZE);

    /* qio_channel_readv_full() makes the received fds blocking, unblock them */
    if (!vmsg_unblock_fds(vmsg, &local_err)) {
        error_report_err(local_err);
        goto fail;
    }
    if (vmsg->size > sizeof(vmsg->payload)) {
        error_report("Message request %d is too big: vmsg->size = %u, "
                     "but sizeof(vmsg->payload) = %zu",
                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
        goto fail;
    }

    struct iovec iov_payload = {
        .iov_base = (char *)&vmsg->payload,
        .iov_len = vmsg->size,
    };
    if (vmsg->size) {
        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
        if (rc != 1) {
            if (local_err) {
                error_report_err(local_err);
            }
            goto fail;
        }
    }

    return true;

fail:
    vmsg_close_fds(vmsg);

    return false;
}
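
/*
 * For reference, vu_message_read() implements the message framing defined by
 * the vhost-user specification: a fixed-size header followed by an optional
 * payload of vmsg->size bytes. A sketch of the wire header (see
 * libvhost-user's VhostUserMsg for the authoritative definition):
 *
 *   u32 request;   which VHOST_USER_* operation this is
 *   u32 flags;     version and reply bits
 *   u32 size;      number of payload bytes that follow
 *
 * File descriptors may arrive as SCM_RIGHTS ancillary data alongside the
 * header, which is why the read loop above collects them into vmsg->fds[].
 */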

static coroutine_fn void vu_client_trip(void *opaque)
{
    VuServer *server = opaque;
    VuDev *vu_dev = &server->vu_dev;

    while (!vu_dev->broken) {
        if (server->quiescing) {
            server->co_trip = NULL;
            aio_wait_kick();
            return;
        }
        /* vu_dispatch() returns false if server->ctx went away */
        if (!vu_dispatch(vu_dev) && server->ctx) {
            break;
        }
    }

    if (vhost_user_server_has_in_flight(server)) {
        /* Wait for requests to complete before we can unmap the memory */
        server->wait_idle = true;
        qemu_coroutine_yield();
        server->wait_idle = false;
    }
    assert(!vhost_user_server_has_in_flight(server));

    vu_deinit(vu_dev);

    /* vu_deinit() should have called remove_watch() */
    assert(QTAILQ_EMPTY(&server->vu_fd_watches));

    object_unref(OBJECT(server->sioc));
    server->sioc = NULL;

    object_unref(OBJECT(server->ioc));
    server->ioc = NULL;

    server->co_trip = NULL;
    if (server->restart_listener_bh) {
        qemu_bh_schedule(server->restart_listener_bh);
    }
    aio_wait_kick();
}

/*
 * A wrapper for vu_kick_cb().
 *
 * Since aio_dispatch() can only pass one opaque pointer to the callback
 * function, VuDev and pvt are packed into a VuFdWatch struct. Unpack it
 * here and pass them to vu_kick_cb().
 */
static void kick_handler(void *opaque)
{
    VuFdWatch *vu_fd_watch = opaque;
    VuDev *vu_dev = vu_fd_watch->vu_dev;

    vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);

    /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
    if (vu_dev->broken) {
        VuServer *server = container_of(vu_dev, VuServer, vu_dev);

        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }
}
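
/*
 * For orientation, the VuFdWatch packing described above is roughly (see
 * include/qemu/vhost-user-server.h for the authoritative definition):
 *
 *   typedef struct VuFdWatch {
 *       VuDev *vu_dev;                  device to pass to cb
 *       int fd;                         kick fd being monitored
 *       void *pvt;                      libvhost-user's private data
 *       vu_watch_cb cb;                 typically vu_kick_cb()
 *       QTAILQ_ENTRY(VuFdWatch) next;
 *   } VuFdWatch;
 */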

static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
{
    VuFdWatch *vu_fd_watch;

    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
        if (vu_fd_watch->fd == fd) {
            return vu_fd_watch;
        }
    }
    return NULL;
}

static void
set_watch(VuDev *vu_dev, int fd, int vu_evt,
          vu_watch_cb cb, void *pvt)
{
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
    g_assert(vu_dev);
    g_assert(fd >= 0);
    g_assert(cb);

    VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);

    if (!vu_fd_watch) {
        vu_fd_watch = g_new0(VuFdWatch, 1);

        QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);

        vu_fd_watch->fd = fd;
        vu_fd_watch->cb = cb;
        /* TODO: handle error more gracefully than aborting */
        qemu_set_blocking(fd, false, &error_abort);
        aio_set_fd_handler(server->ctx, fd, kick_handler,
                           NULL, NULL, NULL, vu_fd_watch);
        vu_fd_watch->vu_dev = vu_dev;
        vu_fd_watch->pvt = pvt;
    }
}


static void remove_watch(VuDev *vu_dev, int fd)
{
    VuServer *server;
    g_assert(vu_dev);
    g_assert(fd >= 0);

    server = container_of(vu_dev, VuServer, vu_dev);

    VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);

    if (!vu_fd_watch) {
        return;
    }
    aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);

    QTAILQ_REMOVE(&server->vu_fd_watches, vu_fd_watch, next);
    g_free(vu_fd_watch);
}


static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
                      gpointer opaque)
{
    VuServer *server = opaque;
    Error *local_err = NULL;

    if (server->sioc) {
        warn_report("Only one vhost-user client is allowed to "
                    "connect to the server at a time");
        return;
    }

    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
        error_report("Failed to initialize libvhost-user");
        return;
    }

    /*
     * Unset the network listener's callback function so that additional
     * vhost-user clients keep waiting until this client disconnects
     */
    qio_net_listener_set_client_func(server->listener,
                                     NULL,
                                     NULL,
                                     NULL);
    server->sioc = sioc;
    /*
     * Take a reference so that sioc is not freed by
     * qio_net_listener_channel_func(), which calls object_unref(OBJECT(sioc))
     */
    object_ref(OBJECT(server->sioc));
    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
    server->ioc = QIO_CHANNEL(sioc);
    object_ref(OBJECT(server->ioc));

    /* TODO vu_message_write() spins if non-blocking! */
    if (!qio_channel_set_blocking(server->ioc, false, &local_err)) {
        error_report_err(local_err);
        vu_deinit(&server->vu_dev);
        return;
    }

    qio_channel_set_follow_coroutine_ctx(server->ioc, true);

    vhost_user_server_attach_aio_context(server, server->ctx);
}

/* server->ctx acquired by caller */
void vhost_user_server_stop(VuServer *server)
{
    qemu_bh_delete(server->restart_listener_bh);
    server->restart_listener_bh = NULL;

    if (server->sioc) {
        VuFdWatch *vu_fd_watch;

        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
                               NULL, NULL, NULL, NULL, vu_fd_watch);
        }

        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);

        AIO_WAIT_WHILE(server->ctx, server->co_trip);
    }

    if (server->listener) {
        qio_net_listener_disconnect(server->listener);
        object_unref(OBJECT(server->listener));
    }
}

/*
 * Allow the next client to connect to the server. Called from a BH in the main
 * loop.
 */
static void restart_listener_bh(void *opaque)
{
    VuServer *server = opaque;

    qio_net_listener_set_client_func(server->listener, vu_accept, server,
                                     NULL);
}

/* Called with ctx acquired */
void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
{
    VuFdWatch *vu_fd_watch;

    server->ctx = ctx;

    if (!server->sioc) {
        return;
    }

    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
        aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
                           NULL, NULL, vu_fd_watch);
    }

    if (server->co_trip) {
        /*
         * The caller didn't fully shut down co_trip (this can happen on
         * non-polling drains like in bdrv_graph_wrlock()). This is okay as long
         * as it no longer tries to shut it down and we're guaranteed to still
         * be in the same AioContext as before.
         *
         * co_ctx can still be NULL if we get multiple calls and only just
         * scheduled a new coroutine in the else branch.
         */
        AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip);

        assert(!server->quiescing);
        assert(!co_ctx || co_ctx == ctx);
    } else {
        server->co_trip = qemu_coroutine_create(vu_client_trip, server);
        assert(!server->in_qio_channel_yield);
        aio_co_schedule(ctx, server->co_trip);
    }
}

/* Called with server->ctx acquired */
void vhost_user_server_detach_aio_context(VuServer *server)
{
    if (server->sioc) {
        VuFdWatch *vu_fd_watch;

        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
                               NULL, NULL, NULL, NULL, vu_fd_watch);
        }
    }

    server->ctx = NULL;

    if (server->ioc) {
        if (server->in_qio_channel_yield) {
            /* Stop receiving the next vhost-user message */
            qio_channel_wake_read(server->ioc);
        }
    }
}
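
/*
 * Illustrative sketch: a caller that moves the server to a different
 * AioContext (e.g. when a block export changes iothreads) is expected to
 * pair the two calls, from the main loop thread (new_ctx is hypothetical):
 *
 *   vhost_user_server_detach_aio_context(server);
 *   ...retarget any other per-context state...
 *   vhost_user_server_attach_aio_context(server, new_ctx);
 *
 * Between the calls no fds are monitored and vu_client_trip() remains
 * yielded, as described in the theory of operation comment at the top of
 * this file.
 */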

bool vhost_user_server_start(VuServer *server,
                             SocketAddress *socket_addr,
                             AioContext *ctx,
                             uint16_t max_queues,
                             const VuDevIface *vu_iface,
                             Error **errp)
{
    QEMUBH *bh;
    QIONetListener *listener;

    if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX &&
        socket_addr->type != SOCKET_ADDRESS_TYPE_FD) {
        error_setg(errp, "Only socket address types 'unix' and 'fd' are supported");
        return false;
    }

    listener = qio_net_listener_new();
    if (qio_net_listener_open_sync(listener, socket_addr, 1,
                                   errp) < 0) {
        object_unref(OBJECT(listener));
        return false;
    }

    bh = qemu_bh_new(restart_listener_bh, server);

    /* zero out unspecified fields */
    *server = (VuServer) {
        .listener              = listener,
        .restart_listener_bh   = bh,
        .vu_iface              = vu_iface,
        .max_queues            = max_queues,
        .ctx                   = ctx,
    };

    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");

    qio_net_listener_set_client_func(server->listener,
                                     vu_accept,
                                     server,
                                     NULL);

    QTAILQ_INIT(&server->vu_fd_watches);
    return true;
}
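
/*
 * Illustrative sketch: constructing the SocketAddress argument for a UNIX
 * domain socket (the path is hypothetical):
 *
 *   SocketAddress addr = {
 *       .type = SOCKET_ADDRESS_TYPE_UNIX,
 *       .u.q_unix.path = (char *)"/tmp/vhost-user.sock",
 *   };
 *
 *   if (!vhost_user_server_start(&server, &addr, ctx, max_queues,
 *                                &iface, errp)) {
 *       ...
 *   }
 */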