1 /* 2 * QEMU I/O channels sockets driver 3 * 4 * Copyright (c) 2015 Red Hat, Inc. 5 * 6 * This library is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU Lesser General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This library is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * Lesser General Public License for more details. 15 * 16 * You should have received a copy of the GNU Lesser General Public 17 * License along with this library; if not, see <http://www.gnu.org/licenses/>. 18 * 19 */ 20 21 #include "qemu/osdep.h" 22 #include "io/channel-socket.h" 23 #include "io/channel-watch.h" 24 #include "trace.h" 25 26 #define SOCKET_MAX_FDS 16 27 28 SocketAddress * 29 qio_channel_socket_get_local_address(QIOChannelSocket *ioc, 30 Error **errp) 31 { 32 return socket_sockaddr_to_address(&ioc->localAddr, 33 ioc->localAddrLen, 34 errp); 35 } 36 37 SocketAddress * 38 qio_channel_socket_get_remote_address(QIOChannelSocket *ioc, 39 Error **errp) 40 { 41 return socket_sockaddr_to_address(&ioc->remoteAddr, 42 ioc->remoteAddrLen, 43 errp); 44 } 45 46 QIOChannelSocket * 47 qio_channel_socket_new(void) 48 { 49 QIOChannelSocket *sioc; 50 QIOChannel *ioc; 51 52 sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); 53 sioc->fd = -1; 54 55 ioc = QIO_CHANNEL(sioc); 56 ioc->features |= (1 << QIO_CHANNEL_FEATURE_SHUTDOWN); 57 58 trace_qio_channel_socket_new(sioc); 59 60 return sioc; 61 } 62 63 64 static int 65 qio_channel_socket_set_fd(QIOChannelSocket *sioc, 66 int fd, 67 Error **errp) 68 { 69 if (sioc->fd != -1) { 70 error_setg(errp, "Socket is already open"); 71 return -1; 72 } 73 74 sioc->fd = fd; 75 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 76 sioc->localAddrLen = sizeof(sioc->localAddr); 77 78 79 if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr, 80 &sioc->remoteAddrLen) < 0) { 81 if (socket_error() == ENOTCONN) { 82 memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr)); 83 sioc->remoteAddrLen = sizeof(sioc->remoteAddr); 84 } else { 85 error_setg_errno(errp, socket_error(), 86 "Unable to query remote socket address"); 87 goto error; 88 } 89 } 90 91 if (getsockname(fd, (struct sockaddr *)&sioc->localAddr, 92 &sioc->localAddrLen) < 0) { 93 error_setg_errno(errp, socket_error(), 94 "Unable to query local socket address"); 95 goto error; 96 } 97 98 #ifndef WIN32 99 if (sioc->localAddr.ss_family == AF_UNIX) { 100 QIOChannel *ioc = QIO_CHANNEL(sioc); 101 ioc->features |= (1 << QIO_CHANNEL_FEATURE_FD_PASS); 102 } 103 #endif /* WIN32 */ 104 105 return 0; 106 107 error: 108 sioc->fd = -1; /* Let the caller close FD on failure */ 109 return -1; 110 } 111 112 QIOChannelSocket * 113 qio_channel_socket_new_fd(int fd, 114 Error **errp) 115 { 116 QIOChannelSocket *ioc; 117 118 ioc = qio_channel_socket_new(); 119 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 120 object_unref(OBJECT(ioc)); 121 return NULL; 122 } 123 124 trace_qio_channel_socket_new_fd(ioc, fd); 125 126 return ioc; 127 } 128 129 130 int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, 131 SocketAddress *addr, 132 Error **errp) 133 { 134 int fd; 135 136 trace_qio_channel_socket_connect_sync(ioc, addr); 137 fd = socket_connect(addr, errp, NULL, NULL); 138 if (fd < 0) { 139 trace_qio_channel_socket_connect_fail(ioc); 140 return -1; 141 } 142 143 trace_qio_channel_socket_connect_complete(ioc, fd); 144 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 145 close(fd); 146 return -1; 147 } 148 149 return 0; 150 } 151 152 153 static int qio_channel_socket_connect_worker(QIOTask *task, 154 Error **errp, 155 gpointer opaque) 156 { 157 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 158 SocketAddress *addr = opaque; 159 int ret; 160 161 ret = qio_channel_socket_connect_sync(ioc, 162 addr, 163 errp); 164 165 object_unref(OBJECT(ioc)); 166 return ret; 167 } 168 169 170 void qio_channel_socket_connect_async(QIOChannelSocket *ioc, 171 SocketAddress *addr, 172 QIOTaskFunc callback, 173 gpointer opaque, 174 GDestroyNotify destroy) 175 { 176 QIOTask *task = qio_task_new( 177 OBJECT(ioc), callback, opaque, destroy); 178 SocketAddress *addrCopy; 179 180 qapi_copy_SocketAddress(&addrCopy, addr); 181 182 /* socket_connect() does a non-blocking connect(), but it 183 * still blocks in DNS lookups, so we must use a thread */ 184 trace_qio_channel_socket_connect_async(ioc, addr); 185 qio_task_run_in_thread(task, 186 qio_channel_socket_connect_worker, 187 addrCopy, 188 (GDestroyNotify)qapi_free_SocketAddress); 189 } 190 191 192 int qio_channel_socket_listen_sync(QIOChannelSocket *ioc, 193 SocketAddress *addr, 194 Error **errp) 195 { 196 int fd; 197 198 trace_qio_channel_socket_listen_sync(ioc, addr); 199 fd = socket_listen(addr, errp); 200 if (fd < 0) { 201 trace_qio_channel_socket_listen_fail(ioc); 202 return -1; 203 } 204 205 trace_qio_channel_socket_listen_complete(ioc, fd); 206 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 207 close(fd); 208 return -1; 209 } 210 211 return 0; 212 } 213 214 215 static int qio_channel_socket_listen_worker(QIOTask *task, 216 Error **errp, 217 gpointer opaque) 218 { 219 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 220 SocketAddress *addr = opaque; 221 int ret; 222 223 ret = qio_channel_socket_listen_sync(ioc, 224 addr, 225 errp); 226 227 object_unref(OBJECT(ioc)); 228 return ret; 229 } 230 231 232 void qio_channel_socket_listen_async(QIOChannelSocket *ioc, 233 SocketAddress *addr, 234 QIOTaskFunc callback, 235 gpointer opaque, 236 GDestroyNotify destroy) 237 { 238 QIOTask *task = qio_task_new( 239 OBJECT(ioc), callback, opaque, destroy); 240 SocketAddress *addrCopy; 241 242 qapi_copy_SocketAddress(&addrCopy, addr); 243 244 /* socket_listen() blocks in DNS lookups, so we must use a thread */ 245 trace_qio_channel_socket_listen_async(ioc, addr); 246 qio_task_run_in_thread(task, 247 qio_channel_socket_listen_worker, 248 addrCopy, 249 (GDestroyNotify)qapi_free_SocketAddress); 250 } 251 252 253 int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc, 254 SocketAddress *localAddr, 255 SocketAddress *remoteAddr, 256 Error **errp) 257 { 258 int fd; 259 260 trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr); 261 fd = socket_dgram(remoteAddr, localAddr, errp); 262 if (fd < 0) { 263 trace_qio_channel_socket_dgram_fail(ioc); 264 return -1; 265 } 266 267 trace_qio_channel_socket_dgram_complete(ioc, fd); 268 if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) { 269 close(fd); 270 return -1; 271 } 272 273 return 0; 274 } 275 276 277 struct QIOChannelSocketDGramWorkerData { 278 SocketAddress *localAddr; 279 SocketAddress *remoteAddr; 280 }; 281 282 283 static void qio_channel_socket_dgram_worker_free(gpointer opaque) 284 { 285 struct QIOChannelSocketDGramWorkerData *data = opaque; 286 qapi_free_SocketAddress(data->localAddr); 287 qapi_free_SocketAddress(data->remoteAddr); 288 g_free(data); 289 } 290 291 static int qio_channel_socket_dgram_worker(QIOTask *task, 292 Error **errp, 293 gpointer opaque) 294 { 295 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); 296 struct QIOChannelSocketDGramWorkerData *data = opaque; 297 int ret; 298 299 /* socket_dgram() blocks in DNS lookups, so we must use a thread */ 300 ret = qio_channel_socket_dgram_sync(ioc, 301 data->localAddr, 302 data->remoteAddr, 303 errp); 304 305 object_unref(OBJECT(ioc)); 306 return ret; 307 } 308 309 310 void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, 311 SocketAddress *localAddr, 312 SocketAddress *remoteAddr, 313 QIOTaskFunc callback, 314 gpointer opaque, 315 GDestroyNotify destroy) 316 { 317 QIOTask *task = qio_task_new( 318 OBJECT(ioc), callback, opaque, destroy); 319 struct QIOChannelSocketDGramWorkerData *data = g_new0( 320 struct QIOChannelSocketDGramWorkerData, 1); 321 322 qapi_copy_SocketAddress(&data->localAddr, localAddr); 323 qapi_copy_SocketAddress(&data->remoteAddr, remoteAddr); 324 325 trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr); 326 qio_task_run_in_thread(task, 327 qio_channel_socket_dgram_worker, 328 data, 329 qio_channel_socket_dgram_worker_free); 330 } 331 332 333 QIOChannelSocket * 334 qio_channel_socket_accept(QIOChannelSocket *ioc, 335 Error **errp) 336 { 337 QIOChannelSocket *cioc; 338 339 cioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET)); 340 cioc->fd = -1; 341 cioc->remoteAddrLen = sizeof(ioc->remoteAddr); 342 cioc->localAddrLen = sizeof(ioc->localAddr); 343 344 retry: 345 trace_qio_channel_socket_accept(ioc); 346 cioc->fd = accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr, 347 &cioc->remoteAddrLen); 348 if (cioc->fd < 0) { 349 trace_qio_channel_socket_accept_fail(ioc); 350 if (socket_error() == EINTR) { 351 goto retry; 352 } 353 goto error; 354 } 355 356 if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr, 357 &cioc->localAddrLen) < 0) { 358 error_setg_errno(errp, socket_error(), 359 "Unable to query local socket address"); 360 goto error; 361 } 362 363 #ifndef WIN32 364 if (cioc->localAddr.ss_family == AF_UNIX) { 365 QIO_CHANNEL(cioc)->features |= (1 << QIO_CHANNEL_FEATURE_FD_PASS); 366 } 367 #endif /* WIN32 */ 368 369 trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd); 370 return cioc; 371 372 error: 373 object_unref(OBJECT(cioc)); 374 return NULL; 375 } 376 377 static void qio_channel_socket_init(Object *obj) 378 { 379 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 380 ioc->fd = -1; 381 } 382 383 static void qio_channel_socket_finalize(Object *obj) 384 { 385 QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj); 386 if (ioc->fd != -1) { 387 close(ioc->fd); 388 ioc->fd = -1; 389 } 390 } 391 392 393 #ifndef WIN32 394 static void qio_channel_socket_copy_fds(struct msghdr *msg, 395 int **fds, size_t *nfds) 396 { 397 struct cmsghdr *cmsg; 398 399 *nfds = 0; 400 *fds = NULL; 401 402 for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { 403 int fd_size, i; 404 int gotfds; 405 406 if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) || 407 cmsg->cmsg_level != SOL_SOCKET || 408 cmsg->cmsg_type != SCM_RIGHTS) { 409 continue; 410 } 411 412 fd_size = cmsg->cmsg_len - CMSG_LEN(0); 413 414 if (!fd_size) { 415 continue; 416 } 417 418 gotfds = fd_size / sizeof(int); 419 *fds = g_renew(int, *fds, *nfds + gotfds); 420 memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size); 421 422 for (i = 0; i < gotfds; i++) { 423 int fd = (*fds)[*nfds + i]; 424 if (fd < 0) { 425 continue; 426 } 427 428 /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */ 429 qemu_set_block(fd); 430 431 #ifndef MSG_CMSG_CLOEXEC 432 qemu_set_cloexec(fd); 433 #endif 434 } 435 *nfds += gotfds; 436 } 437 } 438 439 440 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 441 const struct iovec *iov, 442 size_t niov, 443 int **fds, 444 size_t *nfds, 445 Error **errp) 446 { 447 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 448 ssize_t ret; 449 struct msghdr msg = { NULL, }; 450 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 451 int sflags = 0; 452 453 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 454 455 #ifdef MSG_CMSG_CLOEXEC 456 sflags |= MSG_CMSG_CLOEXEC; 457 #endif 458 459 msg.msg_iov = (struct iovec *)iov; 460 msg.msg_iovlen = niov; 461 if (fds && nfds) { 462 msg.msg_control = control; 463 msg.msg_controllen = sizeof(control); 464 } 465 466 retry: 467 ret = recvmsg(sioc->fd, &msg, sflags); 468 if (ret < 0) { 469 if (socket_error() == EAGAIN || 470 socket_error() == EWOULDBLOCK) { 471 return QIO_CHANNEL_ERR_BLOCK; 472 } 473 if (socket_error() == EINTR) { 474 goto retry; 475 } 476 477 error_setg_errno(errp, socket_error(), 478 "Unable to read from socket"); 479 return -1; 480 } 481 482 if (fds && nfds) { 483 qio_channel_socket_copy_fds(&msg, fds, nfds); 484 } 485 486 return ret; 487 } 488 489 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 490 const struct iovec *iov, 491 size_t niov, 492 int *fds, 493 size_t nfds, 494 Error **errp) 495 { 496 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 497 ssize_t ret; 498 struct msghdr msg = { NULL, }; 499 char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)]; 500 size_t fdsize = sizeof(int) * nfds; 501 struct cmsghdr *cmsg; 502 503 memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)); 504 505 msg.msg_iov = (struct iovec *)iov; 506 msg.msg_iovlen = niov; 507 508 if (nfds) { 509 if (nfds > SOCKET_MAX_FDS) { 510 error_setg_errno(errp, EINVAL, 511 "Only %d FDs can be sent, got %zu", 512 SOCKET_MAX_FDS, nfds); 513 return -1; 514 } 515 516 msg.msg_control = control; 517 msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds); 518 519 cmsg = CMSG_FIRSTHDR(&msg); 520 cmsg->cmsg_len = CMSG_LEN(fdsize); 521 cmsg->cmsg_level = SOL_SOCKET; 522 cmsg->cmsg_type = SCM_RIGHTS; 523 memcpy(CMSG_DATA(cmsg), fds, fdsize); 524 } 525 526 retry: 527 ret = sendmsg(sioc->fd, &msg, 0); 528 if (ret <= 0) { 529 if (socket_error() == EAGAIN || 530 socket_error() == EWOULDBLOCK) { 531 return QIO_CHANNEL_ERR_BLOCK; 532 } 533 if (socket_error() == EINTR) { 534 goto retry; 535 } 536 error_setg_errno(errp, socket_error(), 537 "Unable to write to socket"); 538 return -1; 539 } 540 return ret; 541 } 542 #else /* WIN32 */ 543 static ssize_t qio_channel_socket_readv(QIOChannel *ioc, 544 const struct iovec *iov, 545 size_t niov, 546 int **fds, 547 size_t *nfds, 548 Error **errp) 549 { 550 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 551 ssize_t done = 0; 552 ssize_t i; 553 554 for (i = 0; i < niov; i++) { 555 ssize_t ret; 556 retry: 557 ret = recv(sioc->fd, 558 iov[i].iov_base, 559 iov[i].iov_len, 560 0); 561 if (ret < 0) { 562 if (socket_error() == EAGAIN) { 563 if (done) { 564 return done; 565 } else { 566 return QIO_CHANNEL_ERR_BLOCK; 567 } 568 } else if (socket_error() == EINTR) { 569 goto retry; 570 } else { 571 error_setg_errno(errp, socket_error(), 572 "Unable to write to socket"); 573 return -1; 574 } 575 } 576 done += ret; 577 if (ret < iov[i].iov_len) { 578 return done; 579 } 580 } 581 582 return done; 583 } 584 585 static ssize_t qio_channel_socket_writev(QIOChannel *ioc, 586 const struct iovec *iov, 587 size_t niov, 588 int *fds, 589 size_t nfds, 590 Error **errp) 591 { 592 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 593 ssize_t done = 0; 594 ssize_t i; 595 596 for (i = 0; i < niov; i++) { 597 ssize_t ret; 598 retry: 599 ret = send(sioc->fd, 600 iov[i].iov_base, 601 iov[i].iov_len, 602 0); 603 if (ret < 0) { 604 if (socket_error() == EAGAIN) { 605 if (done) { 606 return done; 607 } else { 608 return QIO_CHANNEL_ERR_BLOCK; 609 } 610 } else if (socket_error() == EINTR) { 611 goto retry; 612 } else { 613 error_setg_errno(errp, socket_error(), 614 "Unable to write to socket"); 615 return -1; 616 } 617 } 618 done += ret; 619 if (ret < iov[i].iov_len) { 620 return done; 621 } 622 } 623 624 return done; 625 } 626 #endif /* WIN32 */ 627 628 static int 629 qio_channel_socket_set_blocking(QIOChannel *ioc, 630 bool enabled, 631 Error **errp) 632 { 633 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 634 635 if (enabled) { 636 qemu_set_block(sioc->fd); 637 } else { 638 qemu_set_nonblock(sioc->fd); 639 } 640 return 0; 641 } 642 643 644 static void 645 qio_channel_socket_set_delay(QIOChannel *ioc, 646 bool enabled) 647 { 648 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 649 int v = enabled ? 0 : 1; 650 651 qemu_setsockopt(sioc->fd, 652 IPPROTO_TCP, TCP_NODELAY, 653 &v, sizeof(v)); 654 } 655 656 657 static void 658 qio_channel_socket_set_cork(QIOChannel *ioc, 659 bool enabled) 660 { 661 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 662 int v = enabled ? 1 : 0; 663 664 socket_set_cork(sioc->fd, v); 665 } 666 667 668 static int 669 qio_channel_socket_close(QIOChannel *ioc, 670 Error **errp) 671 { 672 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 673 674 if (closesocket(sioc->fd) < 0) { 675 sioc->fd = -1; 676 error_setg_errno(errp, socket_error(), 677 "Unable to close socket"); 678 return -1; 679 } 680 sioc->fd = -1; 681 return 0; 682 } 683 684 static int 685 qio_channel_socket_shutdown(QIOChannel *ioc, 686 QIOChannelShutdown how, 687 Error **errp) 688 { 689 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 690 int sockhow; 691 692 switch (how) { 693 case QIO_CHANNEL_SHUTDOWN_READ: 694 sockhow = SHUT_RD; 695 break; 696 case QIO_CHANNEL_SHUTDOWN_WRITE: 697 sockhow = SHUT_WR; 698 break; 699 case QIO_CHANNEL_SHUTDOWN_BOTH: 700 default: 701 sockhow = SHUT_RDWR; 702 break; 703 } 704 705 if (shutdown(sioc->fd, sockhow) < 0) { 706 error_setg_errno(errp, socket_error(), 707 "Unable to shutdown socket"); 708 return -1; 709 } 710 return 0; 711 } 712 713 static GSource *qio_channel_socket_create_watch(QIOChannel *ioc, 714 GIOCondition condition) 715 { 716 QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc); 717 return qio_channel_create_fd_watch(ioc, 718 sioc->fd, 719 condition); 720 } 721 722 static void qio_channel_socket_class_init(ObjectClass *klass, 723 void *class_data G_GNUC_UNUSED) 724 { 725 QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); 726 727 ioc_klass->io_writev = qio_channel_socket_writev; 728 ioc_klass->io_readv = qio_channel_socket_readv; 729 ioc_klass->io_set_blocking = qio_channel_socket_set_blocking; 730 ioc_klass->io_close = qio_channel_socket_close; 731 ioc_klass->io_shutdown = qio_channel_socket_shutdown; 732 ioc_klass->io_set_cork = qio_channel_socket_set_cork; 733 ioc_klass->io_set_delay = qio_channel_socket_set_delay; 734 ioc_klass->io_create_watch = qio_channel_socket_create_watch; 735 } 736 737 static const TypeInfo qio_channel_socket_info = { 738 .parent = TYPE_QIO_CHANNEL, 739 .name = TYPE_QIO_CHANNEL_SOCKET, 740 .instance_size = sizeof(QIOChannelSocket), 741 .instance_init = qio_channel_socket_init, 742 .instance_finalize = qio_channel_socket_finalize, 743 .class_init = qio_channel_socket_class_init, 744 }; 745 746 static void qio_channel_socket_register_types(void) 747 { 748 type_register_static(&qio_channel_socket_info); 749 } 750 751 type_init(qio_channel_socket_register_types); 752