xref: /openbmc/qemu/io/channel-socket.c (revision e9c692eabbbb7f395347605a6ef33a32d398ea25)
1 /*
2  * QEMU I/O channels sockets driver
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-util.h"
26 #include "io/channel-watch.h"
27 #include "trace.h"
28 #include "qapi/clone-visitor.h"
29 #ifdef CONFIG_LINUX
30 #include <linux/errqueue.h>
31 #include <sys/socket.h>
32 
33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
34 #define QEMU_MSG_ZEROCOPY
35 #endif
36 #endif
37 
38 #define SOCKET_MAX_FDS 16
39 
40 #ifdef QEMU_MSG_ZEROCOPY
41 static int qio_channel_socket_flush_internal(QIOChannel *ioc,
42                                              bool block,
43                                              Error **errp);
44 #endif
45 
46 SocketAddress *
47 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
48                                      Error **errp)
49 {
50     return socket_sockaddr_to_address(&ioc->localAddr,
51                                       ioc->localAddrLen,
52                                       errp);
53 }
54 
55 SocketAddress *
56 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
57                                       Error **errp)
58 {
59     return socket_sockaddr_to_address(&ioc->remoteAddr,
60                                       ioc->remoteAddrLen,
61                                       errp);
62 }
63 
64 QIOChannelSocket *
65 qio_channel_socket_new(void)
66 {
67     QIOChannelSocket *sioc;
68     QIOChannel *ioc;
69 
70     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
71     sioc->fd = -1;
72     sioc->zero_copy_queued = 0;
73     sioc->zero_copy_sent = 0;
74     sioc->blocking = false;
75     sioc->new_zero_copy_sent_success = false;
76 
77     ioc = QIO_CHANNEL(sioc);
78     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
79 
80 #ifdef WIN32
81     ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
82 #endif
83 
84     trace_qio_channel_socket_new(sioc);
85 
86     return sioc;
87 }
88 
89 int qio_channel_socket_set_send_buffer(QIOChannelSocket *ioc,
90                                        size_t size,
91                                        Error **errp)
92 {
93     if (setsockopt(ioc->fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0) {
94         error_setg_errno(errp, errno, "Unable to set socket send buffer size");
95         return -1;
96     }
97 
98     return 0;
99 }
100 
101 static int
102 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
103                           int fd,
104                           Error **errp)
105 {
106     if (sioc->fd != -1) {
107         error_setg(errp, "Socket is already open");
108         return -1;
109     }
110 
111     sioc->fd = fd;
112     sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
113     sioc->localAddrLen = sizeof(sioc->localAddr);
114 
115 
116     if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
117                     &sioc->remoteAddrLen) < 0) {
118         if (errno == ENOTCONN) {
119             memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
120             sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
121         } else {
122             error_setg_errno(errp, errno,
123                              "Unable to query remote socket address");
124             goto error;
125         }
126     }
127 
128     if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
129                     &sioc->localAddrLen) < 0) {
130         error_setg_errno(errp, errno,
131                          "Unable to query local socket address");
132         goto error;
133     }
134 
135 #ifndef WIN32
136     if (sioc->localAddr.ss_family == AF_UNIX) {
137         QIOChannel *ioc = QIO_CHANNEL(sioc);
138         qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
139     }
140 #endif /* WIN32 */
141 
142     return 0;
143 
144  error:
145     sioc->fd = -1; /* Let the caller close FD on failure */
146     return -1;
147 }
148 
149 QIOChannelSocket *
150 qio_channel_socket_new_fd(int fd,
151                           Error **errp)
152 {
153     QIOChannelSocket *ioc;
154 
155     ioc = qio_channel_socket_new();
156     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
157         object_unref(OBJECT(ioc));
158         return NULL;
159     }
160 
161     trace_qio_channel_socket_new_fd(ioc, fd);
162 
163     return ioc;
164 }
165 
166 
167 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
168                                     SocketAddress *addr,
169                                     Error **errp)
170 {
171     int fd;
172 
173     trace_qio_channel_socket_connect_sync(ioc, addr);
174     fd = socket_connect(addr, errp);
175     if (fd < 0) {
176         trace_qio_channel_socket_connect_fail(ioc);
177         return -1;
178     }
179 
180     trace_qio_channel_socket_connect_complete(ioc, fd);
181     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
182         close(fd);
183         return -1;
184     }
185 
186 #ifdef QEMU_MSG_ZEROCOPY
187     int ret, v = 1;
188     ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
189     if (ret == 0) {
190         /* Zero copy available on host */
191         qio_channel_set_feature(QIO_CHANNEL(ioc),
192                                 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
193     }
194 #endif
195 
196     qio_channel_set_feature(QIO_CHANNEL(ioc),
197                             QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
198 
199     return 0;
200 }
201 
202 
203 static void qio_channel_socket_connect_worker(QIOTask *task,
204                                               gpointer opaque)
205 {
206     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
207     SocketAddress *addr = opaque;
208     Error *err = NULL;
209 
210     qio_channel_socket_connect_sync(ioc, addr, &err);
211 
212     qio_task_set_error(task, err);
213 }
214 
215 
216 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
217                                       SocketAddress *addr,
218                                       QIOTaskFunc callback,
219                                       gpointer opaque,
220                                       GDestroyNotify destroy,
221                                       GMainContext *context)
222 {
223     QIOTask *task = qio_task_new(
224         OBJECT(ioc), callback, opaque, destroy);
225     SocketAddress *addrCopy;
226 
227     addrCopy = QAPI_CLONE(SocketAddress, addr);
228 
229     /* socket_connect() does a non-blocking connect(), but it
230      * still blocks in DNS lookups, so we must use a thread */
231     trace_qio_channel_socket_connect_async(ioc, addr);
232     qio_task_run_in_thread(task,
233                            qio_channel_socket_connect_worker,
234                            addrCopy,
235                            (GDestroyNotify)qapi_free_SocketAddress,
236                            context);
237 }
238 
239 
240 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
241                                    SocketAddress *addr,
242                                    int num,
243                                    Error **errp)
244 {
245     int fd;
246 
247     trace_qio_channel_socket_listen_sync(ioc, addr, num);
248     fd = socket_listen(addr, num, errp);
249     if (fd < 0) {
250         trace_qio_channel_socket_listen_fail(ioc);
251         return -1;
252     }
253 
254     trace_qio_channel_socket_listen_complete(ioc, fd);
255     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
256         close(fd);
257         return -1;
258     }
259     qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
260 
261     return 0;
262 }
263 
264 
265 struct QIOChannelListenWorkerData {
266     SocketAddress *addr;
267     int num; /* amount of expected connections */
268 };
269 
270 static void qio_channel_listen_worker_free(gpointer opaque)
271 {
272     struct QIOChannelListenWorkerData *data = opaque;
273 
274     qapi_free_SocketAddress(data->addr);
275     g_free(data);
276 }
277 
278 static void qio_channel_socket_listen_worker(QIOTask *task,
279                                              gpointer opaque)
280 {
281     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
282     struct QIOChannelListenWorkerData *data = opaque;
283     Error *err = NULL;
284 
285     qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
286 
287     qio_task_set_error(task, err);
288 }
289 
290 
291 void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
292                                      SocketAddress *addr,
293                                      int num,
294                                      QIOTaskFunc callback,
295                                      gpointer opaque,
296                                      GDestroyNotify destroy,
297                                      GMainContext *context)
298 {
299     QIOTask *task = qio_task_new(
300         OBJECT(ioc), callback, opaque, destroy);
301     struct QIOChannelListenWorkerData *data;
302 
303     data = g_new0(struct QIOChannelListenWorkerData, 1);
304     data->addr = QAPI_CLONE(SocketAddress, addr);
305     data->num = num;
306 
307     /* socket_listen() blocks in DNS lookups, so we must use a thread */
308     trace_qio_channel_socket_listen_async(ioc, addr, num);
309     qio_task_run_in_thread(task,
310                            qio_channel_socket_listen_worker,
311                            data,
312                            qio_channel_listen_worker_free,
313                            context);
314 }
315 
316 
317 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
318                                   SocketAddress *localAddr,
319                                   SocketAddress *remoteAddr,
320                                   Error **errp)
321 {
322     int fd;
323 
324     trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
325     fd = socket_dgram(remoteAddr, localAddr, errp);
326     if (fd < 0) {
327         trace_qio_channel_socket_dgram_fail(ioc);
328         return -1;
329     }
330 
331     trace_qio_channel_socket_dgram_complete(ioc, fd);
332     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
333         close(fd);
334         return -1;
335     }
336 
337     return 0;
338 }
339 
340 
341 struct QIOChannelSocketDGramWorkerData {
342     SocketAddress *localAddr;
343     SocketAddress *remoteAddr;
344 };
345 
346 
347 static void qio_channel_socket_dgram_worker_free(gpointer opaque)
348 {
349     struct QIOChannelSocketDGramWorkerData *data = opaque;
350     qapi_free_SocketAddress(data->localAddr);
351     qapi_free_SocketAddress(data->remoteAddr);
352     g_free(data);
353 }
354 
355 static void qio_channel_socket_dgram_worker(QIOTask *task,
356                                             gpointer opaque)
357 {
358     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
359     struct QIOChannelSocketDGramWorkerData *data = opaque;
360     Error *err = NULL;
361 
362     /* socket_dgram() blocks in DNS lookups, so we must use a thread */
363     qio_channel_socket_dgram_sync(ioc, data->localAddr,
364                                   data->remoteAddr, &err);
365 
366     qio_task_set_error(task, err);
367 }
368 
369 
370 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
371                                     SocketAddress *localAddr,
372                                     SocketAddress *remoteAddr,
373                                     QIOTaskFunc callback,
374                                     gpointer opaque,
375                                     GDestroyNotify destroy,
376                                     GMainContext *context)
377 {
378     QIOTask *task = qio_task_new(
379         OBJECT(ioc), callback, opaque, destroy);
380     struct QIOChannelSocketDGramWorkerData *data = g_new0(
381         struct QIOChannelSocketDGramWorkerData, 1);
382 
383     data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
384     data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
385 
386     trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
387     qio_task_run_in_thread(task,
388                            qio_channel_socket_dgram_worker,
389                            data,
390                            qio_channel_socket_dgram_worker_free,
391                            context);
392 }
393 
394 
395 QIOChannelSocket *
396 qio_channel_socket_accept(QIOChannelSocket *ioc,
397                           Error **errp)
398 {
399     QIOChannelSocket *cioc;
400 
401     cioc = qio_channel_socket_new();
402     cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
403     cioc->localAddrLen = sizeof(ioc->localAddr);
404 
405  retry:
406     trace_qio_channel_socket_accept(ioc);
407     cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
408                            &cioc->remoteAddrLen);
409     if (cioc->fd < 0) {
410         if (errno == EINTR) {
411             goto retry;
412         }
413         error_setg_errno(errp, errno, "Unable to accept connection");
414         trace_qio_channel_socket_accept_fail(ioc);
415         goto error;
416     }
417 
418     if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
419                     &cioc->localAddrLen) < 0) {
420         error_setg_errno(errp, errno,
421                          "Unable to query local socket address");
422         goto error;
423     }
424 
425 #ifndef WIN32
426     if (cioc->localAddr.ss_family == AF_UNIX) {
427         QIOChannel *ioc_local = QIO_CHANNEL(cioc);
428         qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
429     }
430 #endif /* WIN32 */
431 
432     qio_channel_set_feature(QIO_CHANNEL(cioc),
433                             QIO_CHANNEL_FEATURE_READ_MSG_PEEK);
434 
435     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
436     return cioc;
437 
438  error:
439     object_unref(OBJECT(cioc));
440     return NULL;
441 }
442 
443 static void qio_channel_socket_init(Object *obj)
444 {
445     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
446     ioc->fd = -1;
447 }
448 
449 static void qio_channel_socket_finalize(Object *obj)
450 {
451     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
452 
453     if (ioc->fd != -1) {
454         QIOChannel *ioc_local = QIO_CHANNEL(ioc);
455         if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
456             Error *err = NULL;
457 
458             socket_listen_cleanup(ioc->fd, &err);
459             if (err) {
460                 error_report_err(err);
461                 err = NULL;
462             }
463         }
464 #ifdef WIN32
465         qemu_socket_unselect_nofail(ioc->fd);
466 #endif
467         close(ioc->fd);
468         ioc->fd = -1;
469     }
470 }
471 
472 
473 #ifndef WIN32
474 static void qio_channel_socket_copy_fds(struct msghdr *msg,
475                                         int **fds, size_t *nfds)
476 {
477     struct cmsghdr *cmsg;
478 
479     *nfds = 0;
480     *fds = NULL;
481 
482     for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
483         int fd_size;
484         int gotfds;
485 
486         if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
487             cmsg->cmsg_level != SOL_SOCKET ||
488             cmsg->cmsg_type != SCM_RIGHTS) {
489             continue;
490         }
491 
492         fd_size = cmsg->cmsg_len - CMSG_LEN(0);
493 
494         if (!fd_size) {
495             continue;
496         }
497 
498         gotfds = fd_size / sizeof(int);
499         *fds = g_renew(int, *fds, *nfds + gotfds);
500         memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
501         *nfds += gotfds;
502     }
503 }
504 
505 static bool qio_channel_handle_fds(int *fds, size_t nfds,
506                                    bool preserve_blocking, Error **errp)
507 {
508     int *end = fds + nfds, *fd;
509 
510 #ifdef MSG_CMSG_CLOEXEC
511     if (preserve_blocking) {
512         /* Nothing to do */
513         return true;
514     }
515 #endif
516 
517     for (fd = fds; fd != end; fd++) {
518         if (*fd < 0) {
519             continue;
520         }
521 
522         if (!preserve_blocking) {
523             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
524             if (!qemu_set_blocking(*fd, true, errp)) {
525                 return false;
526             }
527         }
528 
529 #ifndef MSG_CMSG_CLOEXEC
530         qemu_set_cloexec(*fd);
531 #endif
532     }
533 
534     return true;
535 }
536 
537 static void qio_channel_cleanup_fds(int **fds, size_t *nfds)
538 {
539     for (size_t i = 0; i < *nfds; i++) {
540         if ((*fds)[i] < 0) {
541             continue;
542         }
543         close((*fds)[i]);
544     }
545 
546     g_clear_pointer(fds, g_free);
547     *nfds = 0;
548 }
549 
550 
551 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
552                                         const struct iovec *iov,
553                                         size_t niov,
554                                         int **fds,
555                                         size_t *nfds,
556                                         int flags,
557                                         Error **errp)
558 {
559     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
560     ssize_t ret;
561     struct msghdr msg = { NULL, };
562     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
563     int sflags = 0;
564 
565     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
566 
567     msg.msg_iov = (struct iovec *)iov;
568     msg.msg_iovlen = niov;
569     if (fds && nfds) {
570         msg.msg_control = control;
571         msg.msg_controllen = sizeof(control);
572 #ifdef MSG_CMSG_CLOEXEC
573         sflags |= MSG_CMSG_CLOEXEC;
574 #endif
575 
576     }
577 
578     if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
579         sflags |= MSG_PEEK;
580     }
581 
582  retry:
583     ret = recvmsg(sioc->fd, &msg, sflags);
584     if (ret < 0) {
585         if (errno == EAGAIN) {
586             return QIO_CHANNEL_ERR_BLOCK;
587         }
588         if (errno == EINTR) {
589             goto retry;
590         }
591 
592         error_setg_errno(errp, errno,
593                          "Unable to read from socket");
594         return -1;
595     }
596 
597     if (fds && nfds) {
598         bool preserve_blocking =
599             flags & QIO_CHANNEL_READ_FLAG_FD_PRESERVE_BLOCKING;
600 
601         qio_channel_socket_copy_fds(&msg, fds, nfds);
602 
603         if (!qio_channel_handle_fds(*fds, *nfds,
604                                     preserve_blocking, errp)) {
605             qio_channel_cleanup_fds(fds, nfds);
606             return -1;
607         }
608     }
609 
610     return ret;
611 }
612 
613 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
614                                          const struct iovec *iov,
615                                          size_t niov,
616                                          int *fds,
617                                          size_t nfds,
618                                          int flags,
619                                          Error **errp)
620 {
621     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
622     ssize_t ret;
623     struct msghdr msg = { NULL, };
624     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
625     size_t fdsize = sizeof(int) * nfds;
626     struct cmsghdr *cmsg;
627     int sflags = 0;
628 #ifdef QEMU_MSG_ZEROCOPY
629     bool blocking = sioc->blocking;
630     bool zerocopy_flushed_once = false;
631 #endif
632 
633     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
634 
635     msg.msg_iov = (struct iovec *)iov;
636     msg.msg_iovlen = niov;
637 
638     if (nfds) {
639         if (nfds > SOCKET_MAX_FDS) {
640             error_setg_errno(errp, EINVAL,
641                              "Only %d FDs can be sent, got %zu",
642                              SOCKET_MAX_FDS, nfds);
643             return -1;
644         }
645 
646         msg.msg_control = control;
647         msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
648 
649         cmsg = CMSG_FIRSTHDR(&msg);
650         cmsg->cmsg_len = CMSG_LEN(fdsize);
651         cmsg->cmsg_level = SOL_SOCKET;
652         cmsg->cmsg_type = SCM_RIGHTS;
653         memcpy(CMSG_DATA(cmsg), fds, fdsize);
654     }
655 
656     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
657 #ifdef QEMU_MSG_ZEROCOPY
658         sflags = MSG_ZEROCOPY;
659 #else
660         /*
661          * We expect QIOChannel class entry point to have
662          * blocked this code path already
663          */
664         g_assert_not_reached();
665 #endif
666     }
667 
668  retry:
669     ret = sendmsg(sioc->fd, &msg, sflags);
670     if (ret <= 0) {
671         switch (errno) {
672         case EAGAIN:
673             return QIO_CHANNEL_ERR_BLOCK;
674         case EINTR:
675             goto retry;
676 #ifdef QEMU_MSG_ZEROCOPY
677         case ENOBUFS:
678             if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
679                 /**
680                  * Socket error queueing may exhaust the OPTMEM limit. Try
681                  * flushing the error queue once.
682                  */
683                 if (!zerocopy_flushed_once) {
684                     ret = qio_channel_socket_flush_internal(ioc, blocking,
685                                                             errp);
686                     if (ret < 0) {
687                         return -1;
688                     }
689                     zerocopy_flushed_once = true;
690                     goto retry;
691                 } else {
692                     error_setg_errno(errp, errno,
693                                      "Process can't lock enough memory for "
694                                      "using MSG_ZEROCOPY");
695                     return -1;
696                 }
697             }
698             break;
699 #endif
700         }
701 
702         error_setg_errno(errp, errno,
703                          "Unable to write to socket");
704         return -1;
705     }
706 
707     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
708         sioc->zero_copy_queued++;
709     }
710 
711     return ret;
712 }
713 #else /* WIN32 */
714 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
715                                         const struct iovec *iov,
716                                         size_t niov,
717                                         int **fds,
718                                         size_t *nfds,
719                                         int flags,
720                                         Error **errp)
721 {
722     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
723     ssize_t done = 0;
724     ssize_t i;
725     int sflags = 0;
726 
727     if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) {
728         sflags |= MSG_PEEK;
729     }
730 
731     for (i = 0; i < niov; i++) {
732         ssize_t ret;
733     retry:
734         ret = recv(sioc->fd,
735                    iov[i].iov_base,
736                    iov[i].iov_len,
737                    sflags);
738         if (ret < 0) {
739             if (errno == EAGAIN) {
740                 if (done) {
741                     return done;
742                 } else {
743                     return QIO_CHANNEL_ERR_BLOCK;
744                 }
745             } else if (errno == EINTR) {
746                 goto retry;
747             } else {
748                 error_setg_errno(errp, errno,
749                                  "Unable to read from socket");
750                 return -1;
751             }
752         }
753         done += ret;
754         if (ret < iov[i].iov_len) {
755             return done;
756         }
757     }
758 
759     return done;
760 }
761 
762 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
763                                          const struct iovec *iov,
764                                          size_t niov,
765                                          int *fds,
766                                          size_t nfds,
767                                          int flags,
768                                          Error **errp)
769 {
770     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
771     ssize_t done = 0;
772     ssize_t i;
773 
774     for (i = 0; i < niov; i++) {
775         ssize_t ret;
776     retry:
777         ret = send(sioc->fd,
778                    iov[i].iov_base,
779                    iov[i].iov_len,
780                    0);
781         if (ret < 0) {
782             if (errno == EAGAIN) {
783                 if (done) {
784                     return done;
785                 } else {
786                     return QIO_CHANNEL_ERR_BLOCK;
787                 }
788             } else if (errno == EINTR) {
789                 goto retry;
790             } else {
791                 error_setg_errno(errp, errno,
792                                  "Unable to write to socket");
793                 return -1;
794             }
795         }
796         done += ret;
797         if (ret < iov[i].iov_len) {
798             return done;
799         }
800     }
801 
802     return done;
803 }
804 #endif /* WIN32 */
805 
806 
807 #ifdef QEMU_MSG_ZEROCOPY
808 static int qio_channel_socket_flush_internal(QIOChannel *ioc,
809                                              bool block,
810                                              Error **errp)
811 {
812     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
813     struct msghdr msg = {};
814     struct sock_extended_err *serr;
815     struct cmsghdr *cm;
816     char control[CMSG_SPACE(sizeof(*serr))];
817     int received;
818 
819     if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
820         return 0;
821     }
822 
823     msg.msg_control = control;
824     msg.msg_controllen = sizeof(control);
825     memset(control, 0, sizeof(control));
826 
827     while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
828         received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
829         if (received < 0) {
830             switch (errno) {
831             case EAGAIN:
832                 if (block) {
833                     /*
834                      * Nothing on errqueue, wait until something is
835                      * available.
836                      *
837                      * Use G_IO_ERR instead of G_IO_IN since MSG_ERRQUEUE reads
838                      * are signaled via POLLERR, not POLLIN, as the kernel
839                      * sets POLLERR when zero-copy notificatons appear on the
840                      * socket error queue.
841                      */
842                     qio_channel_wait(ioc, G_IO_ERR);
843                     continue;
844                 }
845                 return 0;
846             case EINTR:
847                 continue;
848             default:
849                 error_setg_errno(errp, errno,
850                                  "Unable to read errqueue");
851                 return -1;
852             }
853         }
854 
855         cm = CMSG_FIRSTHDR(&msg);
856         if (cm->cmsg_level != SOL_IP   && cm->cmsg_type != IP_RECVERR &&
857             cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
858             error_setg_errno(errp, EPROTOTYPE,
859                              "Wrong cmsg in errqueue");
860             return -1;
861         }
862 
863         serr = (void *) CMSG_DATA(cm);
864         if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
865             error_setg_errno(errp, serr->ee_errno,
866                              "Error on socket");
867             return -1;
868         }
869         if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
870             error_setg_errno(errp, serr->ee_origin,
871                              "Error not from zero copy");
872             return -1;
873         }
874         if (serr->ee_data < serr->ee_info) {
875             error_setg_errno(errp, serr->ee_origin,
876                              "Wrong notification bounds");
877             return -1;
878         }
879 
880         /* No errors, count successfully finished sendmsg()*/
881         sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
882 
883         /* If any sendmsg() succeeded using zero copy, mark zerocopy success */
884         if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
885             sioc->new_zero_copy_sent_success = true;
886         }
887     }
888 
889     return 0;
890 }
891 
892 static int qio_channel_socket_flush(QIOChannel *ioc,
893                                     Error **errp)
894 {
895     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
896     int ret;
897 
898     ret = qio_channel_socket_flush_internal(ioc, true, errp);
899     if (ret < 0) {
900         return ret;
901     }
902 
903     if (sioc->new_zero_copy_sent_success) {
904         sioc->new_zero_copy_sent_success = false;
905         return 0;
906     }
907 
908     return 1;
909 }
910 
911 #endif /* QEMU_MSG_ZEROCOPY */
912 
913 static int
914 qio_channel_socket_set_blocking(QIOChannel *ioc,
915                                 bool enabled,
916                                 Error **errp)
917 {
918     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
919     sioc->blocking = enabled;
920 
921     if (!qemu_set_blocking(sioc->fd, enabled, errp)) {
922         return -1;
923     }
924 
925     return 0;
926 }
927 
928 
929 static void
930 qio_channel_socket_set_delay(QIOChannel *ioc,
931                              bool enabled)
932 {
933     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
934     int v = enabled ? 0 : 1;
935 
936     setsockopt(sioc->fd,
937                IPPROTO_TCP, TCP_NODELAY,
938                &v, sizeof(v));
939 }
940 
941 
942 static void
943 qio_channel_socket_set_cork(QIOChannel *ioc,
944                             bool enabled)
945 {
946     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
947     int v = enabled ? 1 : 0;
948 
949     socket_set_cork(sioc->fd, v);
950 }
951 
952 static int
953 qio_channel_socket_get_peerpid(QIOChannel *ioc,
954                                unsigned int *pid,
955                                Error **errp)
956 {
957 #ifdef CONFIG_LINUX
958     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
959     Error *err = NULL;
960     socklen_t len = sizeof(struct ucred);
961 
962     struct ucred cred;
963     if (getsockopt(sioc->fd,
964                SOL_SOCKET, SO_PEERCRED,
965                &cred, &len) == -1) {
966         error_setg_errno(&err, errno, "Unable to get peer credentials");
967         error_propagate(errp, err);
968         *pid = -1;
969         return -1;
970     }
971     *pid = (unsigned int)cred.pid;
972     return 0;
973 #else
974     error_setg(errp, "Unsupported feature");
975     *pid = -1;
976     return -1;
977 #endif
978 }
979 
980 static int
981 qio_channel_socket_close(QIOChannel *ioc,
982                          Error **errp)
983 {
984     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
985     int rc = 0;
986     Error *err = NULL;
987 
988     if (sioc->fd != -1) {
989 #ifdef WIN32
990         qemu_socket_unselect_nofail(sioc->fd);
991 #endif
992         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
993             socket_listen_cleanup(sioc->fd, errp);
994         }
995 
996         if (close(sioc->fd) < 0) {
997             sioc->fd = -1;
998             error_setg_errno(&err, errno, "Unable to close socket");
999             error_propagate(errp, err);
1000             return -1;
1001         }
1002         sioc->fd = -1;
1003     }
1004     return rc;
1005 }
1006 
1007 static int
1008 qio_channel_socket_shutdown(QIOChannel *ioc,
1009                             QIOChannelShutdown how,
1010                             Error **errp)
1011 {
1012     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
1013     int sockhow;
1014 
1015     switch (how) {
1016     case QIO_CHANNEL_SHUTDOWN_READ:
1017         sockhow = SHUT_RD;
1018         break;
1019     case QIO_CHANNEL_SHUTDOWN_WRITE:
1020         sockhow = SHUT_WR;
1021         break;
1022     case QIO_CHANNEL_SHUTDOWN_BOTH:
1023     default:
1024         sockhow = SHUT_RDWR;
1025         break;
1026     }
1027 
1028     if (shutdown(sioc->fd, sockhow) < 0) {
1029         error_setg_errno(errp, errno,
1030                          "Unable to shutdown socket");
1031         return -1;
1032     }
1033     return 0;
1034 }
1035 
1036 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
1037                                                   AioContext *read_ctx,
1038                                                   IOHandler *io_read,
1039                                                   AioContext *write_ctx,
1040                                                   IOHandler *io_write,
1041                                                   void *opaque)
1042 {
1043     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
1044 
1045     qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read,
1046                                         sioc->fd, write_ctx, io_write,
1047                                         opaque);
1048 }
1049 
1050 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
1051                                                 GIOCondition condition)
1052 {
1053     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
1054     return qio_channel_create_socket_watch(ioc,
1055                                            sioc->fd,
1056                                            condition);
1057 }
1058 
1059 static void qio_channel_socket_class_init(ObjectClass *klass,
1060                                           const void *class_data G_GNUC_UNUSED)
1061 {
1062     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
1063 
1064     ioc_klass->io_writev = qio_channel_socket_writev;
1065     ioc_klass->io_readv = qio_channel_socket_readv;
1066     ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
1067     ioc_klass->io_close = qio_channel_socket_close;
1068     ioc_klass->io_shutdown = qio_channel_socket_shutdown;
1069     ioc_klass->io_set_cork = qio_channel_socket_set_cork;
1070     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
1071     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
1072     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
1073 #ifdef QEMU_MSG_ZEROCOPY
1074     ioc_klass->io_flush = qio_channel_socket_flush;
1075 #endif
1076     ioc_klass->io_peerpid = qio_channel_socket_get_peerpid;
1077 }
1078 
1079 static const TypeInfo qio_channel_socket_info = {
1080     .parent = TYPE_QIO_CHANNEL,
1081     .name = TYPE_QIO_CHANNEL_SOCKET,
1082     .instance_size = sizeof(QIOChannelSocket),
1083     .instance_init = qio_channel_socket_init,
1084     .instance_finalize = qio_channel_socket_finalize,
1085     .class_init = qio_channel_socket_class_init,
1086 };
1087 
1088 static void qio_channel_socket_register_types(void)
1089 {
1090     type_register_static(&qio_channel_socket_info);
1091 }
1092 
1093 type_init(qio_channel_socket_register_types);
1094