xref: /openbmc/qemu/hw/vfio-user/proxy.c (revision c3ae83117dfb198eae7f8afe8609e69674732cdb)
1 /*
2  * vfio protocol over a UNIX socket.
3  *
4  * Copyright © 2018, 2021 Oracle and/or its affiliates.
5  *
6  * SPDX-License-Identifier: GPL-2.0-or-later
7  */
8 
9 #include "qemu/osdep.h"
10 #include <sys/ioctl.h>
11 
12 #include "hw/vfio/vfio-device.h"
13 #include "hw/vfio-user/proxy.h"
14 #include "hw/vfio-user/trace.h"
15 #include "qapi/error.h"
16 #include "qobject/qbool.h"
17 #include "qobject/qdict.h"
18 #include "qobject/qjson.h"
19 #include "qobject/qnum.h"
20 #include "qemu/error-report.h"
21 #include "qemu/lockable.h"
22 #include "qemu/main-loop.h"
23 #include "qemu/thread.h"
24 #include "system/iothread.h"
25 
26 static IOThread *vfio_user_iothread;
27 
28 static void vfio_user_shutdown(VFIOUserProxy *proxy);
29 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
30                                      VFIOUserFDs *fds);
31 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);
32 
33 static void vfio_user_recv(void *opaque);
34 static void vfio_user_send(void *opaque);
35 
36 static void vfio_user_request(void *opaque);
37 
38 static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
39 {
40     hdr->flags |= VFIO_USER_ERROR;
41     hdr->error_reply = err;
42 }
43 
44 /*
45  * Functions called by main, CPU, or iothread threads
46  */
47 
48 static void vfio_user_shutdown(VFIOUserProxy *proxy)
49 {
50     qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
51     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL,
52                                    proxy->ctx, NULL, NULL);
53 }
54 
55 /*
56  * Same return values as qio_channel_writev_full():
57  *
58  * QIO_CHANNEL_ERR_BLOCK: *errp not set
59  * -1: *errp will be populated
60  * otherwise: bytes written
61  */
62 static ssize_t vfio_user_send_qio(VFIOUserProxy *proxy, VFIOUserMsg *msg,
63                                   Error **errp)
64 {
65     VFIOUserFDs *fds =  msg->fds;
66     struct iovec iov = {
67         .iov_base = msg->hdr,
68         .iov_len = msg->hdr->size,
69     };
70     size_t numfds = 0;
71     int *fdp = NULL;
72     ssize_t ret;
73 
74     if (fds != NULL && fds->send_fds != 0) {
75         numfds = fds->send_fds;
76         fdp = fds->fds;
77     }
78 
79     ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, 0, errp);
80 
81     if (ret == -1) {
82         vfio_user_set_error(msg->hdr, EIO);
83         vfio_user_shutdown(proxy);
84     }
85     trace_vfio_user_send_write(msg->hdr->id, ret);
86 
87     return ret;
88 }
89 
90 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
91                                      VFIOUserFDs *fds)
92 {
93     VFIOUserMsg *msg;
94 
95     msg = QTAILQ_FIRST(&proxy->free);
96     if (msg != NULL) {
97         QTAILQ_REMOVE(&proxy->free, msg, next);
98     } else {
99         msg = g_malloc0(sizeof(*msg));
100         qemu_cond_init(&msg->cv);
101     }
102 
103     msg->hdr = hdr;
104     msg->fds = fds;
105     return msg;
106 }
107 
108 /*
109  * Recycle a message list entry to the free list.
110  */
111 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
112 {
113     if (msg->type == VFIO_MSG_NONE) {
114         error_printf("vfio_user_recycle - freeing free msg\n");
115         return;
116     }
117 
118     /* free msg buffer if no one is waiting to consume the reply */
119     if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
120         g_free(msg->hdr);
121         if (msg->fds != NULL) {
122             g_free(msg->fds);
123         }
124     }
125 
126     msg->type = VFIO_MSG_NONE;
127     msg->hdr = NULL;
128     msg->fds = NULL;
129     msg->complete = false;
130     msg->pending = false;
131     QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
132 }
133 
134 VFIOUserFDs *vfio_user_getfds(int numfds)
135 {
136     VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
137 
138     fds->fds = (int *)((char *)fds + sizeof(*fds));
139 
140     return fds;
141 }
142 
143 /*
144  * Functions only called by iothread
145  */
146 
147 /*
148  * Process a received message.
149  */
150 static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
151                               bool isreply)
152 {
153 
154     /*
155      * Replies signal a waiter, if none just check for errors
156      * and free the message buffer.
157      *
158      * Requests get queued for the BH.
159      */
160     if (isreply) {
161         msg->complete = true;
162         if (msg->type == VFIO_MSG_WAIT) {
163             qemu_cond_signal(&msg->cv);
164         } else {
165             if (msg->hdr->flags & VFIO_USER_ERROR) {
166                 error_printf("vfio_user_process: error reply on async ");
167                 error_printf("request command %x error %s\n",
168                              msg->hdr->command,
169                              strerror(msg->hdr->error_reply));
170             }
171             /* youngest nowait msg has been ack'd */
172             if (proxy->last_nowait == msg) {
173                 proxy->last_nowait = NULL;
174             }
175             vfio_user_recycle(proxy, msg);
176         }
177     } else {
178         QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
179         qemu_bh_schedule(proxy->req_bh);
180     }
181 }
182 
183 /*
184  * Complete a partial message read
185  */
186 static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
187 {
188     VFIOUserMsg *msg = proxy->part_recv;
189     size_t msgleft = proxy->recv_left;
190     bool isreply;
191     char *data;
192     int ret;
193 
194     data = (char *)msg->hdr + (msg->hdr->size - msgleft);
195     while (msgleft > 0) {
196         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
197 
198         /* error or would block */
199         if (ret <= 0) {
200             /* try for rest on next iternation */
201             if (ret == QIO_CHANNEL_ERR_BLOCK) {
202                 proxy->recv_left = msgleft;
203             }
204             return ret;
205         }
206         trace_vfio_user_recv_read(msg->hdr->id, ret);
207 
208         msgleft -= ret;
209         data += ret;
210     }
211 
212     /*
213      * Read complete message, process it.
214      */
215     proxy->part_recv = NULL;
216     proxy->recv_left = 0;
217     isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
218     vfio_user_process(proxy, msg, isreply);
219 
220     /* return positive value */
221     return 1;
222 }
223 
224 /*
225  * Receive and process one incoming message.
226  *
227  * For replies, find matching outgoing request and wake any waiters.
228  * For requests, queue in incoming list and run request BH.
229  */
230 static int vfio_user_recv_one(VFIOUserProxy *proxy, Error **errp)
231 {
232     VFIOUserMsg *msg = NULL;
233     g_autofree int *fdp = NULL;
234     VFIOUserFDs *reqfds;
235     VFIOUserHdr hdr;
236     struct iovec iov = {
237         .iov_base = &hdr,
238         .iov_len = sizeof(hdr),
239     };
240     bool isreply = false;
241     int i, ret;
242     size_t msgleft, numfds = 0;
243     char *data = NULL;
244     char *buf = NULL;
245 
246     /*
247      * Complete any partial reads
248      */
249     if (proxy->part_recv != NULL) {
250         ret = vfio_user_complete(proxy, errp);
251 
252         /* still not complete, try later */
253         if (ret == QIO_CHANNEL_ERR_BLOCK) {
254             return ret;
255         }
256 
257         if (ret <= 0) {
258             goto fatal;
259         }
260         /* else fall into reading another msg */
261     }
262 
263     /*
264      * Read header
265      */
266     ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
267                                  errp);
268     if (ret == QIO_CHANNEL_ERR_BLOCK) {
269         return ret;
270     }
271 
272     /* read error or other side closed connection */
273     if (ret <= 0) {
274         goto fatal;
275     }
276 
277     if (ret < sizeof(hdr)) {
278         error_setg(errp, "short read of header");
279         goto fatal;
280     }
281 
282     /*
283      * Validate header
284      */
285     if (hdr.size < sizeof(VFIOUserHdr)) {
286         error_setg(errp, "bad header size");
287         goto fatal;
288     }
289     switch (hdr.flags & VFIO_USER_TYPE) {
290     case VFIO_USER_REQUEST:
291         isreply = false;
292         break;
293     case VFIO_USER_REPLY:
294         isreply = true;
295         break;
296     default:
297         error_setg(errp, "unknown message type");
298         goto fatal;
299     }
300     trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
301                              hdr.flags);
302 
303     /*
304      * For replies, find the matching pending request.
305      * For requests, reap incoming FDs.
306      */
307     if (isreply) {
308         QTAILQ_FOREACH(msg, &proxy->pending, next) {
309             if (hdr.id == msg->id) {
310                 break;
311             }
312         }
313         if (msg == NULL) {
314             error_setg(errp, "unexpected reply");
315             goto err;
316         }
317         QTAILQ_REMOVE(&proxy->pending, msg, next);
318 
319         /*
320          * Process any received FDs
321          */
322         if (numfds != 0) {
323             if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
324                 error_setg(errp, "unexpected FDs");
325                 goto err;
326             }
327             msg->fds->recv_fds = numfds;
328             memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
329         }
330     } else {
331         if (numfds != 0) {
332             reqfds = vfio_user_getfds(numfds);
333             memcpy(reqfds->fds, fdp, numfds * sizeof(int));
334         } else {
335             reqfds = NULL;
336         }
337     }
338 
339     /*
340      * Put the whole message into a single buffer.
341      */
342     if (isreply) {
343         if (hdr.size > msg->rsize) {
344             error_setg(errp, "reply larger than recv buffer");
345             goto err;
346         }
347         *msg->hdr = hdr;
348         data = (char *)msg->hdr + sizeof(hdr);
349     } else {
350         if (hdr.size > proxy->max_xfer_size + sizeof(VFIOUserDMARW)) {
351             error_setg(errp, "vfio_user_recv request larger than max");
352             goto err;
353         }
354         buf = g_malloc0(hdr.size);
355         memcpy(buf, &hdr, sizeof(hdr));
356         data = buf + sizeof(hdr);
357         msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
358         msg->type = VFIO_MSG_REQ;
359     }
360 
361     /*
362      * Read rest of message.
363      */
364     msgleft = hdr.size - sizeof(hdr);
365     while (msgleft > 0) {
366         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
367 
368         /* prepare to complete read on next iternation */
369         if (ret == QIO_CHANNEL_ERR_BLOCK) {
370             proxy->part_recv = msg;
371             proxy->recv_left = msgleft;
372             return ret;
373         }
374 
375         if (ret <= 0) {
376             goto fatal;
377         }
378         trace_vfio_user_recv_read(hdr.id, ret);
379 
380         msgleft -= ret;
381         data += ret;
382     }
383 
384     vfio_user_process(proxy, msg, isreply);
385     return 0;
386 
387     /*
388      * fatal means the other side closed or we don't trust the stream
389      * err means this message is corrupt
390      */
391 fatal:
392     vfio_user_shutdown(proxy);
393     proxy->state = VFIO_PROXY_ERROR;
394 
395     /* set error if server side closed */
396     if (ret == 0) {
397         error_setg(errp, "server closed socket");
398     }
399 
400 err:
401     for (i = 0; i < numfds; i++) {
402         close(fdp[i]);
403     }
404     if (isreply && msg != NULL) {
405         /* force an error to keep sending thread from hanging */
406         vfio_user_set_error(msg->hdr, EINVAL);
407         msg->complete = true;
408         qemu_cond_signal(&msg->cv);
409     }
410     return -1;
411 }
412 
413 static void vfio_user_recv(void *opaque)
414 {
415     VFIOUserProxy *proxy = opaque;
416 
417     QEMU_LOCK_GUARD(&proxy->lock);
418 
419     if (proxy->state == VFIO_PROXY_CONNECTED) {
420         Error *local_err = NULL;
421 
422         while (vfio_user_recv_one(proxy, &local_err) == 0) {
423             ;
424         }
425 
426         if (local_err != NULL) {
427             error_report_err(local_err);
428         }
429     }
430 }
431 
432 /*
433  * Send a single message, same return semantics as vfio_user_send_qio().
434  *
435  * Sent async messages are freed, others are moved to pending queue.
436  */
437 static ssize_t vfio_user_send_one(VFIOUserProxy *proxy, Error **errp)
438 {
439     VFIOUserMsg *msg;
440     ssize_t ret;
441 
442     msg = QTAILQ_FIRST(&proxy->outgoing);
443     ret = vfio_user_send_qio(proxy, msg, errp);
444     if (ret < 0) {
445         return ret;
446     }
447 
448     QTAILQ_REMOVE(&proxy->outgoing, msg, next);
449     proxy->num_outgoing--;
450     if (msg->type == VFIO_MSG_ASYNC) {
451         vfio_user_recycle(proxy, msg);
452     } else {
453         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
454         msg->pending = true;
455     }
456 
457     return ret;
458 }
459 
460 /*
461  * Send messages from outgoing queue when the socket buffer has space.
462  * If we deplete 'outgoing', remove ourselves from the poll list.
463  */
464 static void vfio_user_send(void *opaque)
465 {
466     VFIOUserProxy *proxy = opaque;
467 
468     QEMU_LOCK_GUARD(&proxy->lock);
469 
470     if (proxy->state == VFIO_PROXY_CONNECTED) {
471         while (!QTAILQ_EMPTY(&proxy->outgoing)) {
472             Error *local_err = NULL;
473             int ret;
474 
475             ret = vfio_user_send_one(proxy, &local_err);
476 
477             if (ret == QIO_CHANNEL_ERR_BLOCK) {
478                 return;
479             } else if (ret == -1) {
480                 error_report_err(local_err);
481                 return;
482             }
483         }
484         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
485                                        vfio_user_recv, NULL, NULL, proxy);
486 
487         /* queue empty - send any pending multi write msgs */
488         if (proxy->wr_multi != NULL) {
489             vfio_user_flush_multi(proxy);
490         }
491     }
492 }
493 
494 static void vfio_user_close_cb(void *opaque)
495 {
496     VFIOUserProxy *proxy = opaque;
497 
498     QEMU_LOCK_GUARD(&proxy->lock);
499 
500     proxy->state = VFIO_PROXY_CLOSED;
501     qemu_cond_signal(&proxy->close_cv);
502 }
503 
504 
505 /*
506  * Functions called by main or CPU threads
507  */
508 
509 /*
510  * Process incoming requests.
511  *
512  * The bus-specific callback has the form:
513  *    request(opaque, msg)
514  * where 'opaque' was specified in vfio_user_set_handler
515  * and 'msg' is the inbound message.
516  *
517  * The callback is responsible for disposing of the message buffer,
518  * usually by re-using it when calling vfio_send_reply or vfio_send_error,
519  * both of which free their message buffer when the reply is sent.
520  *
521  * If the callback uses a new buffer, it needs to free the old one.
522  */
523 static void vfio_user_request(void *opaque)
524 {
525     VFIOUserProxy *proxy = opaque;
526     VFIOUserMsgQ new, free;
527     VFIOUserMsg *msg, *m1;
528 
529     /* reap all incoming */
530     QTAILQ_INIT(&new);
531     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
532         QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
533             QTAILQ_REMOVE(&proxy->incoming, msg, next);
534             QTAILQ_INSERT_TAIL(&new, msg, next);
535         }
536     }
537 
538     /* process list */
539     QTAILQ_INIT(&free);
540     QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
541         QTAILQ_REMOVE(&new, msg, next);
542         trace_vfio_user_recv_request(msg->hdr->command);
543         proxy->request(proxy->req_arg, msg);
544         QTAILQ_INSERT_HEAD(&free, msg, next);
545     }
546 
547     /* free list */
548     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
549         QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
550             vfio_user_recycle(proxy, msg);
551         }
552     }
553 }
554 
555 /*
556  * Messages are queued onto the proxy's outgoing list.
557  *
558  * It handles 3 types of messages:
559  *
560  * async messages - replies and posted writes
561  *
562  * There will be no reply from the server, so message
563  * buffers are freed after they're sent.
564  *
565  * nowait messages - map/unmap during address space transactions
566  *
567  * These are also sent async, but a reply is expected so that
568  * vfio_wait_reqs() can wait for the youngest nowait request.
569  * They transition from the outgoing list to the pending list
570  * when sent, and are freed when the reply is received.
571  *
572  * wait messages - all other requests
573  *
574  * The reply to these messages is waited for by their caller.
575  * They also transition from outgoing to pending when sent, but
576  * the message buffer is returned to the caller with the reply
577  * contents.  The caller is responsible for freeing these messages.
578  *
579  * As an optimization, if the outgoing list and the socket send
580  * buffer are empty, the message is sent inline instead of being
581  * added to the outgoing list.  The rest of the transitions are
582  * unchanged.
583  */
584 static bool vfio_user_send_queued(VFIOUserProxy *proxy, VFIOUserMsg *msg,
585                                   Error **errp)
586 {
587     int ret;
588 
589     /* older coalesced writes go first */
590     if (proxy->wr_multi != NULL &&
591         ((msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REQUEST)) {
592         vfio_user_flush_multi(proxy);
593     }
594 
595     /*
596      * Unsent outgoing msgs - add to tail
597      */
598     if (!QTAILQ_EMPTY(&proxy->outgoing)) {
599         QTAILQ_INSERT_TAIL(&proxy->outgoing, msg, next);
600         proxy->num_outgoing++;
601         return true;
602     }
603 
604     /*
605      * Try inline - if blocked, queue it and kick send poller
606      */
607     if (proxy->flags & VFIO_PROXY_FORCE_QUEUED) {
608         ret = QIO_CHANNEL_ERR_BLOCK;
609     } else {
610         ret = vfio_user_send_qio(proxy, msg, errp);
611     }
612 
613     if (ret == QIO_CHANNEL_ERR_BLOCK) {
614         QTAILQ_INSERT_HEAD(&proxy->outgoing, msg, next);
615         proxy->num_outgoing = 1;
616         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
617                                        vfio_user_recv, proxy->ctx,
618                                        vfio_user_send, proxy);
619         return true;
620     }
621     if (ret == -1) {
622         return false;
623     }
624 
625     /*
626      * Sent - free async, add others to pending
627      */
628     if (msg->type == VFIO_MSG_ASYNC) {
629         vfio_user_recycle(proxy, msg);
630     } else {
631         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
632         msg->pending = true;
633     }
634 
635     return true;
636 }
637 
638 /*
639  * nowait send - vfio_wait_reqs() can wait for it later
640  *
641  * Returns false if we did not successfully receive a reply message, in which
642  * case @errp will be populated.
643  *
644  * In either case, ownership of @hdr and @fds is taken, and the caller must
645  * *not* free them itself.
646  */
647 bool vfio_user_send_nowait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
648                            VFIOUserFDs *fds, int rsize, Error **errp)
649 {
650     VFIOUserMsg *msg;
651 
652     QEMU_LOCK_GUARD(&proxy->lock);
653 
654     msg = vfio_user_getmsg(proxy, hdr, fds);
655     msg->id = hdr->id;
656     msg->rsize = rsize ? rsize : hdr->size;
657     msg->type = VFIO_MSG_NOWAIT;
658 
659     if (hdr->flags & VFIO_USER_NO_REPLY) {
660         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
661         vfio_user_recycle(proxy, msg);
662         return false;
663     }
664 
665     if (!vfio_user_send_queued(proxy, msg, errp)) {
666         vfio_user_recycle(proxy, msg);
667         return false;
668     }
669 
670     proxy->last_nowait = msg;
671 
672     return true;
673 }
674 
675 /*
676  * Returns false if we did not successfully receive a reply message, in which
677  * case @errp will be populated.
678  *
679  * In either case, the caller must free @hdr and @fds if needed.
680  */
681 bool vfio_user_send_wait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
682                          VFIOUserFDs *fds, int rsize, Error **errp)
683 {
684     VFIOUserMsg *msg;
685     bool ok = false;
686 
687     if (hdr->flags & VFIO_USER_NO_REPLY) {
688         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
689         return false;
690     }
691 
692     qemu_mutex_lock(&proxy->lock);
693 
694     msg = vfio_user_getmsg(proxy, hdr, fds);
695     msg->id = hdr->id;
696     msg->rsize = rsize ? rsize : hdr->size;
697     msg->type = VFIO_MSG_WAIT;
698 
699     ok = vfio_user_send_queued(proxy, msg, errp);
700 
701     if (ok) {
702         while (!msg->complete) {
703             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
704                                      proxy->wait_time)) {
705                 VFIOUserMsgQ *list;
706 
707                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
708                 QTAILQ_REMOVE(list, msg, next);
709                 error_setg_errno(errp, ETIMEDOUT,
710                                  "timed out waiting for reply");
711                 ok = false;
712                 break;
713             }
714         }
715     }
716 
717     vfio_user_recycle(proxy, msg);
718 
719     qemu_mutex_unlock(&proxy->lock);
720 
721     return ok;
722 }
723 
724 /*
725  * async send - msg can be queued, but will be freed when sent
726  *
727  * Returns false on failure, in which case @errp will be populated.
728  *
729  * In either case, ownership of @hdr and @fds is taken, and the caller must
730  * *not* free them itself.
731  */
732 bool vfio_user_send_async(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
733                           VFIOUserFDs *fds, Error **errp)
734 {
735     VFIOUserMsg *msg;
736 
737     QEMU_LOCK_GUARD(&proxy->lock);
738 
739     msg = vfio_user_getmsg(proxy, hdr, fds);
740     msg->id = hdr->id;
741     msg->rsize = 0;
742     msg->type = VFIO_MSG_ASYNC;
743 
744     if (!(hdr->flags & (VFIO_USER_NO_REPLY | VFIO_USER_REPLY))) {
745         error_setg_errno(errp, EINVAL, "%s on sync message", __func__);
746         vfio_user_recycle(proxy, msg);
747         return false;
748     }
749 
750     if (!vfio_user_send_queued(proxy, msg, errp)) {
751         vfio_user_recycle(proxy, msg);
752         return false;
753     }
754 
755     return true;
756 }
757 
758 void vfio_user_wait_reqs(VFIOUserProxy *proxy)
759 {
760     VFIOUserMsg *msg;
761 
762     /*
763      * Any DMA map/unmap requests sent in the middle
764      * of a memory region transaction were sent nowait.
765      * Wait for them here.
766      */
767     qemu_mutex_lock(&proxy->lock);
768     if (proxy->last_nowait != NULL) {
769         /*
770          * Change type to WAIT to wait for reply
771          */
772         msg = proxy->last_nowait;
773         msg->type = VFIO_MSG_WAIT;
774         proxy->last_nowait = NULL;
775         while (!msg->complete) {
776             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
777                                      proxy->wait_time)) {
778                 VFIOUserMsgQ *list;
779 
780                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
781                 QTAILQ_REMOVE(list, msg, next);
782                 error_printf("vfio_wait_reqs - timed out\n");
783                 break;
784             }
785         }
786 
787         if (msg->hdr->flags & VFIO_USER_ERROR) {
788             error_printf("vfio_user_wait_reqs - error reply on async ");
789             error_printf("request: command %x error %s\n", msg->hdr->command,
790                          strerror(msg->hdr->error_reply));
791         }
792 
793         /*
794          * Change type back to NOWAIT to free
795          */
796         msg->type = VFIO_MSG_NOWAIT;
797         vfio_user_recycle(proxy, msg);
798     }
799 
800     qemu_mutex_unlock(&proxy->lock);
801 }
802 
803 /*
804  * Reply to an incoming request.
805  */
806 void vfio_user_send_reply(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int size)
807 {
808     Error *local_err = NULL;
809 
810     if (size < sizeof(VFIOUserHdr)) {
811         error_printf("%s: size too small", __func__);
812         g_free(hdr);
813         return;
814     }
815 
816     /*
817      * convert header to associated reply
818      */
819     hdr->flags = VFIO_USER_REPLY;
820     hdr->size = size;
821 
822     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
823         error_report_err(local_err);
824     }
825 }
826 
827 /*
828  * Send an error reply to an incoming request.
829  */
830 void vfio_user_send_error(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int error)
831 {
832     Error *local_err = NULL;
833 
834     /*
835      * convert header to associated reply
836      */
837     hdr->flags = VFIO_USER_REPLY;
838     hdr->flags |= VFIO_USER_ERROR;
839     hdr->error_reply = error;
840     hdr->size = sizeof(*hdr);
841 
842     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
843         error_report_err(local_err);
844     }
845 }
846 
847 /*
848  * Close FDs erroneously received in an incoming request.
849  */
850 void vfio_user_putfds(VFIOUserMsg *msg)
851 {
852     VFIOUserFDs *fds = msg->fds;
853     int i;
854 
855     for (i = 0; i < fds->recv_fds; i++) {
856         close(fds->fds[i]);
857     }
858     g_free(fds);
859     msg->fds = NULL;
860 }
861 
862 void
863 vfio_user_disable_posted_writes(VFIOUserProxy *proxy)
864 {
865     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
866          proxy->flags |= VFIO_PROXY_NO_POST;
867     }
868 }
869 
870 static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
871     QLIST_HEAD_INITIALIZER(vfio_user_sockets);
872 
873 VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
874 {
875     VFIOUserProxy *proxy;
876     QIOChannelSocket *sioc;
877     QIOChannel *ioc;
878     char *sockname;
879 
880     if (addr->type != SOCKET_ADDRESS_TYPE_UNIX) {
881         error_setg(errp, "vfio_user_connect - bad address family");
882         return NULL;
883     }
884     sockname = addr->u.q_unix.path;
885 
886     sioc = qio_channel_socket_new();
887     ioc = QIO_CHANNEL(sioc);
888     if (qio_channel_socket_connect_sync(sioc, addr, errp) < 0) {
889         object_unref(OBJECT(ioc));
890         return NULL;
891     }
892     qio_channel_set_blocking(ioc, false, NULL);
893 
894     proxy = g_malloc0(sizeof(VFIOUserProxy));
895     proxy->sockname = g_strdup_printf("unix:%s", sockname);
896     proxy->ioc = ioc;
897 
898     /* init defaults */
899     proxy->max_xfer_size = VFIO_USER_DEF_MAX_XFER;
900     proxy->max_send_fds = VFIO_USER_DEF_MAX_FDS;
901     proxy->max_dma = VFIO_USER_DEF_MAP_MAX;
902     proxy->dma_pgsizes = VFIO_USER_DEF_PGSIZE;
903     proxy->max_bitmap = VFIO_USER_DEF_MAX_BITMAP;
904     proxy->migr_pgsize = VFIO_USER_DEF_PGSIZE;
905 
906     proxy->flags = VFIO_PROXY_CLIENT;
907     proxy->state = VFIO_PROXY_CONNECTED;
908 
909     qemu_mutex_init(&proxy->lock);
910     qemu_cond_init(&proxy->close_cv);
911 
912     if (vfio_user_iothread == NULL) {
913         vfio_user_iothread = iothread_create("VFIO user", errp);
914     }
915 
916     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
917     proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
918 
919     QTAILQ_INIT(&proxy->outgoing);
920     QTAILQ_INIT(&proxy->incoming);
921     QTAILQ_INIT(&proxy->free);
922     QTAILQ_INIT(&proxy->pending);
923     QLIST_INSERT_HEAD(&vfio_user_sockets, proxy, next);
924 
925     return proxy;
926 }
927 
928 void vfio_user_set_handler(VFIODevice *vbasedev,
929                            void (*handler)(void *opaque, VFIOUserMsg *msg),
930                            void *req_arg)
931 {
932     VFIOUserProxy *proxy = vbasedev->proxy;
933 
934     proxy->request = handler;
935     proxy->req_arg = req_arg;
936     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
937                                    vfio_user_recv, NULL, NULL, proxy);
938 }
939 
940 void vfio_user_disconnect(VFIOUserProxy *proxy)
941 {
942     VFIOUserMsg *r1, *r2;
943 
944     qemu_mutex_lock(&proxy->lock);
945 
946     /* our side is quitting */
947     if (proxy->state == VFIO_PROXY_CONNECTED) {
948         vfio_user_shutdown(proxy);
949         if (!QTAILQ_EMPTY(&proxy->pending)) {
950             error_printf("vfio_user_disconnect: outstanding requests\n");
951         }
952     }
953     object_unref(OBJECT(proxy->ioc));
954     proxy->ioc = NULL;
955     qemu_bh_delete(proxy->req_bh);
956     proxy->req_bh = NULL;
957 
958     proxy->state = VFIO_PROXY_CLOSING;
959     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
960         qemu_cond_destroy(&r1->cv);
961         QTAILQ_REMOVE(&proxy->outgoing, r1, next);
962         g_free(r1);
963     }
964     QTAILQ_FOREACH_SAFE(r1, &proxy->incoming, next, r2) {
965         qemu_cond_destroy(&r1->cv);
966         QTAILQ_REMOVE(&proxy->incoming, r1, next);
967         g_free(r1);
968     }
969     QTAILQ_FOREACH_SAFE(r1, &proxy->pending, next, r2) {
970         qemu_cond_destroy(&r1->cv);
971         QTAILQ_REMOVE(&proxy->pending, r1, next);
972         g_free(r1);
973     }
974     QTAILQ_FOREACH_SAFE(r1, &proxy->free, next, r2) {
975         qemu_cond_destroy(&r1->cv);
976         QTAILQ_REMOVE(&proxy->free, r1, next);
977         g_free(r1);
978     }
979 
980     /*
981      * Make sure the iothread isn't blocking anywhere
982      * with a ref to this proxy by waiting for a BH
983      * handler to run after the proxy fd handlers were
984      * deleted above.
985      */
986     aio_bh_schedule_oneshot(proxy->ctx, vfio_user_close_cb, proxy);
987 
988     while (proxy->state != VFIO_PROXY_CLOSED) {
989         qemu_cond_wait(&proxy->close_cv, &proxy->lock);
990     }
991 
992     /* we now hold the only ref to proxy */
993     qemu_mutex_unlock(&proxy->lock);
994     qemu_cond_destroy(&proxy->close_cv);
995     qemu_mutex_destroy(&proxy->lock);
996 
997     QLIST_REMOVE(proxy, next);
998     if (QLIST_EMPTY(&vfio_user_sockets)) {
999         iothread_destroy(vfio_user_iothread);
1000         vfio_user_iothread = NULL;
1001     }
1002 
1003     g_free(proxy->sockname);
1004     g_free(proxy);
1005 }
1006 
1007 void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
1008                            uint32_t size, uint32_t flags)
1009 {
1010     static uint16_t next_id;
1011 
1012     hdr->id = qatomic_fetch_inc(&next_id);
1013     hdr->command = cmd;
1014     hdr->size = size;
1015     hdr->flags = (flags & ~VFIO_USER_TYPE) | VFIO_USER_REQUEST;
1016     hdr->error_reply = 0;
1017 }
1018 
1019 struct cap_entry {
1020     const char *name;
1021     bool (*check)(VFIOUserProxy *proxy, QObject *qobj, Error **errp);
1022 };
1023 
1024 static bool caps_parse(VFIOUserProxy *proxy, QDict *qdict,
1025                        struct cap_entry caps[], Error **errp)
1026 {
1027     QObject *qobj;
1028     struct cap_entry *p;
1029 
1030     for (p = caps; p->name != NULL; p++) {
1031         qobj = qdict_get(qdict, p->name);
1032         if (qobj != NULL) {
1033             if (!p->check(proxy, qobj, errp)) {
1034                 return false;
1035             }
1036             qdict_del(qdict, p->name);
1037         }
1038     }
1039 
1040     /* warning, for now */
1041     if (qdict_size(qdict) != 0) {
1042         warn_report("spurious capabilities");
1043     }
1044     return true;
1045 }
1046 
1047 static bool check_migr_pgsize(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1048 {
1049     QNum *qn = qobject_to(QNum, qobj);
1050     uint64_t pgsize;
1051 
1052     if (qn == NULL || !qnum_get_try_uint(qn, &pgsize)) {
1053         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZE);
1054         return false;
1055     }
1056 
1057     /* must be larger than default */
1058     if (pgsize & (VFIO_USER_DEF_PGSIZE - 1)) {
1059         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsize);
1060         return false;
1061     }
1062 
1063     proxy->migr_pgsize = pgsize;
1064     return true;
1065 }
1066 
1067 static bool check_bitmap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1068 {
1069     QNum *qn = qobject_to(QNum, qobj);
1070     uint64_t bitmap_size;
1071 
1072     if (qn == NULL || !qnum_get_try_uint(qn, &bitmap_size)) {
1073         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_BITMAP);
1074         return false;
1075     }
1076 
1077     /* can only lower it */
1078     if (bitmap_size > VFIO_USER_DEF_MAX_BITMAP) {
1079         error_setg(errp, "%s too large", VFIO_USER_CAP_MAX_BITMAP);
1080         return false;
1081     }
1082 
1083     proxy->max_bitmap = bitmap_size;
1084     return true;
1085 }
1086 
1087 static struct cap_entry caps_migr[] = {
1088     { VFIO_USER_CAP_PGSIZE, check_migr_pgsize },
1089     { VFIO_USER_CAP_MAX_BITMAP, check_bitmap },
1090     { NULL }
1091 };
1092 
1093 static bool check_max_fds(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1094 {
1095     QNum *qn = qobject_to(QNum, qobj);
1096     uint64_t max_send_fds;
1097 
1098     if (qn == NULL || !qnum_get_try_uint(qn, &max_send_fds) ||
1099         max_send_fds > VFIO_USER_MAX_MAX_FDS) {
1100         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1101         return false;
1102     }
1103     proxy->max_send_fds = max_send_fds;
1104     return true;
1105 }
1106 
1107 static bool check_max_xfer(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1108 {
1109     QNum *qn = qobject_to(QNum, qobj);
1110     uint64_t max_xfer_size;
1111 
1112     if (qn == NULL || !qnum_get_try_uint(qn, &max_xfer_size) ||
1113         max_xfer_size > VFIO_USER_MAX_MAX_XFER) {
1114         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_XFER);
1115         return false;
1116     }
1117     proxy->max_xfer_size = max_xfer_size;
1118     return true;
1119 }
1120 
1121 static bool check_pgsizes(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1122 {
1123     QNum *qn = qobject_to(QNum, qobj);
1124     uint64_t pgsizes;
1125 
1126     if (qn == NULL || !qnum_get_try_uint(qn, &pgsizes)) {
1127         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZES);
1128         return false;
1129     }
1130 
1131     /* must be larger than default */
1132     if (pgsizes & (VFIO_USER_DEF_PGSIZE - 1)) {
1133         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsizes);
1134         return false;
1135     }
1136 
1137     proxy->dma_pgsizes = pgsizes;
1138     return true;
1139 }
1140 
1141 static bool check_max_dma(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1142 {
1143     QNum *qn = qobject_to(QNum, qobj);
1144     uint64_t max_dma;
1145 
1146     if (qn == NULL || !qnum_get_try_uint(qn, &max_dma)) {
1147         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAP_MAX);
1148         return false;
1149     }
1150 
1151     /* can only lower it */
1152     if (max_dma > VFIO_USER_DEF_MAP_MAX) {
1153         error_setg(errp, "%s too large", VFIO_USER_CAP_MAP_MAX);
1154         return false;
1155     }
1156 
1157     proxy->max_dma = max_dma;
1158     return true;
1159 }
1160 
1161 static bool check_migr(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1162 {
1163     QDict *qdict = qobject_to(QDict, qobj);
1164 
1165     if (qdict == NULL) {
1166         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1167         return true;
1168     }
1169     return caps_parse(proxy, qdict, caps_migr, errp);
1170 }
1171 
1172 static bool check_multi(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1173 {
1174     QBool *qb = qobject_to(QBool, qobj);
1175 
1176     if (qb == NULL) {
1177         error_setg(errp, "malformed %s", VFIO_USER_CAP_MULTI);
1178         return false;
1179     }
1180     if (qbool_get_bool(qb)) {
1181         proxy->flags |= VFIO_PROXY_USE_MULTI;
1182     }
1183     return true;
1184 }
1185 
1186 static struct cap_entry caps_cap[] = {
1187     { VFIO_USER_CAP_MAX_FDS, check_max_fds },
1188     { VFIO_USER_CAP_MAX_XFER, check_max_xfer },
1189     { VFIO_USER_CAP_PGSIZES, check_pgsizes },
1190     { VFIO_USER_CAP_MAP_MAX, check_max_dma },
1191     { VFIO_USER_CAP_MIGR, check_migr },
1192     { VFIO_USER_CAP_MULTI, check_multi },
1193     { NULL }
1194 };
1195 
1196 static bool check_cap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1197 {
1198    QDict *qdict = qobject_to(QDict, qobj);
1199 
1200     if (qdict == NULL) {
1201         error_setg(errp, "malformed %s", VFIO_USER_CAP);
1202         return false;
1203     }
1204     return caps_parse(proxy, qdict, caps_cap, errp);
1205 }
1206 
1207 static struct cap_entry ver_0_0[] = {
1208     { VFIO_USER_CAP, check_cap },
1209     { NULL }
1210 };
1211 
1212 static bool caps_check(VFIOUserProxy *proxy, int minor, const char *caps,
1213                        Error **errp)
1214 {
1215     QObject *qobj;
1216     QDict *qdict;
1217     bool ret;
1218 
1219     qobj = qobject_from_json(caps, NULL);
1220     if (qobj == NULL) {
1221         error_setg(errp, "malformed capabilities %s", caps);
1222         return false;
1223     }
1224     qdict = qobject_to(QDict, qobj);
1225     if (qdict == NULL) {
1226         error_setg(errp, "capabilities %s not an object", caps);
1227         qobject_unref(qobj);
1228         return false;
1229     }
1230     ret = caps_parse(proxy, qdict, ver_0_0, errp);
1231 
1232     qobject_unref(qobj);
1233     return ret;
1234 }
1235 
1236 static GString *caps_json(void)
1237 {
1238     QDict *dict = qdict_new();
1239     QDict *capdict = qdict_new();
1240     QDict *migdict = qdict_new();
1241     GString *str;
1242 
1243     qdict_put_int(migdict, VFIO_USER_CAP_PGSIZE, VFIO_USER_DEF_PGSIZE);
1244     qdict_put_int(migdict, VFIO_USER_CAP_MAX_BITMAP, VFIO_USER_DEF_MAX_BITMAP);
1245     qdict_put_obj(capdict, VFIO_USER_CAP_MIGR, QOBJECT(migdict));
1246 
1247     qdict_put_int(capdict, VFIO_USER_CAP_MAX_FDS, VFIO_USER_MAX_MAX_FDS);
1248     qdict_put_int(capdict, VFIO_USER_CAP_MAX_XFER, VFIO_USER_DEF_MAX_XFER);
1249     qdict_put_int(capdict, VFIO_USER_CAP_PGSIZES, VFIO_USER_DEF_PGSIZE);
1250     qdict_put_int(capdict, VFIO_USER_CAP_MAP_MAX, VFIO_USER_DEF_MAP_MAX);
1251     qdict_put_bool(capdict, VFIO_USER_CAP_MULTI, true);
1252 
1253     qdict_put_obj(dict, VFIO_USER_CAP, QOBJECT(capdict));
1254 
1255     str = qobject_to_json(QOBJECT(dict));
1256     qobject_unref(dict);
1257     return str;
1258 }
1259 
1260 bool vfio_user_validate_version(VFIOUserProxy *proxy, Error **errp)
1261 {
1262     g_autofree VFIOUserVersion *msgp = NULL;
1263     GString *caps;
1264     char *reply;
1265     int size, caplen;
1266 
1267     caps = caps_json();
1268     caplen = caps->len + 1;
1269     size = sizeof(*msgp) + caplen;
1270     msgp = g_malloc0(size);
1271 
1272     vfio_user_request_msg(&msgp->hdr, VFIO_USER_VERSION, size, 0);
1273     msgp->major = VFIO_USER_MAJOR_VER;
1274     msgp->minor = VFIO_USER_MINOR_VER;
1275     memcpy(&msgp->capabilities, caps->str, caplen);
1276     g_string_free(caps, true);
1277     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1278 
1279     if (!vfio_user_send_wait(proxy, &msgp->hdr, NULL, 0, errp)) {
1280         return false;
1281     }
1282 
1283     if (msgp->hdr.flags & VFIO_USER_ERROR) {
1284         error_setg_errno(errp, msgp->hdr.error_reply, "version reply");
1285         return false;
1286     }
1287 
1288     if (msgp->major != VFIO_USER_MAJOR_VER ||
1289         msgp->minor > VFIO_USER_MINOR_VER) {
1290         error_setg(errp, "incompatible server version");
1291         return false;
1292     }
1293 
1294     reply = msgp->capabilities;
1295     if (reply[msgp->hdr.size - sizeof(*msgp) - 1] != '\0') {
1296         error_setg(errp, "corrupt version reply");
1297         return false;
1298     }
1299 
1300     if (!caps_check(proxy, msgp->minor, reply, errp)) {
1301         return false;
1302     }
1303 
1304     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1305     return true;
1306 }
1307 
1308 void vfio_user_flush_multi(VFIOUserProxy *proxy)
1309 {
1310     VFIOUserMsg *msg;
1311     VFIOUserWRMulti *wm = proxy->wr_multi;
1312     Error *local_err = NULL;
1313 
1314     proxy->wr_multi = NULL;
1315 
1316     /* adjust size for actual # of writes */
1317     wm->hdr.size -= (VFIO_USER_MULTI_MAX - wm->wr_cnt) * sizeof(VFIOUserWROne);
1318 
1319     msg = vfio_user_getmsg(proxy, &wm->hdr, NULL);
1320     msg->id = wm->hdr.id;
1321     msg->rsize = 0;
1322     msg->type = VFIO_MSG_ASYNC;
1323     trace_vfio_user_wrmulti("flush", wm->wr_cnt);
1324 
1325     if (!vfio_user_send_queued(proxy, msg, &local_err)) {
1326         error_report_err(local_err);
1327         vfio_user_recycle(proxy, msg);
1328     }
1329 }
1330 
1331 void vfio_user_create_multi(VFIOUserProxy *proxy)
1332 {
1333     VFIOUserWRMulti *wm;
1334 
1335     wm = g_malloc0(sizeof(*wm));
1336     vfio_user_request_msg(&wm->hdr, VFIO_USER_REGION_WRITE_MULTI,
1337                           sizeof(*wm), VFIO_USER_NO_REPLY);
1338     proxy->wr_multi = wm;
1339 }
1340 
1341 void vfio_user_add_multi(VFIOUserProxy *proxy, uint8_t index,
1342                          off_t offset, uint32_t count, void *data)
1343 {
1344     VFIOUserWRMulti *wm = proxy->wr_multi;
1345     VFIOUserWROne *w1 = &wm->wrs[wm->wr_cnt];
1346 
1347     w1->offset = offset;
1348     w1->region = index;
1349     w1->count = count;
1350     memcpy(&w1->data, data, count);
1351 
1352     wm->wr_cnt++;
1353     trace_vfio_user_wrmulti("add", wm->wr_cnt);
1354     if (wm->wr_cnt == VFIO_USER_MULTI_MAX ||
1355         proxy->num_outgoing < VFIO_USER_OUT_LOW) {
1356         vfio_user_flush_multi(proxy);
1357     }
1358 }
1359