1 /* 2 * QEMU I/O channels sockets driver 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include "qemu/osdep.h" 21 #include "qapi/error.h" 22 #include "qapi/qapi-visit-sockets.h" 23 #include "qemu/module.h" 24 #include "io/channel-socket.h" 25 #include "io/channel-util.h" 26 #include "io/channel-watch.h" 27 #include "trace.h" 28 #include "qapi/clone-visitor.h" 29 #ifdef CONFIG_LINUX 30 #include <linux/errqueue.h> 31 #include <sys/socket.h> 32 33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY)) 34 #define QEMU_MSG_ZEROCOPY 35 #endif 36 #endif 37 38 #define SOCKET_MAX_FDS 16 39 40 SocketAddress * 41 qio_channel_socket_get_local_address(QIOChannelSocket *ioc, 42 Error **errp) 43 { 44 return socket_sockaddr_to_address(&ioc->localAddr, 45 ioc->localAddrLen, 46 errp); 47 } 48 49 SocketAddress * 50 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc, 51 Error **errp) 52 { 53 return socket_sockaddr_to_address(&ioc->remoteAddr, 54 ioc->remoteAddrLen, 55 errp); 56 } 57 58 QIOChannelSocket * 59 qio_channel_socket_new(void) 60 { 61 QIOChannelSocket *sioc; 62 QIOChannel *ioc; 63 64 sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); 65 sioc->fd = -1; 66 sioc->zero_copy_queued = 0; 67 sioc->zero_copy_sent = 0; 68 69 ioc = QIO_CHANNEL(sioc); 70 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); 71 72 #ifdef WIN32 73 ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL); 74 #endif 75 76 trace_qio_channel_socket_new(sioc); 77 78 return sioc; 79 } 80 81 82 static int 83 qio_channel_socket_set_fd(QIOChannelSocket *sioc, 84 int fd, 85 Error **errp) 86 { 87 if (sioc->fd != -1) { 88 error_setg(errp, "Socket is already open"); 89 return -1; 90 } 91 92 sioc->fd = fd; 93 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 94 sioc->localAddrLen = sizeof(sioc->localAddr); 95 96 97 if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr, 98 &sioc->remoteAddrLen) < 0) { 99 if (errno == ENOTCONN) { 100 memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr)); 101 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 102 } else { 103 error_setg_errno(errp, errno, 104 "Unable to query remote socket address"); 105 goto error; 106 } 107 } 108 109 if (getsockname(fd, (struct sockaddr *)&sioc->localAddr, 110 &sioc->localAddrLen) < 0) { 111 error_setg_errno(errp, errno, 112 "Unable to query local socket address"); 113 goto error; 114 } 115 116 #ifndef WIN32 117 if (sioc->localAddr.ss_family == AF_UNIX) { 118 QIOChannel *ioc = QIO_CHANNEL(sioc); 119 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS); 120 } 121 #endif /* WIN32 */ 122 123 return 0; 124 125 error: 126 sioc->fd = -1; /* Let the caller close FD on failure */ 127 return -1; 128 } 129 130 QIOChannelSocket * 131 qio_channel_socket_new_fd(int fd, 132 Error **errp) 133 { 134 QIOChannelSocket *ioc; 135 136 ioc = qio_channel_socket_new(); 137 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 138 object_unref(OBJECT(ioc)); 139 return NULL; 140 } 141 142 trace_qio_channel_socket_new_fd(ioc, fd); 143 144 return ioc; 145 } 146 147 148 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, 149 SocketAddress *addr, 150 Error **errp) 151 { 152 int fd; 153 154 trace_qio_channel_socket_connect_sync(ioc, addr); 155 fd = socket_connect(addr, errp); 156 if (fd < 0) { 157 trace_qio_channel_socket_connect_fail(ioc); 158 return -1; 159 } 160 161 trace_qio_channel_socket_connect_complete(ioc, fd); 162 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 163 close(fd); 164 return -1; 165 } 166 167 #ifdef QEMU_MSG_ZEROCOPY 168 int ret, v = 1; 169 ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v)); 170 if (ret == 0) { 171 /* Zero copy available on host */ 172 qio_channel_set_feature(QIO_CHANNEL(ioc), 173 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY); 174 } 175 #endif 176 177 qio_channel_set_feature(QIO_CHANNEL(ioc), 178 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 179 180 return 0; 181 } 182 183 184 static void qio_channel_socket_connect_worker(QIOTask *task, 185 gpointer opaque) 186 { 187 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 188 SocketAddress *addr = opaque; 189 Error *err = NULL; 190 191 qio_channel_socket_connect_sync(ioc, addr, &err); 192 193 qio_task_set_error(task, err); 194 } 195 196 197 void qio_channel_socket_connect_async(QIOChannelSocket *ioc, 198 SocketAddress *addr, 199 QIOTaskFunc callback, 200 gpointer opaque, 201 GDestroyNotify destroy, 202 GMainContext *context) 203 { 204 QIOTask *task = qio_task_new( 205 OBJECT(ioc), callback, opaque, destroy); 206 SocketAddress *addrCopy; 207 208 addrCopy = QAPI_CLONE(SocketAddress, addr); 209 210 /* socket_connect() does a non-blocking connect(), but it 211 * still blocks in DNS lookups, so we must use a thread */ 212 trace_qio_channel_socket_connect_async(ioc, addr); 213 qio_task_run_in_thread(task, 214 qio_channel_socket_connect_worker, 215 addrCopy, 216 (GDestroyNotify)qapi_free_SocketAddress, 217 context); 218 } 219 220 221 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc, 222 SocketAddress *addr, 223 int num, 224 Error **errp) 225 { 226 int fd; 227 228 trace_qio_channel_socket_listen_sync(ioc, addr, num); 229 fd = socket_listen(addr, num, errp); 230 if (fd < 0) { 231 trace_qio_channel_socket_listen_fail(ioc); 232 return -1; 233 } 234 235 trace_qio_channel_socket_listen_complete(ioc, fd); 236 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 237 close(fd); 238 return -1; 239 } 240 qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN); 241 242 return 0; 243 } 244 245 246 struct QIOChannelListenWorkerData { 247 SocketAddress *addr; 248 int num; /* amount of expected connections */ 249 }; 250 251 static void qio_channel_listen_worker_free(gpointer opaque) 252 { 253 struct QIOChannelListenWorkerData *data = opaque; 254 255 qapi_free_SocketAddress(data->addr); 256 g_free(data); 257 } 258 259 static void qio_channel_socket_listen_worker(QIOTask *task, 260 gpointer opaque) 261 { 262 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 263 struct QIOChannelListenWorkerData *data = opaque; 264 Error *err = NULL; 265 266 qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err); 267 268 qio_task_set_error(task, err); 269 } 270 271 272 void qio_channel_socket_listen_async(QIOChannelSocket *ioc, 273 SocketAddress *addr, 274 int num, 275 QIOTaskFunc callback, 276 gpointer opaque, 277 GDestroyNotify destroy, 278 GMainContext *context) 279 { 280 QIOTask *task = qio_task_new( 281 OBJECT(ioc), callback, opaque, destroy); 282 struct QIOChannelListenWorkerData *data; 283 284 data = g_new0(struct QIOChannelListenWorkerData, 1); 285 data->addr = QAPI_CLONE(SocketAddress, addr); 286 data->num = num; 287 288 /* socket_listen() blocks in DNS lookups, so we must use a thread */ 289 trace_qio_channel_socket_listen_async(ioc, addr, num); 290 qio_task_run_in_thread(task, 291 qio_channel_socket_listen_worker, 292 data, 293 qio_channel_listen_worker_free, 294 context); 295 } 296 297 298 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc, 299 SocketAddress *localAddr, 300 SocketAddress *remoteAddr, 301 Error **errp) 302 { 303 int fd; 304 305 trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr); 306 fd = socket_dgram(remoteAddr, localAddr, errp); 307 if (fd < 0) { 308 trace_qio_channel_socket_dgram_fail(ioc); 309 return -1; 310 } 311 312 trace_qio_channel_socket_dgram_complete(ioc, fd); 313 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 314 close(fd); 315 return -1; 316 } 317 318 return 0; 319 } 320 321 322 struct QIOChannelSocketDGramWorkerData { 323 SocketAddress *localAddr; 324 SocketAddress *remoteAddr; 325 }; 326 327 328 static void qio_channel_socket_dgram_worker_free(gpointer opaque) 329 { 330 struct QIOChannelSocketDGramWorkerData *data = opaque; 331 qapi_free_SocketAddress(data->localAddr); 332 qapi_free_SocketAddress(data->remoteAddr); 333 g_free(data); 334 } 335 336 static void qio_channel_socket_dgram_worker(QIOTask *task, 337 gpointer opaque) 338 { 339 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 340 struct QIOChannelSocketDGramWorkerData *data = opaque; 341 Error *err = NULL; 342 343 /* socket_dgram() blocks in DNS lookups, so we must use a thread */ 344 qio_channel_socket_dgram_sync(ioc, data->localAddr, 345 data->remoteAddr, &err); 346 347 qio_task_set_error(task, err); 348 } 349 350 351 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, 352 SocketAddress *localAddr, 353 SocketAddress *remoteAddr, 354 QIOTaskFunc callback, 355 gpointer opaque, 356 GDestroyNotify destroy, 357 GMainContext *context) 358 { 359 QIOTask *task = qio_task_new( 360 OBJECT(ioc), callback, opaque, destroy); 361 struct QIOChannelSocketDGramWorkerData *data = g_new0( 362 struct QIOChannelSocketDGramWorkerData, 1); 363 364 data->localAddr = QAPI_CLONE(SocketAddress, localAddr); 365 data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr); 366 367 trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr); 368 qio_task_run_in_thread(task, 369 qio_channel_socket_dgram_worker, 370 data, 371 qio_channel_socket_dgram_worker_free, 372 context); 373 } 374 375 376 QIOChannelSocket * 377 qio_channel_socket_accept(QIOChannelSocket *ioc, 378 Error **errp) 379 { 380 QIOChannelSocket *cioc; 381 382 cioc = qio_channel_socket_new(); 383 cioc->remoteAddrLen = sizeof(ioc->remoteAddr); 384 cioc->localAddrLen = sizeof(ioc->localAddr); 385 386 retry: 387 trace_qio_channel_socket_accept(ioc); 388 cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr, 389 &cioc->remoteAddrLen); 390 if (cioc->fd < 0) { 391 if (errno == EINTR) { 392 goto retry; 393 } 394 error_setg_errno(errp, errno, "Unable to accept connection"); 395 trace_qio_channel_socket_accept_fail(ioc); 396 goto error; 397 } 398 399 if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr, 400 &cioc->localAddrLen) < 0) { 401 error_setg_errno(errp, errno, 402 "Unable to query local socket address"); 403 goto error; 404 } 405 406 #ifndef WIN32 407 if (cioc->localAddr.ss_family == AF_UNIX) { 408 QIOChannel *ioc_local = QIO_CHANNEL(cioc); 409 qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS); 410 } 411 #endif /* WIN32 */ 412 413 qio_channel_set_feature(QIO_CHANNEL(cioc), 414 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 415 416 trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd); 417 return cioc; 418 419 error: 420 object_unref(OBJECT(cioc)); 421 return NULL; 422 } 423 424 static void qio_channel_socket_init(Object *obj) 425 { 426 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 427 ioc->fd = -1; 428 } 429 430 static void qio_channel_socket_finalize(Object *obj) 431 { 432 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 433 434 if (ioc->fd != -1) { 435 QIOChannel *ioc_local = QIO_CHANNEL(ioc); 436 if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) { 437 Error *err = NULL; 438 439 socket_listen_cleanup(ioc->fd, &err); 440 if (err) { 441 error_report_err(err); 442 err = NULL; 443 } 444 } 445 #ifdef WIN32 446 qemu_socket_unselect(ioc->fd, NULL); 447 #endif 448 close(ioc->fd); 449 ioc->fd = -1; 450 } 451 } 452 453 454 #ifndef WIN32 455 static void qio_channel_socket_copy_fds(struct msghdr *msg, 456 int **fds, size_t *nfds) 457 { 458 struct cmsghdr *cmsg; 459 460 *nfds = 0; 461 *fds = NULL; 462 463 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 464 int fd_size, i; 465 int gotfds; 466 467 if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) || 468 cmsg->cmsg_level != SOL_SOCKET || 469 cmsg->cmsg_type != SCM_RIGHTS) { 470 continue; 471 } 472 473 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 474 475 if (!fd_size) { 476 continue; 477 } 478 479 gotfds = fd_size / sizeof(int); 480 *fds = g_renew(int, *fds, *nfds + gotfds); 481 memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size); 482 483 for (i = 0; i < gotfds; i++) { 484 int fd = (*fds)[*nfds + i]; 485 if (fd < 0) { 486 continue; 487 } 488 489 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */ 490 qemu_socket_set_block(fd); 491 492 #ifndef MSG_CMSG_CLOEXEC 493 qemu_set_cloexec(fd); 494 #endif 495 } 496 *nfds += gotfds; 497 } 498 } 499 500 501 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 502 const struct iovec *iov, 503 size_t niov, 504 int **fds, 505 size_t *nfds, 506 int flags, 507 Error **errp) 508 { 509 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 510 ssize_t ret; 511 struct msghdr msg = { NULL, }; 512 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 513 int sflags = 0; 514 515 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 516 517 msg.msg_iov = (struct iovec *)iov; 518 msg.msg_iovlen = niov; 519 if (fds && nfds) { 520 msg.msg_control = control; 521 msg.msg_controllen = sizeof(control); 522 #ifdef MSG_CMSG_CLOEXEC 523 sflags |= MSG_CMSG_CLOEXEC; 524 #endif 525 526 } 527 528 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 529 sflags |= MSG_PEEK; 530 } 531 532 retry: 533 ret = recvmsg(sioc->fd, &msg, sflags); 534 if (ret < 0) { 535 if (errno == EAGAIN) { 536 return QIO_CHANNEL_ERR_BLOCK; 537 } 538 if (errno == EINTR) { 539 goto retry; 540 } 541 542 error_setg_errno(errp, errno, 543 "Unable to read from socket"); 544 return -1; 545 } 546 547 if (fds && nfds) { 548 qio_channel_socket_copy_fds(&msg, fds, nfds); 549 } 550 551 return ret; 552 } 553 554 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 555 const struct iovec *iov, 556 size_t niov, 557 int *fds, 558 size_t nfds, 559 int flags, 560 Error **errp) 561 { 562 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 563 ssize_t ret; 564 struct msghdr msg = { NULL, }; 565 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 566 size_t fdsize = sizeof(int) * nfds; 567 struct cmsghdr *cmsg; 568 int sflags = 0; 569 570 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 571 572 msg.msg_iov = (struct iovec *)iov; 573 msg.msg_iovlen = niov; 574 575 if (nfds) { 576 if (nfds > SOCKET_MAX_FDS) { 577 error_setg_errno(errp, EINVAL, 578 "Only %d FDs can be sent, got %zu", 579 SOCKET_MAX_FDS, nfds); 580 return -1; 581 } 582 583 msg.msg_control = control; 584 msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds); 585 586 cmsg = CMSG_FIRSTHDR(&msg); 587 cmsg->cmsg_len = CMSG_LEN(fdsize); 588 cmsg->cmsg_level = SOL_SOCKET; 589 cmsg->cmsg_type = SCM_RIGHTS; 590 memcpy(CMSG_DATA(cmsg), fds, fdsize); 591 } 592 593 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 594 #ifdef QEMU_MSG_ZEROCOPY 595 sflags = MSG_ZEROCOPY; 596 #else 597 /* 598 * We expect QIOChannel class entry point to have 599 * blocked this code path already 600 */ 601 g_assert_not_reached(); 602 #endif 603 } 604 605 retry: 606 ret = sendmsg(sioc->fd, &msg, sflags); 607 if (ret <= 0) { 608 switch (errno) { 609 case EAGAIN: 610 return QIO_CHANNEL_ERR_BLOCK; 611 case EINTR: 612 goto retry; 613 case ENOBUFS: 614 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 615 error_setg_errno(errp, errno, 616 "Process can't lock enough memory for using MSG_ZEROCOPY"); 617 return -1; 618 } 619 break; 620 } 621 622 error_setg_errno(errp, errno, 623 "Unable to write to socket"); 624 return -1; 625 } 626 627 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 628 sioc->zero_copy_queued++; 629 } 630 631 return ret; 632 } 633 #else /* WIN32 */ 634 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 635 const struct iovec *iov, 636 size_t niov, 637 int **fds, 638 size_t *nfds, 639 int flags, 640 Error **errp) 641 { 642 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 643 ssize_t done = 0; 644 ssize_t i; 645 int sflags = 0; 646 647 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 648 sflags |= MSG_PEEK; 649 } 650 651 for (i = 0; i < niov; i++) { 652 ssize_t ret; 653 retry: 654 ret = recv(sioc->fd, 655 iov[i].iov_base, 656 iov[i].iov_len, 657 sflags); 658 if (ret < 0) { 659 if (errno == EAGAIN) { 660 if (done) { 661 return done; 662 } else { 663 return QIO_CHANNEL_ERR_BLOCK; 664 } 665 } else if (errno == EINTR) { 666 goto retry; 667 } else { 668 error_setg_errno(errp, errno, 669 "Unable to read from socket"); 670 return -1; 671 } 672 } 673 done += ret; 674 if (ret < iov[i].iov_len) { 675 return done; 676 } 677 } 678 679 return done; 680 } 681 682 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 683 const struct iovec *iov, 684 size_t niov, 685 int *fds, 686 size_t nfds, 687 int flags, 688 Error **errp) 689 { 690 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 691 ssize_t done = 0; 692 ssize_t i; 693 694 for (i = 0; i < niov; i++) { 695 ssize_t ret; 696 retry: 697 ret = send(sioc->fd, 698 iov[i].iov_base, 699 iov[i].iov_len, 700 0); 701 if (ret < 0) { 702 if (errno == EAGAIN) { 703 if (done) { 704 return done; 705 } else { 706 return QIO_CHANNEL_ERR_BLOCK; 707 } 708 } else if (errno == EINTR) { 709 goto retry; 710 } else { 711 error_setg_errno(errp, errno, 712 "Unable to write to socket"); 713 return -1; 714 } 715 } 716 done += ret; 717 if (ret < iov[i].iov_len) { 718 return done; 719 } 720 } 721 722 return done; 723 } 724 #endif /* WIN32 */ 725 726 727 #ifdef QEMU_MSG_ZEROCOPY 728 static int qio_channel_socket_flush(QIOChannel *ioc, 729 Error **errp) 730 { 731 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 732 struct msghdr msg = {}; 733 struct sock_extended_err *serr; 734 struct cmsghdr *cm; 735 char control[CMSG_SPACE(sizeof(*serr))]; 736 int received; 737 int ret; 738 739 if (sioc->zero_copy_queued == sioc->zero_copy_sent) { 740 return 0; 741 } 742 743 msg.msg_control = control; 744 msg.msg_controllen = sizeof(control); 745 memset(control, 0, sizeof(control)); 746 747 ret = 1; 748 749 while (sioc->zero_copy_sent < sioc->zero_copy_queued) { 750 received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE); 751 if (received < 0) { 752 switch (errno) { 753 case EAGAIN: 754 /* Nothing on errqueue, wait until something is available */ 755 qio_channel_wait(ioc, G_IO_ERR); 756 continue; 757 case EINTR: 758 continue; 759 default: 760 error_setg_errno(errp, errno, 761 "Unable to read errqueue"); 762 return -1; 763 } 764 } 765 766 cm = CMSG_FIRSTHDR(&msg); 767 if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR && 768 cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) { 769 error_setg_errno(errp, EPROTOTYPE, 770 "Wrong cmsg in errqueue"); 771 return -1; 772 } 773 774 serr = (void *) CMSG_DATA(cm); 775 if (serr->ee_errno != SO_EE_ORIGIN_NONE) { 776 error_setg_errno(errp, serr->ee_errno, 777 "Error on socket"); 778 return -1; 779 } 780 if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 781 error_setg_errno(errp, serr->ee_origin, 782 "Error not from zero copy"); 783 return -1; 784 } 785 if (serr->ee_data < serr->ee_info) { 786 error_setg_errno(errp, serr->ee_origin, 787 "Wrong notification bounds"); 788 return -1; 789 } 790 791 /* No errors, count successfully finished sendmsg()*/ 792 sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1; 793 794 /* If any sendmsg() succeeded using zero copy, return 0 at the end */ 795 if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) { 796 ret = 0; 797 } 798 } 799 800 return ret; 801 } 802 803 #endif /* QEMU_MSG_ZEROCOPY */ 804 805 static int 806 qio_channel_socket_set_blocking(QIOChannel *ioc, 807 bool enabled, 808 Error **errp) 809 { 810 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 811 812 if (enabled) { 813 qemu_socket_set_block(sioc->fd); 814 } else { 815 qemu_socket_set_nonblock(sioc->fd); 816 } 817 return 0; 818 } 819 820 821 static void 822 qio_channel_socket_set_delay(QIOChannel *ioc, 823 bool enabled) 824 { 825 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 826 int v = enabled ? 0 : 1; 827 828 setsockopt(sioc->fd, 829 IPPROTO_TCP, TCP_NODELAY, 830 &v, sizeof(v)); 831 } 832 833 834 static void 835 qio_channel_socket_set_cork(QIOChannel *ioc, 836 bool enabled) 837 { 838 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 839 int v = enabled ? 1 : 0; 840 841 socket_set_cork(sioc->fd, v); 842 } 843 844 845 static int 846 qio_channel_socket_close(QIOChannel *ioc, 847 Error **errp) 848 { 849 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 850 int rc = 0; 851 Error *err = NULL; 852 853 if (sioc->fd != -1) { 854 #ifdef WIN32 855 qemu_socket_unselect(sioc->fd, NULL); 856 #endif 857 if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) { 858 socket_listen_cleanup(sioc->fd, errp); 859 } 860 861 if (close(sioc->fd) < 0) { 862 sioc->fd = -1; 863 error_setg_errno(&err, errno, "Unable to close socket"); 864 error_propagate(errp, err); 865 return -1; 866 } 867 sioc->fd = -1; 868 } 869 return rc; 870 } 871 872 static int 873 qio_channel_socket_shutdown(QIOChannel *ioc, 874 QIOChannelShutdown how, 875 Error **errp) 876 { 877 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 878 int sockhow; 879 880 switch (how) { 881 case QIO_CHANNEL_SHUTDOWN_READ: 882 sockhow = SHUT_RD; 883 break; 884 case QIO_CHANNEL_SHUTDOWN_WRITE: 885 sockhow = SHUT_WR; 886 break; 887 case QIO_CHANNEL_SHUTDOWN_BOTH: 888 default: 889 sockhow = SHUT_RDWR; 890 break; 891 } 892 893 if (shutdown(sioc->fd, sockhow) < 0) { 894 error_setg_errno(errp, errno, 895 "Unable to shutdown socket"); 896 return -1; 897 } 898 return 0; 899 } 900 901 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc, 902 AioContext *read_ctx, 903 IOHandler *io_read, 904 AioContext *write_ctx, 905 IOHandler *io_write, 906 void *opaque) 907 { 908 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 909 910 qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read, 911 sioc->fd, write_ctx, io_write, 912 opaque); 913 } 914 915 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, 916 GIOCondition condition) 917 { 918 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 919 return qio_channel_create_socket_watch(ioc, 920 sioc->fd, 921 condition); 922 } 923 924 static void qio_channel_socket_class_init(ObjectClass *klass, 925 void *class_data G_GNUC_UNUSED) 926 { 927 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 928 929 ioc_klass->io_writev = qio_channel_socket_writev; 930 ioc_klass->io_readv = qio_channel_socket_readv; 931 ioc_klass->io_set_blocking = qio_channel_socket_set_blocking; 932 ioc_klass->io_close = qio_channel_socket_close; 933 ioc_klass->io_shutdown = qio_channel_socket_shutdown; 934 ioc_klass->io_set_cork = qio_channel_socket_set_cork; 935 ioc_klass->io_set_delay = qio_channel_socket_set_delay; 936 ioc_klass->io_create_watch = qio_channel_socket_create_watch; 937 ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler; 938 #ifdef QEMU_MSG_ZEROCOPY 939 ioc_klass->io_flush = qio_channel_socket_flush; 940 #endif 941 } 942 943 static const TypeInfo qio_channel_socket_info = { 944 .parent = TYPE_QIO_CHANNEL, 945 .name = TYPE_QIO_CHANNEL_SOCKET, 946 .instance_size = sizeof(QIOChannelSocket), 947 .instance_init = qio_channel_socket_init, 948 .instance_finalize = qio_channel_socket_finalize, 949 .class_init = qio_channel_socket_class_init, 950 }; 951 952 static void qio_channel_socket_register_types(void) 953 { 954 type_register_static(&qio_channel_socket_info); 955 } 956 957 type_init(qio_channel_socket_register_types); 958