xref: /openbmc/qemu/hw/vfio-user/proxy.c (revision 2deec9ab7d25d7cd8f57033bd0421c1f9f28d905)
1 /*
2  * vfio protocol over a UNIX socket.
3  *
4  * Copyright © 2018, 2021 Oracle and/or its affiliates.
5  *
6  * SPDX-License-Identifier: GPL-2.0-or-later
7  */
8 
9 #include "qemu/osdep.h"
10 #include <sys/ioctl.h>
11 
12 #include "hw/vfio/vfio-device.h"
13 #include "hw/vfio-user/proxy.h"
14 #include "hw/vfio-user/trace.h"
15 #include "qapi/error.h"
16 #include "qobject/qbool.h"
17 #include "qobject/qdict.h"
18 #include "qobject/qjson.h"
19 #include "qobject/qnum.h"
20 #include "qemu/error-report.h"
21 #include "qemu/lockable.h"
22 #include "qemu/main-loop.h"
23 #include "qemu/thread.h"
24 #include "system/iothread.h"
25 
26 static IOThread *vfio_user_iothread;
27 
28 static void vfio_user_shutdown(VFIOUserProxy *proxy);
29 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
30                                      VFIOUserFDs *fds);
31 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);
32 
33 static void vfio_user_recv(void *opaque);
34 static void vfio_user_send(void *opaque);
35 static void vfio_user_cb(void *opaque);
36 
37 static void vfio_user_request(void *opaque);
38 
39 static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
40 {
41     hdr->flags |= VFIO_USER_ERROR;
42     hdr->error_reply = err;
43 }
44 
45 /*
46  * Functions called by main, CPU, or iothread threads
47  */
48 
49 static void vfio_user_shutdown(VFIOUserProxy *proxy)
50 {
51     qio_channel_shutdown(proxy->ioc, QIO_CHANNEL_SHUTDOWN_READ, NULL);
52     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL,
53                                    proxy->ctx, NULL, NULL);
54 }
55 
56 /*
57  * Same return values as qio_channel_writev_full():
58  *
59  * QIO_CHANNEL_ERR_BLOCK: *errp not set
60  * -1: *errp will be populated
61  * otherwise: bytes written
62  */
63 static ssize_t vfio_user_send_qio(VFIOUserProxy *proxy, VFIOUserMsg *msg,
64                                   Error **errp)
65 {
66     VFIOUserFDs *fds =  msg->fds;
67     struct iovec iov = {
68         .iov_base = msg->hdr,
69         .iov_len = msg->hdr->size,
70     };
71     size_t numfds = 0;
72     int *fdp = NULL;
73     ssize_t ret;
74 
75     if (fds != NULL && fds->send_fds != 0) {
76         numfds = fds->send_fds;
77         fdp = fds->fds;
78     }
79 
80     ret = qio_channel_writev_full(proxy->ioc, &iov, 1, fdp, numfds, 0, errp);
81 
82     if (ret == -1) {
83         vfio_user_set_error(msg->hdr, EIO);
84         vfio_user_shutdown(proxy);
85     }
86     trace_vfio_user_send_write(msg->hdr->id, ret);
87 
88     return ret;
89 }
90 
91 static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
92                                      VFIOUserFDs *fds)
93 {
94     VFIOUserMsg *msg;
95 
96     msg = QTAILQ_FIRST(&proxy->free);
97     if (msg != NULL) {
98         QTAILQ_REMOVE(&proxy->free, msg, next);
99     } else {
100         msg = g_malloc0(sizeof(*msg));
101         qemu_cond_init(&msg->cv);
102     }
103 
104     msg->hdr = hdr;
105     msg->fds = fds;
106     return msg;
107 }
108 
109 /*
110  * Recycle a message list entry to the free list.
111  */
112 static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
113 {
114     if (msg->type == VFIO_MSG_NONE) {
115         error_printf("vfio_user_recycle - freeing free msg\n");
116         return;
117     }
118 
119     /* free msg buffer if no one is waiting to consume the reply */
120     if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
121         g_free(msg->hdr);
122         if (msg->fds != NULL) {
123             g_free(msg->fds);
124         }
125     }
126 
127     msg->type = VFIO_MSG_NONE;
128     msg->hdr = NULL;
129     msg->fds = NULL;
130     msg->complete = false;
131     msg->pending = false;
132     QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
133 }
134 
135 VFIOUserFDs *vfio_user_getfds(int numfds)
136 {
137     VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
138 
139     fds->fds = (int *)((char *)fds + sizeof(*fds));
140 
141     return fds;
142 }
143 
144 /*
145  * Functions only called by iothread
146  */
147 
148 /*
149  * Process a received message.
150  */
151 static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
152                               bool isreply)
153 {
154 
155     /*
156      * Replies signal a waiter, if none just check for errors
157      * and free the message buffer.
158      *
159      * Requests get queued for the BH.
160      */
161     if (isreply) {
162         msg->complete = true;
163         if (msg->type == VFIO_MSG_WAIT) {
164             qemu_cond_signal(&msg->cv);
165         } else {
166             if (msg->hdr->flags & VFIO_USER_ERROR) {
167                 error_printf("vfio_user_process: error reply on async ");
168                 error_printf("request command %x error %s\n",
169                              msg->hdr->command,
170                              strerror(msg->hdr->error_reply));
171             }
172             /* youngest nowait msg has been ack'd */
173             if (proxy->last_nowait == msg) {
174                 proxy->last_nowait = NULL;
175             }
176             vfio_user_recycle(proxy, msg);
177         }
178     } else {
179         QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
180         qemu_bh_schedule(proxy->req_bh);
181     }
182 }
183 
184 /*
185  * Complete a partial message read
186  */
187 static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
188 {
189     VFIOUserMsg *msg = proxy->part_recv;
190     size_t msgleft = proxy->recv_left;
191     bool isreply;
192     char *data;
193     int ret;
194 
195     data = (char *)msg->hdr + (msg->hdr->size - msgleft);
196     while (msgleft > 0) {
197         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
198 
199         /* error or would block */
200         if (ret <= 0) {
201             /* try for rest on next iternation */
202             if (ret == QIO_CHANNEL_ERR_BLOCK) {
203                 proxy->recv_left = msgleft;
204             }
205             return ret;
206         }
207         trace_vfio_user_recv_read(msg->hdr->id, ret);
208 
209         msgleft -= ret;
210         data += ret;
211     }
212 
213     /*
214      * Read complete message, process it.
215      */
216     proxy->part_recv = NULL;
217     proxy->recv_left = 0;
218     isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
219     vfio_user_process(proxy, msg, isreply);
220 
221     /* return positive value */
222     return 1;
223 }
224 
225 /*
226  * Receive and process one incoming message.
227  *
228  * For replies, find matching outgoing request and wake any waiters.
229  * For requests, queue in incoming list and run request BH.
230  */
231 static int vfio_user_recv_one(VFIOUserProxy *proxy, Error **errp)
232 {
233     VFIOUserMsg *msg = NULL;
234     g_autofree int *fdp = NULL;
235     VFIOUserFDs *reqfds;
236     VFIOUserHdr hdr;
237     struct iovec iov = {
238         .iov_base = &hdr,
239         .iov_len = sizeof(hdr),
240     };
241     bool isreply = false;
242     int i, ret;
243     size_t msgleft, numfds = 0;
244     char *data = NULL;
245     char *buf = NULL;
246 
247     /*
248      * Complete any partial reads
249      */
250     if (proxy->part_recv != NULL) {
251         ret = vfio_user_complete(proxy, errp);
252 
253         /* still not complete, try later */
254         if (ret == QIO_CHANNEL_ERR_BLOCK) {
255             return ret;
256         }
257 
258         if (ret <= 0) {
259             goto fatal;
260         }
261         /* else fall into reading another msg */
262     }
263 
264     /*
265      * Read header
266      */
267     ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
268                                  errp);
269     if (ret == QIO_CHANNEL_ERR_BLOCK) {
270         return ret;
271     }
272 
273     /* read error or other side closed connection */
274     if (ret <= 0) {
275         goto fatal;
276     }
277 
278     if (ret < sizeof(hdr)) {
279         error_setg(errp, "short read of header");
280         goto fatal;
281     }
282 
283     /*
284      * Validate header
285      */
286     if (hdr.size < sizeof(VFIOUserHdr)) {
287         error_setg(errp, "bad header size");
288         goto fatal;
289     }
290     switch (hdr.flags & VFIO_USER_TYPE) {
291     case VFIO_USER_REQUEST:
292         isreply = false;
293         break;
294     case VFIO_USER_REPLY:
295         isreply = true;
296         break;
297     default:
298         error_setg(errp, "unknown message type");
299         goto fatal;
300     }
301     trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
302                              hdr.flags);
303 
304     /*
305      * For replies, find the matching pending request.
306      * For requests, reap incoming FDs.
307      */
308     if (isreply) {
309         QTAILQ_FOREACH(msg, &proxy->pending, next) {
310             if (hdr.id == msg->id) {
311                 break;
312             }
313         }
314         if (msg == NULL) {
315             error_setg(errp, "unexpected reply");
316             goto err;
317         }
318         QTAILQ_REMOVE(&proxy->pending, msg, next);
319 
320         /*
321          * Process any received FDs
322          */
323         if (numfds != 0) {
324             if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
325                 error_setg(errp, "unexpected FDs");
326                 goto err;
327             }
328             msg->fds->recv_fds = numfds;
329             memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
330         }
331     } else {
332         if (numfds != 0) {
333             reqfds = vfio_user_getfds(numfds);
334             memcpy(reqfds->fds, fdp, numfds * sizeof(int));
335         } else {
336             reqfds = NULL;
337         }
338     }
339 
340     /*
341      * Put the whole message into a single buffer.
342      */
343     if (isreply) {
344         if (hdr.size > msg->rsize) {
345             error_setg(errp, "reply larger than recv buffer");
346             goto err;
347         }
348         *msg->hdr = hdr;
349         data = (char *)msg->hdr + sizeof(hdr);
350     } else {
351         if (hdr.size > proxy->max_xfer_size + sizeof(VFIOUserDMARW)) {
352             error_setg(errp, "vfio_user_recv request larger than max");
353             goto err;
354         }
355         buf = g_malloc0(hdr.size);
356         memcpy(buf, &hdr, sizeof(hdr));
357         data = buf + sizeof(hdr);
358         msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
359         msg->type = VFIO_MSG_REQ;
360     }
361 
362     /*
363      * Read rest of message.
364      */
365     msgleft = hdr.size - sizeof(hdr);
366     while (msgleft > 0) {
367         ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
368 
369         /* prepare to complete read on next iternation */
370         if (ret == QIO_CHANNEL_ERR_BLOCK) {
371             proxy->part_recv = msg;
372             proxy->recv_left = msgleft;
373             return ret;
374         }
375 
376         if (ret <= 0) {
377             goto fatal;
378         }
379         trace_vfio_user_recv_read(hdr.id, ret);
380 
381         msgleft -= ret;
382         data += ret;
383     }
384 
385     vfio_user_process(proxy, msg, isreply);
386     return 0;
387 
388     /*
389      * fatal means the other side closed or we don't trust the stream
390      * err means this message is corrupt
391      */
392 fatal:
393     vfio_user_shutdown(proxy);
394     proxy->state = VFIO_PROXY_ERROR;
395 
396     /* set error if server side closed */
397     if (ret == 0) {
398         error_setg(errp, "server closed socket");
399     }
400 
401 err:
402     for (i = 0; i < numfds; i++) {
403         close(fdp[i]);
404     }
405     if (isreply && msg != NULL) {
406         /* force an error to keep sending thread from hanging */
407         vfio_user_set_error(msg->hdr, EINVAL);
408         msg->complete = true;
409         qemu_cond_signal(&msg->cv);
410     }
411     return -1;
412 }
413 
414 static void vfio_user_recv(void *opaque)
415 {
416     VFIOUserProxy *proxy = opaque;
417 
418     QEMU_LOCK_GUARD(&proxy->lock);
419 
420     if (proxy->state == VFIO_PROXY_CONNECTED) {
421         Error *local_err = NULL;
422 
423         while (vfio_user_recv_one(proxy, &local_err) == 0) {
424             ;
425         }
426 
427         if (local_err != NULL) {
428             error_report_err(local_err);
429         }
430     }
431 }
432 
433 /*
434  * Send a single message, same return semantics as vfio_user_send_qio().
435  *
436  * Sent async messages are freed, others are moved to pending queue.
437  */
438 static ssize_t vfio_user_send_one(VFIOUserProxy *proxy, Error **errp)
439 {
440     VFIOUserMsg *msg;
441     ssize_t ret;
442 
443     msg = QTAILQ_FIRST(&proxy->outgoing);
444     ret = vfio_user_send_qio(proxy, msg, errp);
445     if (ret < 0) {
446         return ret;
447     }
448 
449     QTAILQ_REMOVE(&proxy->outgoing, msg, next);
450     proxy->num_outgoing--;
451     if (msg->type == VFIO_MSG_ASYNC) {
452         vfio_user_recycle(proxy, msg);
453     } else {
454         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
455         msg->pending = true;
456     }
457 
458     return ret;
459 }
460 
461 /*
462  * Send messages from outgoing queue when the socket buffer has space.
463  * If we deplete 'outgoing', remove ourselves from the poll list.
464  */
465 static void vfio_user_send(void *opaque)
466 {
467     VFIOUserProxy *proxy = opaque;
468 
469     QEMU_LOCK_GUARD(&proxy->lock);
470 
471     if (proxy->state == VFIO_PROXY_CONNECTED) {
472         while (!QTAILQ_EMPTY(&proxy->outgoing)) {
473             Error *local_err = NULL;
474             int ret;
475 
476             ret = vfio_user_send_one(proxy, &local_err);
477 
478             if (ret == QIO_CHANNEL_ERR_BLOCK) {
479                 return;
480             } else if (ret == -1) {
481                 error_report_err(local_err);
482                 return;
483             }
484         }
485         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
486                                        vfio_user_recv, NULL, NULL, proxy);
487 
488         /* queue empty - send any pending multi write msgs */
489         if (proxy->wr_multi != NULL) {
490             vfio_user_flush_multi(proxy);
491         }
492     }
493 }
494 
495 static void vfio_user_cb(void *opaque)
496 {
497     VFIOUserProxy *proxy = opaque;
498 
499     QEMU_LOCK_GUARD(&proxy->lock);
500 
501     proxy->state = VFIO_PROXY_CLOSED;
502     qemu_cond_signal(&proxy->close_cv);
503 }
504 
505 
506 /*
507  * Functions called by main or CPU threads
508  */
509 
510 /*
511  * Process incoming requests.
512  *
513  * The bus-specific callback has the form:
514  *    request(opaque, msg)
515  * where 'opaque' was specified in vfio_user_set_handler
516  * and 'msg' is the inbound message.
517  *
518  * The callback is responsible for disposing of the message buffer,
519  * usually by re-using it when calling vfio_send_reply or vfio_send_error,
520  * both of which free their message buffer when the reply is sent.
521  *
522  * If the callback uses a new buffer, it needs to free the old one.
523  */
524 static void vfio_user_request(void *opaque)
525 {
526     VFIOUserProxy *proxy = opaque;
527     VFIOUserMsgQ new, free;
528     VFIOUserMsg *msg, *m1;
529 
530     /* reap all incoming */
531     QTAILQ_INIT(&new);
532     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
533         QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
534             QTAILQ_REMOVE(&proxy->incoming, msg, next);
535             QTAILQ_INSERT_TAIL(&new, msg, next);
536         }
537     }
538 
539     /* process list */
540     QTAILQ_INIT(&free);
541     QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
542         QTAILQ_REMOVE(&new, msg, next);
543         trace_vfio_user_recv_request(msg->hdr->command);
544         proxy->request(proxy->req_arg, msg);
545         QTAILQ_INSERT_HEAD(&free, msg, next);
546     }
547 
548     /* free list */
549     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
550         QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
551             vfio_user_recycle(proxy, msg);
552         }
553     }
554 }
555 
556 /*
557  * Messages are queued onto the proxy's outgoing list.
558  *
559  * It handles 3 types of messages:
560  *
561  * async messages - replies and posted writes
562  *
563  * There will be no reply from the server, so message
564  * buffers are freed after they're sent.
565  *
566  * nowait messages - map/unmap during address space transactions
567  *
568  * These are also sent async, but a reply is expected so that
569  * vfio_wait_reqs() can wait for the youngest nowait request.
570  * They transition from the outgoing list to the pending list
571  * when sent, and are freed when the reply is received.
572  *
573  * wait messages - all other requests
574  *
575  * The reply to these messages is waited for by their caller.
576  * They also transition from outgoing to pending when sent, but
577  * the message buffer is returned to the caller with the reply
578  * contents.  The caller is responsible for freeing these messages.
579  *
580  * As an optimization, if the outgoing list and the socket send
581  * buffer are empty, the message is sent inline instead of being
582  * added to the outgoing list.  The rest of the transitions are
583  * unchanged.
584  */
585 static bool vfio_user_send_queued(VFIOUserProxy *proxy, VFIOUserMsg *msg,
586                                   Error **errp)
587 {
588     int ret;
589 
590     /* older coalesced writes go first */
591     if (proxy->wr_multi != NULL &&
592         ((msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REQUEST)) {
593         vfio_user_flush_multi(proxy);
594     }
595 
596     /*
597      * Unsent outgoing msgs - add to tail
598      */
599     if (!QTAILQ_EMPTY(&proxy->outgoing)) {
600         QTAILQ_INSERT_TAIL(&proxy->outgoing, msg, next);
601         proxy->num_outgoing++;
602         return true;
603     }
604 
605     /*
606      * Try inline - if blocked, queue it and kick send poller
607      */
608     if (proxy->flags & VFIO_PROXY_FORCE_QUEUED) {
609         ret = QIO_CHANNEL_ERR_BLOCK;
610     } else {
611         ret = vfio_user_send_qio(proxy, msg, errp);
612     }
613 
614     if (ret == QIO_CHANNEL_ERR_BLOCK) {
615         QTAILQ_INSERT_HEAD(&proxy->outgoing, msg, next);
616         proxy->num_outgoing = 1;
617         qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
618                                        vfio_user_recv, proxy->ctx,
619                                        vfio_user_send, proxy);
620         return true;
621     }
622     if (ret == -1) {
623         return false;
624     }
625 
626     /*
627      * Sent - free async, add others to pending
628      */
629     if (msg->type == VFIO_MSG_ASYNC) {
630         vfio_user_recycle(proxy, msg);
631     } else {
632         QTAILQ_INSERT_TAIL(&proxy->pending, msg, next);
633         msg->pending = true;
634     }
635 
636     return true;
637 }
638 
639 /*
640  * nowait send - vfio_wait_reqs() can wait for it later
641  *
642  * Returns false if we did not successfully receive a reply message, in which
643  * case @errp will be populated.
644  *
645  * In either case, ownership of @hdr and @fds is taken, and the caller must
646  * *not* free them itself.
647  */
648 bool vfio_user_send_nowait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
649                            VFIOUserFDs *fds, int rsize, Error **errp)
650 {
651     VFIOUserMsg *msg;
652 
653     QEMU_LOCK_GUARD(&proxy->lock);
654 
655     msg = vfio_user_getmsg(proxy, hdr, fds);
656     msg->id = hdr->id;
657     msg->rsize = rsize ? rsize : hdr->size;
658     msg->type = VFIO_MSG_NOWAIT;
659 
660     if (hdr->flags & VFIO_USER_NO_REPLY) {
661         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
662         vfio_user_recycle(proxy, msg);
663         return false;
664     }
665 
666     if (!vfio_user_send_queued(proxy, msg, errp)) {
667         vfio_user_recycle(proxy, msg);
668         return false;
669     }
670 
671     proxy->last_nowait = msg;
672 
673     return true;
674 }
675 
676 /*
677  * Returns false if we did not successfully receive a reply message, in which
678  * case @errp will be populated.
679  *
680  * In either case, the caller must free @hdr and @fds if needed.
681  */
682 bool vfio_user_send_wait(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
683                          VFIOUserFDs *fds, int rsize, Error **errp)
684 {
685     VFIOUserMsg *msg;
686     bool ok = false;
687 
688     if (hdr->flags & VFIO_USER_NO_REPLY) {
689         error_setg_errno(errp, EINVAL, "%s on NO_REPLY message", __func__);
690         return false;
691     }
692 
693     qemu_mutex_lock(&proxy->lock);
694 
695     msg = vfio_user_getmsg(proxy, hdr, fds);
696     msg->id = hdr->id;
697     msg->rsize = rsize ? rsize : hdr->size;
698     msg->type = VFIO_MSG_WAIT;
699 
700     ok = vfio_user_send_queued(proxy, msg, errp);
701 
702     if (ok) {
703         while (!msg->complete) {
704             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
705                                      proxy->wait_time)) {
706                 VFIOUserMsgQ *list;
707 
708                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
709                 QTAILQ_REMOVE(list, msg, next);
710                 error_setg_errno(errp, ETIMEDOUT,
711                                  "timed out waiting for reply");
712                 ok = false;
713                 break;
714             }
715         }
716     }
717 
718     vfio_user_recycle(proxy, msg);
719 
720     qemu_mutex_unlock(&proxy->lock);
721 
722     return ok;
723 }
724 
725 /*
726  * async send - msg can be queued, but will be freed when sent
727  *
728  * Returns false on failure, in which case @errp will be populated.
729  *
730  * In either case, ownership of @hdr and @fds is taken, and the caller must
731  * *not* free them itself.
732  */
733 bool vfio_user_send_async(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
734                           VFIOUserFDs *fds, Error **errp)
735 {
736     VFIOUserMsg *msg;
737 
738     QEMU_LOCK_GUARD(&proxy->lock);
739 
740     msg = vfio_user_getmsg(proxy, hdr, fds);
741     msg->id = hdr->id;
742     msg->rsize = 0;
743     msg->type = VFIO_MSG_ASYNC;
744 
745     if (!(hdr->flags & (VFIO_USER_NO_REPLY | VFIO_USER_REPLY))) {
746         error_setg_errno(errp, EINVAL, "%s on sync message", __func__);
747         vfio_user_recycle(proxy, msg);
748         return false;
749     }
750 
751     if (!vfio_user_send_queued(proxy, msg, errp)) {
752         vfio_user_recycle(proxy, msg);
753         return false;
754     }
755 
756     return true;
757 }
758 
759 void vfio_user_wait_reqs(VFIOUserProxy *proxy)
760 {
761     VFIOUserMsg *msg;
762 
763     /*
764      * Any DMA map/unmap requests sent in the middle
765      * of a memory region transaction were sent nowait.
766      * Wait for them here.
767      */
768     qemu_mutex_lock(&proxy->lock);
769     if (proxy->last_nowait != NULL) {
770         /*
771          * Change type to WAIT to wait for reply
772          */
773         msg = proxy->last_nowait;
774         msg->type = VFIO_MSG_WAIT;
775         proxy->last_nowait = NULL;
776         while (!msg->complete) {
777             if (!qemu_cond_timedwait(&msg->cv, &proxy->lock,
778                                      proxy->wait_time)) {
779                 VFIOUserMsgQ *list;
780 
781                 list = msg->pending ? &proxy->pending : &proxy->outgoing;
782                 QTAILQ_REMOVE(list, msg, next);
783                 error_printf("vfio_wait_reqs - timed out\n");
784                 break;
785             }
786         }
787 
788         if (msg->hdr->flags & VFIO_USER_ERROR) {
789             error_printf("vfio_user_wait_reqs - error reply on async ");
790             error_printf("request: command %x error %s\n", msg->hdr->command,
791                          strerror(msg->hdr->error_reply));
792         }
793 
794         /*
795          * Change type back to NOWAIT to free
796          */
797         msg->type = VFIO_MSG_NOWAIT;
798         vfio_user_recycle(proxy, msg);
799     }
800 
801     qemu_mutex_unlock(&proxy->lock);
802 }
803 
804 /*
805  * Reply to an incoming request.
806  */
807 void vfio_user_send_reply(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int size)
808 {
809     Error *local_err = NULL;
810 
811     if (size < sizeof(VFIOUserHdr)) {
812         error_printf("%s: size too small", __func__);
813         g_free(hdr);
814         return;
815     }
816 
817     /*
818      * convert header to associated reply
819      */
820     hdr->flags = VFIO_USER_REPLY;
821     hdr->size = size;
822 
823     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
824         error_report_err(local_err);
825     }
826 }
827 
828 /*
829  * Send an error reply to an incoming request.
830  */
831 void vfio_user_send_error(VFIOUserProxy *proxy, VFIOUserHdr *hdr, int error)
832 {
833     Error *local_err = NULL;
834 
835     /*
836      * convert header to associated reply
837      */
838     hdr->flags = VFIO_USER_REPLY;
839     hdr->flags |= VFIO_USER_ERROR;
840     hdr->error_reply = error;
841     hdr->size = sizeof(*hdr);
842 
843     if (!vfio_user_send_async(proxy, hdr, NULL, &local_err)) {
844         error_report_err(local_err);
845     }
846 }
847 
848 /*
849  * Close FDs erroneously received in an incoming request.
850  */
851 void vfio_user_putfds(VFIOUserMsg *msg)
852 {
853     VFIOUserFDs *fds = msg->fds;
854     int i;
855 
856     for (i = 0; i < fds->recv_fds; i++) {
857         close(fds->fds[i]);
858     }
859     g_free(fds);
860     msg->fds = NULL;
861 }
862 
863 void
864 vfio_user_disable_posted_writes(VFIOUserProxy *proxy)
865 {
866     WITH_QEMU_LOCK_GUARD(&proxy->lock) {
867          proxy->flags |= VFIO_PROXY_NO_POST;
868     }
869 }
870 
871 static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
872     QLIST_HEAD_INITIALIZER(vfio_user_sockets);
873 
874 VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
875 {
876     VFIOUserProxy *proxy;
877     QIOChannelSocket *sioc;
878     QIOChannel *ioc;
879     char *sockname;
880 
881     if (addr->type != SOCKET_ADDRESS_TYPE_UNIX) {
882         error_setg(errp, "vfio_user_connect - bad address family");
883         return NULL;
884     }
885     sockname = addr->u.q_unix.path;
886 
887     sioc = qio_channel_socket_new();
888     ioc = QIO_CHANNEL(sioc);
889     if (qio_channel_socket_connect_sync(sioc, addr, errp)) {
890         object_unref(OBJECT(ioc));
891         return NULL;
892     }
893     qio_channel_set_blocking(ioc, false, NULL);
894 
895     proxy = g_malloc0(sizeof(VFIOUserProxy));
896     proxy->sockname = g_strdup_printf("unix:%s", sockname);
897     proxy->ioc = ioc;
898 
899     /* init defaults */
900     proxy->max_xfer_size = VFIO_USER_DEF_MAX_XFER;
901     proxy->max_send_fds = VFIO_USER_DEF_MAX_FDS;
902     proxy->max_dma = VFIO_USER_DEF_MAP_MAX;
903     proxy->dma_pgsizes = VFIO_USER_DEF_PGSIZE;
904     proxy->max_bitmap = VFIO_USER_DEF_MAX_BITMAP;
905     proxy->migr_pgsize = VFIO_USER_DEF_PGSIZE;
906 
907     proxy->flags = VFIO_PROXY_CLIENT;
908     proxy->state = VFIO_PROXY_CONNECTED;
909 
910     qemu_mutex_init(&proxy->lock);
911     qemu_cond_init(&proxy->close_cv);
912 
913     if (vfio_user_iothread == NULL) {
914         vfio_user_iothread = iothread_create("VFIO user", errp);
915     }
916 
917     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
918     proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
919 
920     QTAILQ_INIT(&proxy->outgoing);
921     QTAILQ_INIT(&proxy->incoming);
922     QTAILQ_INIT(&proxy->free);
923     QTAILQ_INIT(&proxy->pending);
924     QLIST_INSERT_HEAD(&vfio_user_sockets, proxy, next);
925 
926     return proxy;
927 }
928 
929 void vfio_user_set_handler(VFIODevice *vbasedev,
930                            void (*handler)(void *opaque, VFIOUserMsg *msg),
931                            void *req_arg)
932 {
933     VFIOUserProxy *proxy = vbasedev->proxy;
934 
935     proxy->request = handler;
936     proxy->req_arg = req_arg;
937     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
938                                    vfio_user_recv, NULL, NULL, proxy);
939 }
940 
941 void vfio_user_disconnect(VFIOUserProxy *proxy)
942 {
943     VFIOUserMsg *r1, *r2;
944 
945     qemu_mutex_lock(&proxy->lock);
946 
947     /* our side is quitting */
948     if (proxy->state == VFIO_PROXY_CONNECTED) {
949         vfio_user_shutdown(proxy);
950         if (!QTAILQ_EMPTY(&proxy->pending)) {
951             error_printf("vfio_user_disconnect: outstanding requests\n");
952         }
953     }
954     object_unref(OBJECT(proxy->ioc));
955     proxy->ioc = NULL;
956     qemu_bh_delete(proxy->req_bh);
957     proxy->req_bh = NULL;
958 
959     proxy->state = VFIO_PROXY_CLOSING;
960     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
961         qemu_cond_destroy(&r1->cv);
962         QTAILQ_REMOVE(&proxy->outgoing, r1, next);
963         g_free(r1);
964     }
965     QTAILQ_FOREACH_SAFE(r1, &proxy->incoming, next, r2) {
966         qemu_cond_destroy(&r1->cv);
967         QTAILQ_REMOVE(&proxy->incoming, r1, next);
968         g_free(r1);
969     }
970     QTAILQ_FOREACH_SAFE(r1, &proxy->pending, next, r2) {
971         qemu_cond_destroy(&r1->cv);
972         QTAILQ_REMOVE(&proxy->pending, r1, next);
973         g_free(r1);
974     }
975     QTAILQ_FOREACH_SAFE(r1, &proxy->free, next, r2) {
976         qemu_cond_destroy(&r1->cv);
977         QTAILQ_REMOVE(&proxy->free, r1, next);
978         g_free(r1);
979     }
980 
981     /*
982      * Make sure the iothread isn't blocking anywhere
983      * with a ref to this proxy by waiting for a BH
984      * handler to run after the proxy fd handlers were
985      * deleted above.
986      */
987     aio_bh_schedule_oneshot(proxy->ctx, vfio_user_cb, proxy);
988     qemu_cond_wait(&proxy->close_cv, &proxy->lock);
989 
990     /* we now hold the only ref to proxy */
991     qemu_mutex_unlock(&proxy->lock);
992     qemu_cond_destroy(&proxy->close_cv);
993     qemu_mutex_destroy(&proxy->lock);
994 
995     QLIST_REMOVE(proxy, next);
996     if (QLIST_EMPTY(&vfio_user_sockets)) {
997         iothread_destroy(vfio_user_iothread);
998         vfio_user_iothread = NULL;
999     }
1000 
1001     g_free(proxy->sockname);
1002     g_free(proxy);
1003 }
1004 
1005 void vfio_user_request_msg(VFIOUserHdr *hdr, uint16_t cmd,
1006                            uint32_t size, uint32_t flags)
1007 {
1008     static uint16_t next_id;
1009 
1010     hdr->id = qatomic_fetch_inc(&next_id);
1011     hdr->command = cmd;
1012     hdr->size = size;
1013     hdr->flags = (flags & ~VFIO_USER_TYPE) | VFIO_USER_REQUEST;
1014     hdr->error_reply = 0;
1015 }
1016 
1017 struct cap_entry {
1018     const char *name;
1019     bool (*check)(VFIOUserProxy *proxy, QObject *qobj, Error **errp);
1020 };
1021 
1022 static bool caps_parse(VFIOUserProxy *proxy, QDict *qdict,
1023                        struct cap_entry caps[], Error **errp)
1024 {
1025     QObject *qobj;
1026     struct cap_entry *p;
1027 
1028     for (p = caps; p->name != NULL; p++) {
1029         qobj = qdict_get(qdict, p->name);
1030         if (qobj != NULL) {
1031             if (!p->check(proxy, qobj, errp)) {
1032                 return false;
1033             }
1034             qdict_del(qdict, p->name);
1035         }
1036     }
1037 
1038     /* warning, for now */
1039     if (qdict_size(qdict) != 0) {
1040         warn_report("spurious capabilities");
1041     }
1042     return true;
1043 }
1044 
1045 static bool check_migr_pgsize(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1046 {
1047     QNum *qn = qobject_to(QNum, qobj);
1048     uint64_t pgsize;
1049 
1050     if (qn == NULL || !qnum_get_try_uint(qn, &pgsize)) {
1051         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZE);
1052         return false;
1053     }
1054 
1055     /* must be larger than default */
1056     if (pgsize & (VFIO_USER_DEF_PGSIZE - 1)) {
1057         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsize);
1058         return false;
1059     }
1060 
1061     proxy->migr_pgsize = pgsize;
1062     return true;
1063 }
1064 
1065 static bool check_bitmap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1066 {
1067     QNum *qn = qobject_to(QNum, qobj);
1068     uint64_t bitmap_size;
1069 
1070     if (qn == NULL || !qnum_get_try_uint(qn, &bitmap_size)) {
1071         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_BITMAP);
1072         return false;
1073     }
1074 
1075     /* can only lower it */
1076     if (bitmap_size > VFIO_USER_DEF_MAX_BITMAP) {
1077         error_setg(errp, "%s too large", VFIO_USER_CAP_MAX_BITMAP);
1078         return false;
1079     }
1080 
1081     proxy->max_bitmap = bitmap_size;
1082     return true;
1083 }
1084 
1085 static struct cap_entry caps_migr[] = {
1086     { VFIO_USER_CAP_PGSIZE, check_migr_pgsize },
1087     { VFIO_USER_CAP_MAX_BITMAP, check_bitmap },
1088     { NULL }
1089 };
1090 
1091 static bool check_max_fds(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1092 {
1093     QNum *qn = qobject_to(QNum, qobj);
1094     uint64_t max_send_fds;
1095 
1096     if (qn == NULL || !qnum_get_try_uint(qn, &max_send_fds) ||
1097         max_send_fds > VFIO_USER_MAX_MAX_FDS) {
1098         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1099         return false;
1100     }
1101     proxy->max_send_fds = max_send_fds;
1102     return true;
1103 }
1104 
1105 static bool check_max_xfer(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1106 {
1107     QNum *qn = qobject_to(QNum, qobj);
1108     uint64_t max_xfer_size;
1109 
1110     if (qn == NULL || !qnum_get_try_uint(qn, &max_xfer_size) ||
1111         max_xfer_size > VFIO_USER_MAX_MAX_XFER) {
1112         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_XFER);
1113         return false;
1114     }
1115     proxy->max_xfer_size = max_xfer_size;
1116     return true;
1117 }
1118 
1119 static bool check_pgsizes(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1120 {
1121     QNum *qn = qobject_to(QNum, qobj);
1122     uint64_t pgsizes;
1123 
1124     if (qn == NULL || !qnum_get_try_uint(qn, &pgsizes)) {
1125         error_setg(errp, "malformed %s", VFIO_USER_CAP_PGSIZES);
1126         return false;
1127     }
1128 
1129     /* must be larger than default */
1130     if (pgsizes & (VFIO_USER_DEF_PGSIZE - 1)) {
1131         error_setg(errp, "pgsize 0x%"PRIx64" too small", pgsizes);
1132         return false;
1133     }
1134 
1135     proxy->dma_pgsizes = pgsizes;
1136     return true;
1137 }
1138 
1139 static bool check_max_dma(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1140 {
1141     QNum *qn = qobject_to(QNum, qobj);
1142     uint64_t max_dma;
1143 
1144     if (qn == NULL || !qnum_get_try_uint(qn, &max_dma)) {
1145         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAP_MAX);
1146         return false;
1147     }
1148 
1149     /* can only lower it */
1150     if (max_dma > VFIO_USER_DEF_MAP_MAX) {
1151         error_setg(errp, "%s too large", VFIO_USER_CAP_MAP_MAX);
1152         return false;
1153     }
1154 
1155     proxy->max_dma = max_dma;
1156     return true;
1157 }
1158 
1159 static bool check_migr(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1160 {
1161     QDict *qdict = qobject_to(QDict, qobj);
1162 
1163     if (qdict == NULL) {
1164         error_setg(errp, "malformed %s", VFIO_USER_CAP_MAX_FDS);
1165         return true;
1166     }
1167     return caps_parse(proxy, qdict, caps_migr, errp);
1168 }
1169 
1170 static bool check_multi(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1171 {
1172     QBool *qb = qobject_to(QBool, qobj);
1173 
1174     if (qb == NULL) {
1175         error_setg(errp, "malformed %s", VFIO_USER_CAP_MULTI);
1176         return false;
1177     }
1178     if (qbool_get_bool(qb)) {
1179         proxy->flags |= VFIO_PROXY_USE_MULTI;
1180     }
1181     return true;
1182 }
1183 
1184 static struct cap_entry caps_cap[] = {
1185     { VFIO_USER_CAP_MAX_FDS, check_max_fds },
1186     { VFIO_USER_CAP_MAX_XFER, check_max_xfer },
1187     { VFIO_USER_CAP_PGSIZES, check_pgsizes },
1188     { VFIO_USER_CAP_MAP_MAX, check_max_dma },
1189     { VFIO_USER_CAP_MIGR, check_migr },
1190     { VFIO_USER_CAP_MULTI, check_multi },
1191     { NULL }
1192 };
1193 
1194 static bool check_cap(VFIOUserProxy *proxy, QObject *qobj, Error **errp)
1195 {
1196    QDict *qdict = qobject_to(QDict, qobj);
1197 
1198     if (qdict == NULL) {
1199         error_setg(errp, "malformed %s", VFIO_USER_CAP);
1200         return false;
1201     }
1202     return caps_parse(proxy, qdict, caps_cap, errp);
1203 }
1204 
1205 static struct cap_entry ver_0_0[] = {
1206     { VFIO_USER_CAP, check_cap },
1207     { NULL }
1208 };
1209 
1210 static bool caps_check(VFIOUserProxy *proxy, int minor, const char *caps,
1211                        Error **errp)
1212 {
1213     QObject *qobj;
1214     QDict *qdict;
1215     bool ret;
1216 
1217     qobj = qobject_from_json(caps, NULL);
1218     if (qobj == NULL) {
1219         error_setg(errp, "malformed capabilities %s", caps);
1220         return false;
1221     }
1222     qdict = qobject_to(QDict, qobj);
1223     if (qdict == NULL) {
1224         error_setg(errp, "capabilities %s not an object", caps);
1225         qobject_unref(qobj);
1226         return false;
1227     }
1228     ret = caps_parse(proxy, qdict, ver_0_0, errp);
1229 
1230     qobject_unref(qobj);
1231     return ret;
1232 }
1233 
1234 static GString *caps_json(void)
1235 {
1236     QDict *dict = qdict_new();
1237     QDict *capdict = qdict_new();
1238     QDict *migdict = qdict_new();
1239     GString *str;
1240 
1241     qdict_put_int(migdict, VFIO_USER_CAP_PGSIZE, VFIO_USER_DEF_PGSIZE);
1242     qdict_put_int(migdict, VFIO_USER_CAP_MAX_BITMAP, VFIO_USER_DEF_MAX_BITMAP);
1243     qdict_put_obj(capdict, VFIO_USER_CAP_MIGR, QOBJECT(migdict));
1244 
1245     qdict_put_int(capdict, VFIO_USER_CAP_MAX_FDS, VFIO_USER_MAX_MAX_FDS);
1246     qdict_put_int(capdict, VFIO_USER_CAP_MAX_XFER, VFIO_USER_DEF_MAX_XFER);
1247     qdict_put_int(capdict, VFIO_USER_CAP_PGSIZES, VFIO_USER_DEF_PGSIZE);
1248     qdict_put_int(capdict, VFIO_USER_CAP_MAP_MAX, VFIO_USER_DEF_MAP_MAX);
1249     qdict_put_bool(capdict, VFIO_USER_CAP_MULTI, true);
1250 
1251     qdict_put_obj(dict, VFIO_USER_CAP, QOBJECT(capdict));
1252 
1253     str = qobject_to_json(QOBJECT(dict));
1254     qobject_unref(dict);
1255     return str;
1256 }
1257 
1258 bool vfio_user_validate_version(VFIOUserProxy *proxy, Error **errp)
1259 {
1260     g_autofree VFIOUserVersion *msgp = NULL;
1261     GString *caps;
1262     char *reply;
1263     int size, caplen;
1264 
1265     caps = caps_json();
1266     caplen = caps->len + 1;
1267     size = sizeof(*msgp) + caplen;
1268     msgp = g_malloc0(size);
1269 
1270     vfio_user_request_msg(&msgp->hdr, VFIO_USER_VERSION, size, 0);
1271     msgp->major = VFIO_USER_MAJOR_VER;
1272     msgp->minor = VFIO_USER_MINOR_VER;
1273     memcpy(&msgp->capabilities, caps->str, caplen);
1274     g_string_free(caps, true);
1275     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1276 
1277     if (!vfio_user_send_wait(proxy, &msgp->hdr, NULL, 0, errp)) {
1278         return false;
1279     }
1280 
1281     if (msgp->hdr.flags & VFIO_USER_ERROR) {
1282         error_setg_errno(errp, msgp->hdr.error_reply, "version reply");
1283         return false;
1284     }
1285 
1286     if (msgp->major != VFIO_USER_MAJOR_VER ||
1287         msgp->minor > VFIO_USER_MINOR_VER) {
1288         error_setg(errp, "incompatible server version");
1289         return false;
1290     }
1291 
1292     reply = msgp->capabilities;
1293     if (reply[msgp->hdr.size - sizeof(*msgp) - 1] != '\0') {
1294         error_setg(errp, "corrupt version reply");
1295         return false;
1296     }
1297 
1298     if (!caps_check(proxy, msgp->minor, reply, errp)) {
1299         return false;
1300     }
1301 
1302     trace_vfio_user_version(msgp->major, msgp->minor, msgp->capabilities);
1303     return true;
1304 }
1305 
1306 void vfio_user_flush_multi(VFIOUserProxy *proxy)
1307 {
1308     VFIOUserMsg *msg;
1309     VFIOUserWRMulti *wm = proxy->wr_multi;
1310     Error *local_err = NULL;
1311 
1312     proxy->wr_multi = NULL;
1313 
1314     /* adjust size for actual # of writes */
1315     wm->hdr.size -= (VFIO_USER_MULTI_MAX - wm->wr_cnt) * sizeof(VFIOUserWROne);
1316 
1317     msg = vfio_user_getmsg(proxy, &wm->hdr, NULL);
1318     msg->id = wm->hdr.id;
1319     msg->rsize = 0;
1320     msg->type = VFIO_MSG_ASYNC;
1321     trace_vfio_user_wrmulti("flush", wm->wr_cnt);
1322 
1323     if (!vfio_user_send_queued(proxy, msg, &local_err)) {
1324         error_report_err(local_err);
1325         vfio_user_recycle(proxy, msg);
1326     }
1327 }
1328 
1329 void vfio_user_create_multi(VFIOUserProxy *proxy)
1330 {
1331     VFIOUserWRMulti *wm;
1332 
1333     wm = g_malloc0(sizeof(*wm));
1334     vfio_user_request_msg(&wm->hdr, VFIO_USER_REGION_WRITE_MULTI,
1335                           sizeof(*wm), VFIO_USER_NO_REPLY);
1336     proxy->wr_multi = wm;
1337 }
1338 
1339 void vfio_user_add_multi(VFIOUserProxy *proxy, uint8_t index,
1340                          off_t offset, uint32_t count, void *data)
1341 {
1342     VFIOUserWRMulti *wm = proxy->wr_multi;
1343     VFIOUserWROne *w1 = &wm->wrs[wm->wr_cnt];
1344 
1345     w1->offset = offset;
1346     w1->region = index;
1347     w1->count = count;
1348     memcpy(&w1->data, data, count);
1349 
1350     wm->wr_cnt++;
1351     trace_vfio_user_wrmulti("add", wm->wr_cnt);
1352     if (wm->wr_cnt == VFIO_USER_MULTI_MAX ||
1353         proxy->num_outgoing < VFIO_USER_OUT_LOW) {
1354         vfio_user_flush_multi(proxy);
1355     }
1356 }
1357