1 /* 2 * QEMU I/O channels sockets driver 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2.1 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #include "qemu/osdep.h" 21 #include "qapi/error.h" 22 #include "qapi/qapi-visit-sockets.h" 23 #include "qemu/module.h" 24 #include "io/channel-socket.h" 25 #include "io/channel-util.h" 26 #include "io/channel-watch.h" 27 #include "trace.h" 28 #include "qapi/clone-visitor.h" 29 #ifdef CONFIG_LINUX 30 #include <linux/errqueue.h> 31 #include <sys/socket.h> 32 33 #if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY)) 34 #define QEMU_MSG_ZEROCOPY 35 #endif 36 #endif 37 38 #define SOCKET_MAX_FDS 16 39 40 #ifdef QEMU_MSG_ZEROCOPY 41 static int qio_channel_socket_flush_internal(QIOChannel *ioc, 42 bool block, 43 Error **errp); 44 #endif 45 46 SocketAddress * 47 qio_channel_socket_get_local_address(QIOChannelSocket *ioc, 48 Error **errp) 49 { 50 return socket_sockaddr_to_address(&ioc->localAddr, 51 ioc->localAddrLen, 52 errp); 53 } 54 55 SocketAddress * 56 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc, 57 Error **errp) 58 { 59 return socket_sockaddr_to_address(&ioc->remoteAddr, 60 ioc->remoteAddrLen, 61 errp); 62 } 63 64 QIOChannelSocket * 65 qio_channel_socket_new(void) 66 { 67 QIOChannelSocket *sioc; 68 QIOChannel *ioc; 69 70 sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); 71 sioc->fd = -1; 72 sioc->zero_copy_queued = 0; 73 sioc->zero_copy_sent = 0; 74 sioc->blocking = false; 75 sioc->new_zero_copy_sent_success = false; 76 77 ioc = QIO_CHANNEL(sioc); 78 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN); 79 80 #ifdef WIN32 81 ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL); 82 #endif 83 84 trace_qio_channel_socket_new(sioc); 85 86 return sioc; 87 } 88 89 int qio_channel_socket_set_send_buffer(QIOChannelSocket *ioc, 90 size_t size, 91 Error **errp) 92 { 93 if (setsockopt(ioc->fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) < 0) { 94 error_setg_errno(errp, errno, "Unable to set socket send buffer size"); 95 return -1; 96 } 97 98 return 0; 99 } 100 101 static int 102 qio_channel_socket_set_fd(QIOChannelSocket *sioc, 103 int fd, 104 Error **errp) 105 { 106 if (sioc->fd != -1) { 107 error_setg(errp, "Socket is already open"); 108 return -1; 109 } 110 111 sioc->fd = fd; 112 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 113 sioc->localAddrLen = sizeof(sioc->localAddr); 114 115 116 if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr, 117 &sioc->remoteAddrLen) < 0) { 118 if (errno == ENOTCONN) { 119 memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr)); 120 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 121 } else { 122 error_setg_errno(errp, errno, 123 "Unable to query remote socket address"); 124 goto error; 125 } 126 } 127 128 if (getsockname(fd, (struct sockaddr *)&sioc->localAddr, 129 &sioc->localAddrLen) < 0) { 130 error_setg_errno(errp, errno, 131 "Unable to query local socket address"); 132 goto error; 133 } 134 135 #ifndef WIN32 136 if (sioc->localAddr.ss_family == AF_UNIX) { 137 QIOChannel *ioc = QIO_CHANNEL(sioc); 138 qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS); 139 } 140 #endif /* WIN32 */ 141 142 return 0; 143 144 error: 145 sioc->fd = -1; /* Let the caller close FD on failure */ 146 return -1; 147 } 148 149 QIOChannelSocket * 150 qio_channel_socket_new_fd(int fd, 151 Error **errp) 152 { 153 QIOChannelSocket *ioc; 154 155 ioc = qio_channel_socket_new(); 156 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 157 object_unref(OBJECT(ioc)); 158 return NULL; 159 } 160 161 trace_qio_channel_socket_new_fd(ioc, fd); 162 163 return ioc; 164 } 165 166 167 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, 168 SocketAddress *addr, 169 Error **errp) 170 { 171 int fd; 172 173 trace_qio_channel_socket_connect_sync(ioc, addr); 174 fd = socket_connect(addr, errp); 175 if (fd < 0) { 176 trace_qio_channel_socket_connect_fail(ioc); 177 return -1; 178 } 179 180 trace_qio_channel_socket_connect_complete(ioc, fd); 181 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 182 close(fd); 183 return -1; 184 } 185 186 #ifdef QEMU_MSG_ZEROCOPY 187 int ret, v = 1; 188 ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v)); 189 if (ret == 0) { 190 /* Zero copy available on host */ 191 qio_channel_set_feature(QIO_CHANNEL(ioc), 192 QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY); 193 } 194 #endif 195 196 qio_channel_set_feature(QIO_CHANNEL(ioc), 197 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 198 199 return 0; 200 } 201 202 203 static void qio_channel_socket_connect_worker(QIOTask *task, 204 gpointer opaque) 205 { 206 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 207 SocketAddress *addr = opaque; 208 Error *err = NULL; 209 210 qio_channel_socket_connect_sync(ioc, addr, &err); 211 212 qio_task_set_error(task, err); 213 } 214 215 216 void qio_channel_socket_connect_async(QIOChannelSocket *ioc, 217 SocketAddress *addr, 218 QIOTaskFunc callback, 219 gpointer opaque, 220 GDestroyNotify destroy, 221 GMainContext *context) 222 { 223 QIOTask *task = qio_task_new( 224 OBJECT(ioc), callback, opaque, destroy); 225 SocketAddress *addrCopy; 226 227 addrCopy = QAPI_CLONE(SocketAddress, addr); 228 229 /* socket_connect() does a non-blocking connect(), but it 230 * still blocks in DNS lookups, so we must use a thread */ 231 trace_qio_channel_socket_connect_async(ioc, addr); 232 qio_task_run_in_thread(task, 233 qio_channel_socket_connect_worker, 234 addrCopy, 235 (GDestroyNotify)qapi_free_SocketAddress, 236 context); 237 } 238 239 240 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc, 241 SocketAddress *addr, 242 int num, 243 Error **errp) 244 { 245 int fd; 246 247 trace_qio_channel_socket_listen_sync(ioc, addr, num); 248 fd = socket_listen(addr, num, errp); 249 if (fd < 0) { 250 trace_qio_channel_socket_listen_fail(ioc); 251 return -1; 252 } 253 254 trace_qio_channel_socket_listen_complete(ioc, fd); 255 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 256 close(fd); 257 return -1; 258 } 259 qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN); 260 261 return 0; 262 } 263 264 265 struct QIOChannelListenWorkerData { 266 SocketAddress *addr; 267 int num; /* amount of expected connections */ 268 }; 269 270 static void qio_channel_listen_worker_free(gpointer opaque) 271 { 272 struct QIOChannelListenWorkerData *data = opaque; 273 274 qapi_free_SocketAddress(data->addr); 275 g_free(data); 276 } 277 278 static void qio_channel_socket_listen_worker(QIOTask *task, 279 gpointer opaque) 280 { 281 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 282 struct QIOChannelListenWorkerData *data = opaque; 283 Error *err = NULL; 284 285 qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err); 286 287 qio_task_set_error(task, err); 288 } 289 290 291 void qio_channel_socket_listen_async(QIOChannelSocket *ioc, 292 SocketAddress *addr, 293 int num, 294 QIOTaskFunc callback, 295 gpointer opaque, 296 GDestroyNotify destroy, 297 GMainContext *context) 298 { 299 QIOTask *task = qio_task_new( 300 OBJECT(ioc), callback, opaque, destroy); 301 struct QIOChannelListenWorkerData *data; 302 303 data = g_new0(struct QIOChannelListenWorkerData, 1); 304 data->addr = QAPI_CLONE(SocketAddress, addr); 305 data->num = num; 306 307 /* socket_listen() blocks in DNS lookups, so we must use a thread */ 308 trace_qio_channel_socket_listen_async(ioc, addr, num); 309 qio_task_run_in_thread(task, 310 qio_channel_socket_listen_worker, 311 data, 312 qio_channel_listen_worker_free, 313 context); 314 } 315 316 317 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc, 318 SocketAddress *localAddr, 319 SocketAddress *remoteAddr, 320 Error **errp) 321 { 322 int fd; 323 324 trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr); 325 fd = socket_dgram(remoteAddr, localAddr, errp); 326 if (fd < 0) { 327 trace_qio_channel_socket_dgram_fail(ioc); 328 return -1; 329 } 330 331 trace_qio_channel_socket_dgram_complete(ioc, fd); 332 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 333 close(fd); 334 return -1; 335 } 336 337 return 0; 338 } 339 340 341 struct QIOChannelSocketDGramWorkerData { 342 SocketAddress *localAddr; 343 SocketAddress *remoteAddr; 344 }; 345 346 347 static void qio_channel_socket_dgram_worker_free(gpointer opaque) 348 { 349 struct QIOChannelSocketDGramWorkerData *data = opaque; 350 qapi_free_SocketAddress(data->localAddr); 351 qapi_free_SocketAddress(data->remoteAddr); 352 g_free(data); 353 } 354 355 static void qio_channel_socket_dgram_worker(QIOTask *task, 356 gpointer opaque) 357 { 358 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 359 struct QIOChannelSocketDGramWorkerData *data = opaque; 360 Error *err = NULL; 361 362 /* socket_dgram() blocks in DNS lookups, so we must use a thread */ 363 qio_channel_socket_dgram_sync(ioc, data->localAddr, 364 data->remoteAddr, &err); 365 366 qio_task_set_error(task, err); 367 } 368 369 370 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, 371 SocketAddress *localAddr, 372 SocketAddress *remoteAddr, 373 QIOTaskFunc callback, 374 gpointer opaque, 375 GDestroyNotify destroy, 376 GMainContext *context) 377 { 378 QIOTask *task = qio_task_new( 379 OBJECT(ioc), callback, opaque, destroy); 380 struct QIOChannelSocketDGramWorkerData *data = g_new0( 381 struct QIOChannelSocketDGramWorkerData, 1); 382 383 data->localAddr = QAPI_CLONE(SocketAddress, localAddr); 384 data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr); 385 386 trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr); 387 qio_task_run_in_thread(task, 388 qio_channel_socket_dgram_worker, 389 data, 390 qio_channel_socket_dgram_worker_free, 391 context); 392 } 393 394 395 QIOChannelSocket * 396 qio_channel_socket_accept(QIOChannelSocket *ioc, 397 Error **errp) 398 { 399 QIOChannelSocket *cioc; 400 401 cioc = qio_channel_socket_new(); 402 cioc->remoteAddrLen = sizeof(ioc->remoteAddr); 403 cioc->localAddrLen = sizeof(ioc->localAddr); 404 405 retry: 406 trace_qio_channel_socket_accept(ioc); 407 cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr, 408 &cioc->remoteAddrLen); 409 if (cioc->fd < 0) { 410 if (errno == EINTR) { 411 goto retry; 412 } 413 error_setg_errno(errp, errno, "Unable to accept connection"); 414 trace_qio_channel_socket_accept_fail(ioc); 415 goto error; 416 } 417 418 if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr, 419 &cioc->localAddrLen) < 0) { 420 error_setg_errno(errp, errno, 421 "Unable to query local socket address"); 422 goto error; 423 } 424 425 #ifndef WIN32 426 if (cioc->localAddr.ss_family == AF_UNIX) { 427 QIOChannel *ioc_local = QIO_CHANNEL(cioc); 428 qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS); 429 } 430 #endif /* WIN32 */ 431 432 qio_channel_set_feature(QIO_CHANNEL(cioc), 433 QIO_CHANNEL_FEATURE_READ_MSG_PEEK); 434 435 trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd); 436 return cioc; 437 438 error: 439 object_unref(OBJECT(cioc)); 440 return NULL; 441 } 442 443 static void qio_channel_socket_init(Object *obj) 444 { 445 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 446 ioc->fd = -1; 447 } 448 449 static void qio_channel_socket_finalize(Object *obj) 450 { 451 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 452 453 if (ioc->fd != -1) { 454 QIOChannel *ioc_local = QIO_CHANNEL(ioc); 455 if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) { 456 Error *err = NULL; 457 458 socket_listen_cleanup(ioc->fd, &err); 459 if (err) { 460 error_report_err(err); 461 err = NULL; 462 } 463 } 464 #ifdef WIN32 465 qemu_socket_unselect_nofail(ioc->fd); 466 #endif 467 close(ioc->fd); 468 ioc->fd = -1; 469 } 470 } 471 472 473 #ifndef WIN32 474 static void qio_channel_socket_copy_fds(struct msghdr *msg, 475 int **fds, size_t *nfds) 476 { 477 struct cmsghdr *cmsg; 478 479 *nfds = 0; 480 *fds = NULL; 481 482 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 483 int fd_size; 484 int gotfds; 485 486 if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) || 487 cmsg->cmsg_level != SOL_SOCKET || 488 cmsg->cmsg_type != SCM_RIGHTS) { 489 continue; 490 } 491 492 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 493 494 if (!fd_size) { 495 continue; 496 } 497 498 gotfds = fd_size / sizeof(int); 499 *fds = g_renew(int, *fds, *nfds + gotfds); 500 memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size); 501 *nfds += gotfds; 502 } 503 } 504 505 static bool qio_channel_handle_fds(int *fds, size_t nfds, 506 bool preserve_blocking, Error **errp) 507 { 508 int *end = fds + nfds, *fd; 509 510 #ifdef MSG_CMSG_CLOEXEC 511 if (preserve_blocking) { 512 /* Nothing to do */ 513 return true; 514 } 515 #endif 516 517 for (fd = fds; fd != end; fd++) { 518 if (*fd < 0) { 519 continue; 520 } 521 522 if (!preserve_blocking) { 523 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */ 524 if (!qemu_set_blocking(*fd, true, errp)) { 525 return false; 526 } 527 } 528 529 #ifndef MSG_CMSG_CLOEXEC 530 qemu_set_cloexec(*fd); 531 #endif 532 } 533 534 return true; 535 } 536 537 static void qio_channel_cleanup_fds(int **fds, size_t *nfds) 538 { 539 for (size_t i = 0; i < *nfds; i++) { 540 if ((*fds)[i] < 0) { 541 continue; 542 } 543 close((*fds)[i]); 544 } 545 546 g_clear_pointer(fds, g_free); 547 *nfds = 0; 548 } 549 550 551 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 552 const struct iovec *iov, 553 size_t niov, 554 int **fds, 555 size_t *nfds, 556 int flags, 557 Error **errp) 558 { 559 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 560 ssize_t ret; 561 struct msghdr msg = { NULL, }; 562 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 563 int sflags = 0; 564 565 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 566 567 msg.msg_iov = (struct iovec *)iov; 568 msg.msg_iovlen = niov; 569 if (fds && nfds) { 570 msg.msg_control = control; 571 msg.msg_controllen = sizeof(control); 572 #ifdef MSG_CMSG_CLOEXEC 573 sflags |= MSG_CMSG_CLOEXEC; 574 #endif 575 576 } 577 578 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 579 sflags |= MSG_PEEK; 580 } 581 582 retry: 583 ret = recvmsg(sioc->fd, &msg, sflags); 584 if (ret < 0) { 585 if (errno == EAGAIN) { 586 return QIO_CHANNEL_ERR_BLOCK; 587 } 588 if (errno == EINTR) { 589 goto retry; 590 } 591 592 error_setg_errno(errp, errno, 593 "Unable to read from socket"); 594 return -1; 595 } 596 597 if (fds && nfds) { 598 bool preserve_blocking = 599 flags & QIO_CHANNEL_READ_FLAG_FD_PRESERVE_BLOCKING; 600 601 qio_channel_socket_copy_fds(&msg, fds, nfds); 602 603 if (!qio_channel_handle_fds(*fds, *nfds, 604 preserve_blocking, errp)) { 605 qio_channel_cleanup_fds(fds, nfds); 606 return -1; 607 } 608 } 609 610 return ret; 611 } 612 613 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 614 const struct iovec *iov, 615 size_t niov, 616 int *fds, 617 size_t nfds, 618 int flags, 619 Error **errp) 620 { 621 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 622 ssize_t ret; 623 struct msghdr msg = { NULL, }; 624 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 625 size_t fdsize = sizeof(int) * nfds; 626 struct cmsghdr *cmsg; 627 int sflags = 0; 628 #ifdef QEMU_MSG_ZEROCOPY 629 bool blocking = sioc->blocking; 630 bool zerocopy_flushed_once = false; 631 #endif 632 633 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 634 635 msg.msg_iov = (struct iovec *)iov; 636 msg.msg_iovlen = niov; 637 638 if (nfds) { 639 if (nfds > SOCKET_MAX_FDS) { 640 error_setg_errno(errp, EINVAL, 641 "Only %d FDs can be sent, got %zu", 642 SOCKET_MAX_FDS, nfds); 643 return -1; 644 } 645 646 msg.msg_control = control; 647 msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds); 648 649 cmsg = CMSG_FIRSTHDR(&msg); 650 cmsg->cmsg_len = CMSG_LEN(fdsize); 651 cmsg->cmsg_level = SOL_SOCKET; 652 cmsg->cmsg_type = SCM_RIGHTS; 653 memcpy(CMSG_DATA(cmsg), fds, fdsize); 654 } 655 656 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 657 #ifdef QEMU_MSG_ZEROCOPY 658 sflags = MSG_ZEROCOPY; 659 #else 660 /* 661 * We expect QIOChannel class entry point to have 662 * blocked this code path already 663 */ 664 g_assert_not_reached(); 665 #endif 666 } 667 668 retry: 669 ret = sendmsg(sioc->fd, &msg, sflags); 670 if (ret <= 0) { 671 switch (errno) { 672 case EAGAIN: 673 return QIO_CHANNEL_ERR_BLOCK; 674 case EINTR: 675 goto retry; 676 #ifdef QEMU_MSG_ZEROCOPY 677 case ENOBUFS: 678 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 679 /** 680 * Socket error queueing may exhaust the OPTMEM limit. Try 681 * flushing the error queue once. 682 */ 683 if (!zerocopy_flushed_once) { 684 ret = qio_channel_socket_flush_internal(ioc, blocking, 685 errp); 686 if (ret < 0) { 687 return -1; 688 } 689 zerocopy_flushed_once = true; 690 goto retry; 691 } else { 692 error_setg_errno(errp, errno, 693 "Process can't lock enough memory for " 694 "using MSG_ZEROCOPY"); 695 return -1; 696 } 697 } 698 break; 699 #endif 700 } 701 702 error_setg_errno(errp, errno, 703 "Unable to write to socket"); 704 return -1; 705 } 706 707 if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) { 708 sioc->zero_copy_queued++; 709 } 710 711 return ret; 712 } 713 #else /* WIN32 */ 714 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 715 const struct iovec *iov, 716 size_t niov, 717 int **fds, 718 size_t *nfds, 719 int flags, 720 Error **errp) 721 { 722 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 723 ssize_t done = 0; 724 ssize_t i; 725 int sflags = 0; 726 727 if (flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) { 728 sflags |= MSG_PEEK; 729 } 730 731 for (i = 0; i < niov; i++) { 732 ssize_t ret; 733 retry: 734 ret = recv(sioc->fd, 735 iov[i].iov_base, 736 iov[i].iov_len, 737 sflags); 738 if (ret < 0) { 739 if (errno == EAGAIN) { 740 if (done) { 741 return done; 742 } else { 743 return QIO_CHANNEL_ERR_BLOCK; 744 } 745 } else if (errno == EINTR) { 746 goto retry; 747 } else { 748 error_setg_errno(errp, errno, 749 "Unable to read from socket"); 750 return -1; 751 } 752 } 753 done += ret; 754 if (ret < iov[i].iov_len) { 755 return done; 756 } 757 } 758 759 return done; 760 } 761 762 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 763 const struct iovec *iov, 764 size_t niov, 765 int *fds, 766 size_t nfds, 767 int flags, 768 Error **errp) 769 { 770 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 771 ssize_t done = 0; 772 ssize_t i; 773 774 for (i = 0; i < niov; i++) { 775 ssize_t ret; 776 retry: 777 ret = send(sioc->fd, 778 iov[i].iov_base, 779 iov[i].iov_len, 780 0); 781 if (ret < 0) { 782 if (errno == EAGAIN) { 783 if (done) { 784 return done; 785 } else { 786 return QIO_CHANNEL_ERR_BLOCK; 787 } 788 } else if (errno == EINTR) { 789 goto retry; 790 } else { 791 error_setg_errno(errp, errno, 792 "Unable to write to socket"); 793 return -1; 794 } 795 } 796 done += ret; 797 if (ret < iov[i].iov_len) { 798 return done; 799 } 800 } 801 802 return done; 803 } 804 #endif /* WIN32 */ 805 806 807 #ifdef QEMU_MSG_ZEROCOPY 808 static int qio_channel_socket_flush_internal(QIOChannel *ioc, 809 bool block, 810 Error **errp) 811 { 812 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 813 struct msghdr msg = {}; 814 struct sock_extended_err *serr; 815 struct cmsghdr *cm; 816 char control[CMSG_SPACE(sizeof(*serr))]; 817 int received; 818 819 if (sioc->zero_copy_queued == sioc->zero_copy_sent) { 820 return 0; 821 } 822 823 msg.msg_control = control; 824 msg.msg_controllen = sizeof(control); 825 memset(control, 0, sizeof(control)); 826 827 while (sioc->zero_copy_sent < sioc->zero_copy_queued) { 828 received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE); 829 if (received < 0) { 830 switch (errno) { 831 case EAGAIN: 832 if (block) { 833 /* 834 * Nothing on errqueue, wait until something is 835 * available. 836 * 837 * Use G_IO_ERR instead of G_IO_IN since MSG_ERRQUEUE reads 838 * are signaled via POLLERR, not POLLIN, as the kernel 839 * sets POLLERR when zero-copy notificatons appear on the 840 * socket error queue. 841 */ 842 qio_channel_wait(ioc, G_IO_ERR); 843 continue; 844 } 845 return 0; 846 case EINTR: 847 continue; 848 default: 849 error_setg_errno(errp, errno, 850 "Unable to read errqueue"); 851 return -1; 852 } 853 } 854 855 cm = CMSG_FIRSTHDR(&msg); 856 if (cm->cmsg_level != SOL_IP && cm->cmsg_type != IP_RECVERR && 857 cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) { 858 error_setg_errno(errp, EPROTOTYPE, 859 "Wrong cmsg in errqueue"); 860 return -1; 861 } 862 863 serr = (void *) CMSG_DATA(cm); 864 if (serr->ee_errno != SO_EE_ORIGIN_NONE) { 865 error_setg_errno(errp, serr->ee_errno, 866 "Error on socket"); 867 return -1; 868 } 869 if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) { 870 error_setg_errno(errp, serr->ee_origin, 871 "Error not from zero copy"); 872 return -1; 873 } 874 if (serr->ee_data < serr->ee_info) { 875 error_setg_errno(errp, serr->ee_origin, 876 "Wrong notification bounds"); 877 return -1; 878 } 879 880 /* No errors, count successfully finished sendmsg()*/ 881 sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1; 882 883 /* If any sendmsg() succeeded using zero copy, mark zerocopy success */ 884 if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) { 885 sioc->new_zero_copy_sent_success = true; 886 } 887 } 888 889 return 0; 890 } 891 892 static int qio_channel_socket_flush(QIOChannel *ioc, 893 Error **errp) 894 { 895 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 896 int ret; 897 898 ret = qio_channel_socket_flush_internal(ioc, true, errp); 899 if (ret < 0) { 900 return ret; 901 } 902 903 if (sioc->new_zero_copy_sent_success) { 904 sioc->new_zero_copy_sent_success = false; 905 return 0; 906 } 907 908 return 1; 909 } 910 911 #endif /* QEMU_MSG_ZEROCOPY */ 912 913 static int 914 qio_channel_socket_set_blocking(QIOChannel *ioc, 915 bool enabled, 916 Error **errp) 917 { 918 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 919 sioc->blocking = enabled; 920 921 if (!qemu_set_blocking(sioc->fd, enabled, errp)) { 922 return -1; 923 } 924 925 return 0; 926 } 927 928 929 static void 930 qio_channel_socket_set_delay(QIOChannel *ioc, 931 bool enabled) 932 { 933 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 934 int v = enabled ? 0 : 1; 935 936 setsockopt(sioc->fd, 937 IPPROTO_TCP, TCP_NODELAY, 938 &v, sizeof(v)); 939 } 940 941 942 static void 943 qio_channel_socket_set_cork(QIOChannel *ioc, 944 bool enabled) 945 { 946 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 947 int v = enabled ? 1 : 0; 948 949 socket_set_cork(sioc->fd, v); 950 } 951 952 static int 953 qio_channel_socket_get_peerpid(QIOChannel *ioc, 954 unsigned int *pid, 955 Error **errp) 956 { 957 #ifdef CONFIG_LINUX 958 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 959 Error *err = NULL; 960 socklen_t len = sizeof(struct ucred); 961 962 struct ucred cred; 963 if (getsockopt(sioc->fd, 964 SOL_SOCKET, SO_PEERCRED, 965 &cred, &len) == -1) { 966 error_setg_errno(&err, errno, "Unable to get peer credentials"); 967 error_propagate(errp, err); 968 *pid = -1; 969 return -1; 970 } 971 *pid = (unsigned int)cred.pid; 972 return 0; 973 #else 974 error_setg(errp, "Unsupported feature"); 975 *pid = -1; 976 return -1; 977 #endif 978 } 979 980 static int 981 qio_channel_socket_close(QIOChannel *ioc, 982 Error **errp) 983 { 984 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 985 int rc = 0; 986 Error *err = NULL; 987 988 if (sioc->fd != -1) { 989 #ifdef WIN32 990 qemu_socket_unselect_nofail(sioc->fd); 991 #endif 992 if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) { 993 socket_listen_cleanup(sioc->fd, errp); 994 } 995 996 if (close(sioc->fd) < 0) { 997 sioc->fd = -1; 998 error_setg_errno(&err, errno, "Unable to close socket"); 999 error_propagate(errp, err); 1000 return -1; 1001 } 1002 sioc->fd = -1; 1003 } 1004 return rc; 1005 } 1006 1007 static int 1008 qio_channel_socket_shutdown(QIOChannel *ioc, 1009 QIOChannelShutdown how, 1010 Error **errp) 1011 { 1012 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 1013 int sockhow; 1014 1015 switch (how) { 1016 case QIO_CHANNEL_SHUTDOWN_READ: 1017 sockhow = SHUT_RD; 1018 break; 1019 case QIO_CHANNEL_SHUTDOWN_WRITE: 1020 sockhow = SHUT_WR; 1021 break; 1022 case QIO_CHANNEL_SHUTDOWN_BOTH: 1023 default: 1024 sockhow = SHUT_RDWR; 1025 break; 1026 } 1027 1028 if (shutdown(sioc->fd, sockhow) < 0) { 1029 error_setg_errno(errp, errno, 1030 "Unable to shutdown socket"); 1031 return -1; 1032 } 1033 return 0; 1034 } 1035 1036 static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc, 1037 AioContext *read_ctx, 1038 IOHandler *io_read, 1039 AioContext *write_ctx, 1040 IOHandler *io_write, 1041 void *opaque) 1042 { 1043 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 1044 1045 qio_channel_util_set_aio_fd_handler(sioc->fd, read_ctx, io_read, 1046 sioc->fd, write_ctx, io_write, 1047 opaque); 1048 } 1049 1050 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, 1051 GIOCondition condition) 1052 { 1053 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 1054 return qio_channel_create_socket_watch(ioc, 1055 sioc->fd, 1056 condition); 1057 } 1058 1059 static void qio_channel_socket_class_init(ObjectClass *klass, 1060 const void *class_data G_GNUC_UNUSED) 1061 { 1062 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 1063 1064 ioc_klass->io_writev = qio_channel_socket_writev; 1065 ioc_klass->io_readv = qio_channel_socket_readv; 1066 ioc_klass->io_set_blocking = qio_channel_socket_set_blocking; 1067 ioc_klass->io_close = qio_channel_socket_close; 1068 ioc_klass->io_shutdown = qio_channel_socket_shutdown; 1069 ioc_klass->io_set_cork = qio_channel_socket_set_cork; 1070 ioc_klass->io_set_delay = qio_channel_socket_set_delay; 1071 ioc_klass->io_create_watch = qio_channel_socket_create_watch; 1072 ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler; 1073 #ifdef QEMU_MSG_ZEROCOPY 1074 ioc_klass->io_flush = qio_channel_socket_flush; 1075 #endif 1076 ioc_klass->io_peerpid = qio_channel_socket_get_peerpid; 1077 } 1078 1079 static const TypeInfo qio_channel_socket_info = { 1080 .parent = TYPE_QIO_CHANNEL, 1081 .name = TYPE_QIO_CHANNEL_SOCKET, 1082 .instance_size = sizeof(QIOChannelSocket), 1083 .instance_init = qio_channel_socket_init, 1084 .instance_finalize = qio_channel_socket_finalize, 1085 .class_init = qio_channel_socket_class_init, 1086 }; 1087 1088 static void qio_channel_socket_register_types(void) 1089 { 1090 type_register_static(&qio_channel_socket_info); 1091 } 1092 1093 type_init(qio_channel_socket_register_types); 1094