xref: /openbmc/qemu/io/channel-socket.c (revision 803ca43e)
1 /*
2  * QEMU I/O channels sockets driver
3  *
4  * Copyright (c) 2015 Red Hat, Inc.
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qapi/qapi-visit-sockets.h"
23 #include "qemu/module.h"
24 #include "io/channel-socket.h"
25 #include "io/channel-watch.h"
26 #include "trace.h"
27 #include "qapi/clone-visitor.h"
28 #ifdef CONFIG_LINUX
29 #include <linux/errqueue.h>
30 #include <sys/socket.h>
31 
32 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
33 #define QEMU_MSG_ZEROCOPY
34 #endif
35 #endif
36 
/*
 * Number of file descriptors the SCM_RIGHTS control buffers in this
 * file are sized for; the POSIX writev path rejects requests to send
 * more than this many descriptors in one message.
 */
#define SOCKET_MAX_FDS 16
38 
39 SocketAddress *
40 qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
41                                      Error **errp)
42 {
43     return socket_sockaddr_to_address(&ioc->localAddr,
44                                       ioc->localAddrLen,
45                                       errp);
46 }
47 
48 SocketAddress *
49 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
50                                       Error **errp)
51 {
52     return socket_sockaddr_to_address(&ioc->remoteAddr,
53                                       ioc->remoteAddrLen,
54                                       errp);
55 }
56 
57 QIOChannelSocket *
58 qio_channel_socket_new(void)
59 {
60     QIOChannelSocket *sioc;
61     QIOChannel *ioc;
62 
63     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
64     sioc->fd = -1;
65     sioc->zero_copy_queued = 0;
66     sioc->zero_copy_sent = 0;
67 
68     ioc = QIO_CHANNEL(sioc);
69     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
70 
71 #ifdef WIN32
72     ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
73 #endif
74 
75     trace_qio_channel_socket_new(sioc);
76 
77     return sioc;
78 }
79 
80 
81 static int
82 qio_channel_socket_set_fd(QIOChannelSocket *sioc,
83                           int fd,
84                           Error **errp)
85 {
86     if (sioc->fd != -1) {
87         error_setg(errp, "Socket is already open");
88         return -1;
89     }
90 
91     sioc->fd = fd;
92     sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
93     sioc->localAddrLen = sizeof(sioc->localAddr);
94 
95 
96     if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
97                     &sioc->remoteAddrLen) < 0) {
98         if (errno == ENOTCONN) {
99             memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
100             sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
101         } else {
102             error_setg_errno(errp, errno,
103                              "Unable to query remote socket address");
104             goto error;
105         }
106     }
107 
108     if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
109                     &sioc->localAddrLen) < 0) {
110         error_setg_errno(errp, errno,
111                          "Unable to query local socket address");
112         goto error;
113     }
114 
115 #ifndef WIN32
116     if (sioc->localAddr.ss_family == AF_UNIX) {
117         QIOChannel *ioc = QIO_CHANNEL(sioc);
118         qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
119     }
120 #endif /* WIN32 */
121 
122     return 0;
123 
124  error:
125     sioc->fd = -1; /* Let the caller close FD on failure */
126     return -1;
127 }
128 
129 QIOChannelSocket *
130 qio_channel_socket_new_fd(int fd,
131                           Error **errp)
132 {
133     QIOChannelSocket *ioc;
134 
135     ioc = qio_channel_socket_new();
136     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
137         object_unref(OBJECT(ioc));
138         return NULL;
139     }
140 
141     trace_qio_channel_socket_new_fd(ioc, fd);
142 
143     return ioc;
144 }
145 
146 
147 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
148                                     SocketAddress *addr,
149                                     Error **errp)
150 {
151     int fd;
152 
153     trace_qio_channel_socket_connect_sync(ioc, addr);
154     fd = socket_connect(addr, errp);
155     if (fd < 0) {
156         trace_qio_channel_socket_connect_fail(ioc);
157         return -1;
158     }
159 
160     trace_qio_channel_socket_connect_complete(ioc, fd);
161     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
162         close(fd);
163         return -1;
164     }
165 
166 #ifdef QEMU_MSG_ZEROCOPY
167     int ret, v = 1;
168     ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
169     if (ret == 0) {
170         /* Zero copy available on host */
171         qio_channel_set_feature(QIO_CHANNEL(ioc),
172                                 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
173     }
174 #endif
175 
176     return 0;
177 }
178 
179 
180 static void qio_channel_socket_connect_worker(QIOTask *task,
181                                               gpointer opaque)
182 {
183     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
184     SocketAddress *addr = opaque;
185     Error *err = NULL;
186 
187     qio_channel_socket_connect_sync(ioc, addr, &err);
188 
189     qio_task_set_error(task, err);
190 }
191 
192 
193 void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
194                                       SocketAddress *addr,
195                                       QIOTaskFunc callback,
196                                       gpointer opaque,
197                                       GDestroyNotify destroy,
198                                       GMainContext *context)
199 {
200     QIOTask *task = qio_task_new(
201         OBJECT(ioc), callback, opaque, destroy);
202     SocketAddress *addrCopy;
203 
204     addrCopy = QAPI_CLONE(SocketAddress, addr);
205 
206     /* socket_connect() does a non-blocking connect(), but it
207      * still blocks in DNS lookups, so we must use a thread */
208     trace_qio_channel_socket_connect_async(ioc, addr);
209     qio_task_run_in_thread(task,
210                            qio_channel_socket_connect_worker,
211                            addrCopy,
212                            (GDestroyNotify)qapi_free_SocketAddress,
213                            context);
214 }
215 
216 
217 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
218                                    SocketAddress *addr,
219                                    int num,
220                                    Error **errp)
221 {
222     int fd;
223 
224     trace_qio_channel_socket_listen_sync(ioc, addr, num);
225     fd = socket_listen(addr, num, errp);
226     if (fd < 0) {
227         trace_qio_channel_socket_listen_fail(ioc);
228         return -1;
229     }
230 
231     trace_qio_channel_socket_listen_complete(ioc, fd);
232     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
233         close(fd);
234         return -1;
235     }
236     qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
237 
238     return 0;
239 }
240 
241 
/*
 * Arguments carried from qio_channel_socket_listen_async() to its
 * worker thread; released by qio_channel_listen_worker_free().
 */
struct QIOChannelListenWorkerData {
    SocketAddress *addr; /* clone of the caller's address, owned here */
    int num; /* amount of expected connections */
};
246 
247 static void qio_channel_listen_worker_free(gpointer opaque)
248 {
249     struct QIOChannelListenWorkerData *data = opaque;
250 
251     qapi_free_SocketAddress(data->addr);
252     g_free(data);
253 }
254 
255 static void qio_channel_socket_listen_worker(QIOTask *task,
256                                              gpointer opaque)
257 {
258     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
259     struct QIOChannelListenWorkerData *data = opaque;
260     Error *err = NULL;
261 
262     qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
263 
264     qio_task_set_error(task, err);
265 }
266 
267 
268 void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
269                                      SocketAddress *addr,
270                                      int num,
271                                      QIOTaskFunc callback,
272                                      gpointer opaque,
273                                      GDestroyNotify destroy,
274                                      GMainContext *context)
275 {
276     QIOTask *task = qio_task_new(
277         OBJECT(ioc), callback, opaque, destroy);
278     struct QIOChannelListenWorkerData *data;
279 
280     data = g_new0(struct QIOChannelListenWorkerData, 1);
281     data->addr = QAPI_CLONE(SocketAddress, addr);
282     data->num = num;
283 
284     /* socket_listen() blocks in DNS lookups, so we must use a thread */
285     trace_qio_channel_socket_listen_async(ioc, addr, num);
286     qio_task_run_in_thread(task,
287                            qio_channel_socket_listen_worker,
288                            data,
289                            qio_channel_listen_worker_free,
290                            context);
291 }
292 
293 
294 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
295                                   SocketAddress *localAddr,
296                                   SocketAddress *remoteAddr,
297                                   Error **errp)
298 {
299     int fd;
300 
301     trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
302     fd = socket_dgram(remoteAddr, localAddr, errp);
303     if (fd < 0) {
304         trace_qio_channel_socket_dgram_fail(ioc);
305         return -1;
306     }
307 
308     trace_qio_channel_socket_dgram_complete(ioc, fd);
309     if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
310         close(fd);
311         return -1;
312     }
313 
314     return 0;
315 }
316 
317 
/*
 * Arguments carried from qio_channel_socket_dgram_async() to its
 * worker thread; both addresses are clones that are released by
 * qio_channel_socket_dgram_worker_free().
 */
struct QIOChannelSocketDGramWorkerData {
    SocketAddress *localAddr;  /* clone of the caller's local address */
    SocketAddress *remoteAddr; /* clone of the caller's remote address */
};
322 
323 
324 static void qio_channel_socket_dgram_worker_free(gpointer opaque)
325 {
326     struct QIOChannelSocketDGramWorkerData *data = opaque;
327     qapi_free_SocketAddress(data->localAddr);
328     qapi_free_SocketAddress(data->remoteAddr);
329     g_free(data);
330 }
331 
332 static void qio_channel_socket_dgram_worker(QIOTask *task,
333                                             gpointer opaque)
334 {
335     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
336     struct QIOChannelSocketDGramWorkerData *data = opaque;
337     Error *err = NULL;
338 
339     /* socket_dgram() blocks in DNS lookups, so we must use a thread */
340     qio_channel_socket_dgram_sync(ioc, data->localAddr,
341                                   data->remoteAddr, &err);
342 
343     qio_task_set_error(task, err);
344 }
345 
346 
347 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
348                                     SocketAddress *localAddr,
349                                     SocketAddress *remoteAddr,
350                                     QIOTaskFunc callback,
351                                     gpointer opaque,
352                                     GDestroyNotify destroy,
353                                     GMainContext *context)
354 {
355     QIOTask *task = qio_task_new(
356         OBJECT(ioc), callback, opaque, destroy);
357     struct QIOChannelSocketDGramWorkerData *data = g_new0(
358         struct QIOChannelSocketDGramWorkerData, 1);
359 
360     data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
361     data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
362 
363     trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
364     qio_task_run_in_thread(task,
365                            qio_channel_socket_dgram_worker,
366                            data,
367                            qio_channel_socket_dgram_worker_free,
368                            context);
369 }
370 
371 
372 QIOChannelSocket *
373 qio_channel_socket_accept(QIOChannelSocket *ioc,
374                           Error **errp)
375 {
376     QIOChannelSocket *cioc;
377 
378     cioc = qio_channel_socket_new();
379     cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
380     cioc->localAddrLen = sizeof(ioc->localAddr);
381 
382  retry:
383     trace_qio_channel_socket_accept(ioc);
384     cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
385                            &cioc->remoteAddrLen);
386     if (cioc->fd < 0) {
387         if (errno == EINTR) {
388             goto retry;
389         }
390         error_setg_errno(errp, errno, "Unable to accept connection");
391         trace_qio_channel_socket_accept_fail(ioc);
392         goto error;
393     }
394 
395     if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
396                     &cioc->localAddrLen) < 0) {
397         error_setg_errno(errp, errno,
398                          "Unable to query local socket address");
399         goto error;
400     }
401 
402 #ifndef WIN32
403     if (cioc->localAddr.ss_family == AF_UNIX) {
404         QIOChannel *ioc_local = QIO_CHANNEL(cioc);
405         qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
406     }
407 #endif /* WIN32 */
408 
409     trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
410     return cioc;
411 
412  error:
413     object_unref(OBJECT(cioc));
414     return NULL;
415 }
416 
417 static void qio_channel_socket_init(Object *obj)
418 {
419     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
420     ioc->fd = -1;
421 }
422 
423 static void qio_channel_socket_finalize(Object *obj)
424 {
425     QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
426 
427     if (ioc->fd != -1) {
428         QIOChannel *ioc_local = QIO_CHANNEL(ioc);
429         if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
430             Error *err = NULL;
431 
432             socket_listen_cleanup(ioc->fd, &err);
433             if (err) {
434                 error_report_err(err);
435                 err = NULL;
436             }
437         }
438 #ifdef WIN32
439         WSAEventSelect(ioc->fd, NULL, 0);
440 #endif
441         closesocket(ioc->fd);
442         ioc->fd = -1;
443     }
444 }
445 
446 
447 #ifndef WIN32
448 static void qio_channel_socket_copy_fds(struct msghdr *msg,
449                                         int **fds, size_t *nfds)
450 {
451     struct cmsghdr *cmsg;
452 
453     *nfds = 0;
454     *fds = NULL;
455 
456     for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
457         int fd_size, i;
458         int gotfds;
459 
460         if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
461             cmsg->cmsg_level != SOL_SOCKET ||
462             cmsg->cmsg_type != SCM_RIGHTS) {
463             continue;
464         }
465 
466         fd_size = cmsg->cmsg_len - CMSG_LEN(0);
467 
468         if (!fd_size) {
469             continue;
470         }
471 
472         gotfds = fd_size / sizeof(int);
473         *fds = g_renew(int, *fds, *nfds + gotfds);
474         memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
475 
476         for (i = 0; i < gotfds; i++) {
477             int fd = (*fds)[*nfds + i];
478             if (fd < 0) {
479                 continue;
480             }
481 
482             /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
483             qemu_socket_set_block(fd);
484 
485 #ifndef MSG_CMSG_CLOEXEC
486             qemu_set_cloexec(fd);
487 #endif
488         }
489         *nfds += gotfds;
490     }
491 }
492 
493 
494 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
495                                         const struct iovec *iov,
496                                         size_t niov,
497                                         int **fds,
498                                         size_t *nfds,
499                                         Error **errp)
500 {
501     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
502     ssize_t ret;
503     struct msghdr msg = { NULL, };
504     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
505     int sflags = 0;
506 
507     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
508 
509     msg.msg_iov = (struct iovec *)iov;
510     msg.msg_iovlen = niov;
511     if (fds && nfds) {
512         msg.msg_control = control;
513         msg.msg_controllen = sizeof(control);
514 #ifdef MSG_CMSG_CLOEXEC
515         sflags |= MSG_CMSG_CLOEXEC;
516 #endif
517 
518     }
519 
520  retry:
521     ret = recvmsg(sioc->fd, &msg, sflags);
522     if (ret < 0) {
523         if (errno == EAGAIN) {
524             return QIO_CHANNEL_ERR_BLOCK;
525         }
526         if (errno == EINTR) {
527             goto retry;
528         }
529 
530         error_setg_errno(errp, errno,
531                          "Unable to read from socket");
532         return -1;
533     }
534 
535     if (fds && nfds) {
536         qio_channel_socket_copy_fds(&msg, fds, nfds);
537     }
538 
539     return ret;
540 }
541 
542 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
543                                          const struct iovec *iov,
544                                          size_t niov,
545                                          int *fds,
546                                          size_t nfds,
547                                          int flags,
548                                          Error **errp)
549 {
550     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
551     ssize_t ret;
552     struct msghdr msg = { NULL, };
553     char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
554     size_t fdsize = sizeof(int) * nfds;
555     struct cmsghdr *cmsg;
556     int sflags = 0;
557 
558     memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
559 
560     msg.msg_iov = (struct iovec *)iov;
561     msg.msg_iovlen = niov;
562 
563     if (nfds) {
564         if (nfds > SOCKET_MAX_FDS) {
565             error_setg_errno(errp, EINVAL,
566                              "Only %d FDs can be sent, got %zu",
567                              SOCKET_MAX_FDS, nfds);
568             return -1;
569         }
570 
571         msg.msg_control = control;
572         msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
573 
574         cmsg = CMSG_FIRSTHDR(&msg);
575         cmsg->cmsg_len = CMSG_LEN(fdsize);
576         cmsg->cmsg_level = SOL_SOCKET;
577         cmsg->cmsg_type = SCM_RIGHTS;
578         memcpy(CMSG_DATA(cmsg), fds, fdsize);
579     }
580 
581     if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
582 #ifdef QEMU_MSG_ZEROCOPY
583         sflags = MSG_ZEROCOPY;
584 #else
585         /*
586          * We expect QIOChannel class entry point to have
587          * blocked this code path already
588          */
589         g_assert_not_reached();
590 #endif
591     }
592 
593  retry:
594     ret = sendmsg(sioc->fd, &msg, sflags);
595     if (ret <= 0) {
596         switch (errno) {
597         case EAGAIN:
598             return QIO_CHANNEL_ERR_BLOCK;
599         case EINTR:
600             goto retry;
601         case ENOBUFS:
602             if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
603                 error_setg_errno(errp, errno,
604                                  "Process can't lock enough memory for using MSG_ZEROCOPY");
605                 return -1;
606             }
607             break;
608         }
609 
610         error_setg_errno(errp, errno,
611                          "Unable to write to socket");
612         return -1;
613     }
614     return ret;
615 }
616 #else /* WIN32 */
617 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
618                                         const struct iovec *iov,
619                                         size_t niov,
620                                         int **fds,
621                                         size_t *nfds,
622                                         Error **errp)
623 {
624     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
625     ssize_t done = 0;
626     ssize_t i;
627 
628     for (i = 0; i < niov; i++) {
629         ssize_t ret;
630     retry:
631         ret = recv(sioc->fd,
632                    iov[i].iov_base,
633                    iov[i].iov_len,
634                    0);
635         if (ret < 0) {
636             if (errno == EAGAIN) {
637                 if (done) {
638                     return done;
639                 } else {
640                     return QIO_CHANNEL_ERR_BLOCK;
641                 }
642             } else if (errno == EINTR) {
643                 goto retry;
644             } else {
645                 error_setg_errno(errp, errno,
646                                  "Unable to read from socket");
647                 return -1;
648             }
649         }
650         done += ret;
651         if (ret < iov[i].iov_len) {
652             return done;
653         }
654     }
655 
656     return done;
657 }
658 
659 static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
660                                          const struct iovec *iov,
661                                          size_t niov,
662                                          int *fds,
663                                          size_t nfds,
664                                          int flags,
665                                          Error **errp)
666 {
667     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
668     ssize_t done = 0;
669     ssize_t i;
670 
671     for (i = 0; i < niov; i++) {
672         ssize_t ret;
673     retry:
674         ret = send(sioc->fd,
675                    iov[i].iov_base,
676                    iov[i].iov_len,
677                    0);
678         if (ret < 0) {
679             if (errno == EAGAIN) {
680                 if (done) {
681                     return done;
682                 } else {
683                     return QIO_CHANNEL_ERR_BLOCK;
684                 }
685             } else if (errno == EINTR) {
686                 goto retry;
687             } else {
688                 error_setg_errno(errp, errno,
689                                  "Unable to write to socket");
690                 return -1;
691             }
692         }
693         done += ret;
694         if (ret < iov[i].iov_len) {
695             return done;
696         }
697     }
698 
699     return done;
700 }
701 #endif /* WIN32 */
702 
703 
704 #ifdef QEMU_MSG_ZEROCOPY
705 static int qio_channel_socket_flush(QIOChannel *ioc,
706                                     Error **errp)
707 {
708     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
709     struct msghdr msg = {};
710     struct sock_extended_err *serr;
711     struct cmsghdr *cm;
712     char control[CMSG_SPACE(sizeof(*serr))];
713     int received;
714     int ret = 1;
715 
716     msg.msg_control = control;
717     msg.msg_controllen = sizeof(control);
718     memset(control, 0, sizeof(control));
719 
720     while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
721         received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
722         if (received < 0) {
723             switch (errno) {
724             case EAGAIN:
725                 /* Nothing on errqueue, wait until something is available */
726                 qio_channel_wait(ioc, G_IO_ERR);
727                 continue;
728             case EINTR:
729                 continue;
730             default:
731                 error_setg_errno(errp, errno,
732                                  "Unable to read errqueue");
733                 return -1;
734             }
735         }
736 
737         cm = CMSG_FIRSTHDR(&msg);
738         if (cm->cmsg_level != SOL_IP &&
739             cm->cmsg_type != IP_RECVERR) {
740             error_setg_errno(errp, EPROTOTYPE,
741                              "Wrong cmsg in errqueue");
742             return -1;
743         }
744 
745         serr = (void *) CMSG_DATA(cm);
746         if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
747             error_setg_errno(errp, serr->ee_errno,
748                              "Error on socket");
749             return -1;
750         }
751         if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
752             error_setg_errno(errp, serr->ee_origin,
753                              "Error not from zero copy");
754             return -1;
755         }
756 
757         /* No errors, count successfully finished sendmsg()*/
758         sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
759 
760         /* If any sendmsg() succeeded using zero copy, return 0 at the end */
761         if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
762             ret = 0;
763         }
764     }
765 
766     return ret;
767 }
768 
769 #endif /* QEMU_MSG_ZEROCOPY */
770 
771 static int
772 qio_channel_socket_set_blocking(QIOChannel *ioc,
773                                 bool enabled,
774                                 Error **errp)
775 {
776     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
777 
778     if (enabled) {
779         qemu_socket_set_block(sioc->fd);
780     } else {
781         qemu_socket_set_nonblock(sioc->fd);
782     }
783     return 0;
784 }
785 
786 
787 static void
788 qio_channel_socket_set_delay(QIOChannel *ioc,
789                              bool enabled)
790 {
791     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
792     int v = enabled ? 0 : 1;
793 
794     setsockopt(sioc->fd,
795                IPPROTO_TCP, TCP_NODELAY,
796                &v, sizeof(v));
797 }
798 
799 
800 static void
801 qio_channel_socket_set_cork(QIOChannel *ioc,
802                             bool enabled)
803 {
804     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
805     int v = enabled ? 1 : 0;
806 
807     socket_set_cork(sioc->fd, v);
808 }
809 
810 
811 static int
812 qio_channel_socket_close(QIOChannel *ioc,
813                          Error **errp)
814 {
815     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
816     int rc = 0;
817     Error *err = NULL;
818 
819     if (sioc->fd != -1) {
820 #ifdef WIN32
821         WSAEventSelect(sioc->fd, NULL, 0);
822 #endif
823         if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
824             socket_listen_cleanup(sioc->fd, errp);
825         }
826 
827         if (closesocket(sioc->fd) < 0) {
828             sioc->fd = -1;
829             error_setg_errno(&err, errno, "Unable to close socket");
830             error_propagate(errp, err);
831             return -1;
832         }
833         sioc->fd = -1;
834     }
835     return rc;
836 }
837 
838 static int
839 qio_channel_socket_shutdown(QIOChannel *ioc,
840                             QIOChannelShutdown how,
841                             Error **errp)
842 {
843     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
844     int sockhow;
845 
846     switch (how) {
847     case QIO_CHANNEL_SHUTDOWN_READ:
848         sockhow = SHUT_RD;
849         break;
850     case QIO_CHANNEL_SHUTDOWN_WRITE:
851         sockhow = SHUT_WR;
852         break;
853     case QIO_CHANNEL_SHUTDOWN_BOTH:
854     default:
855         sockhow = SHUT_RDWR;
856         break;
857     }
858 
859     if (shutdown(sioc->fd, sockhow) < 0) {
860         error_setg_errno(errp, errno,
861                          "Unable to shutdown socket");
862         return -1;
863     }
864     return 0;
865 }
866 
867 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
868                                                   AioContext *ctx,
869                                                   IOHandler *io_read,
870                                                   IOHandler *io_write,
871                                                   void *opaque)
872 {
873     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
874     aio_set_fd_handler(ctx, sioc->fd, false,
875                        io_read, io_write, NULL, NULL, opaque);
876 }
877 
878 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
879                                                 GIOCondition condition)
880 {
881     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
882     return qio_channel_create_socket_watch(ioc,
883                                            sioc->fd,
884                                            condition);
885 }
886 
887 static void qio_channel_socket_class_init(ObjectClass *klass,
888                                           void *class_data G_GNUC_UNUSED)
889 {
890     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
891 
892     ioc_klass->io_writev = qio_channel_socket_writev;
893     ioc_klass->io_readv = qio_channel_socket_readv;
894     ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
895     ioc_klass->io_close = qio_channel_socket_close;
896     ioc_klass->io_shutdown = qio_channel_socket_shutdown;
897     ioc_klass->io_set_cork = qio_channel_socket_set_cork;
898     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
899     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
900     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
901 #ifdef QEMU_MSG_ZEROCOPY
902     ioc_klass->io_flush = qio_channel_socket_flush;
903 #endif
904 }
905 
/*
 * Type description for TYPE_QIO_CHANNEL_SOCKET: a child of
 * TYPE_QIO_CHANNEL whose instances are QIOChannelSocket-sized and
 * use the init, finalize and class-init hooks defined in this file.
 */
static const TypeInfo qio_channel_socket_info = {
    .parent = TYPE_QIO_CHANNEL,
    .name = TYPE_QIO_CHANNEL_SOCKET,
    .instance_size = sizeof(QIOChannelSocket),
    .instance_init = qio_channel_socket_init,
    .instance_finalize = qio_channel_socket_finalize,
    .class_init = qio_channel_socket_class_init,
};
914 
915 static void qio_channel_socket_register_types(void)
916 {
917     type_register_static(&qio_channel_socket_info);
918 }
919 
920 type_init(qio_channel_socket_register_types);
921