// SPDX-License-Identifier: GPL-2.0

/*
 * mptcp_connect - MPTCP/TCP data transfer test tool.
 *
 * In listen mode (-l) the program accepts connections, in client mode it
 * connects to the given address. In both cases data read from the input
 * (stdin or the -i file) is sent to the peer, and data received from the
 * peer is written to stdout. The transfer is driven by poll(), by
 * mmap()+write() or by sendfile(), depending on the -m option.
 */

#define _GNU_SOURCE

#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include <string.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <signal.h>
#include <unistd.h>
#include <time.h>

#include <sys/ioctl.h>
#include <sys/poll.h>
#include <sys/random.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/mman.h>

#include <netdb.h>
#include <netinet/in.h>

#include <linux/tcp.h>
#include <linux/time_types.h>
#include <linux/sockios.h>

extern int optind;

#ifndef IPPROTO_MPTCP
#define IPPROTO_MPTCP 262
#endif
#ifndef TCP_ULP
#define TCP_ULP 31
#endif

static int poll_timeout = 10 * 1000;
static bool listen_mode;
/* set from handle_signal(), hence the async-signal-safe type */
static volatile sig_atomic_t quit;

enum cfg_mode {
	CFG_MODE_POLL,
	CFG_MODE_MMAP,
	CFG_MODE_SENDFILE,
};

enum cfg_peek {
	CFG_NONE_PEEK,
	CFG_WITH_PEEK,
	CFG_AFTER_PEEK,
};

static enum cfg_mode cfg_mode = CFG_MODE_POLL;
static enum cfg_peek cfg_peek = CFG_NONE_PEEK;
static const char *cfg_host;
static const char *cfg_port = "12000";
static int cfg_sock_proto = IPPROTO_MPTCP;
static int pf = AF_INET;
static int cfg_sndbuf;
static int cfg_rcvbuf;
static bool cfg_join;
static bool cfg_remove;
static unsigned int cfg_time;
static unsigned int cfg_do_w;
static int cfg_wait;
static uint32_t cfg_mark;
static char *cfg_input;
static int cfg_repeat = 1;
static int cfg_truncate;
static int cfg_rcv_trunc;

/* control messages requested with -c */
struct cfg_cmsg_types {
	unsigned int cmsg_enabled:1;
	unsigned int timestampns:1;
	unsigned int tcp_inq:1;
};

/* socket options requested with -o */
struct cfg_sockopt_types {
	unsigned int transparent:1;
	unsigned int mptfo:1;
};

/* last TCP_INQ hint seen by process_cmsg() and whether EOF must come next */
struct tcp_inq_state {
	unsigned int last;
	bool expect_eof;
};

/* pending output: 'len' bytes still to send, starting at buf + off */
struct wstate {
	char buf[8192];
	unsigned int len;
	unsigned int off;
	unsigned int total_len;
};

static struct tcp_inq_state tcp_inq;

static struct cfg_cmsg_types cfg_cmsg_types;
static struct cfg_sockopt_types cfg_sockopt_types;

/* Print the command line help on stderr and exit with status 1. */
static void die_usage(void)
{
	fprintf(stderr, "Usage: mptcp_connect [-6] [-c cmsg] [-f offset] [-i file] [-I num] [-j] [-l] "
		"[-m mode] [-M mark] [-o option] [-p port] [-P mode] [-r num] [-R num] "
		"[-s MPTCP|TCP] [-S num] [-t num] [-T num] [-w sec] connect_address\n");
	fprintf(stderr, "\t-6 use ipv6\n");
	fprintf(stderr, "\t-c cmsg -- test cmsg type <cmsg>\n");
	fprintf(stderr, "\t-f offset -- stop the I/O after receiving and sending the specified amount "
		"of bytes. If there are unread bytes in the receive queue, that will cause a MPTCP "
		"fastclose at close/shutdown. If offset is negative, expect the peer to close before "
		"all the local data as been sent, thus toleration errors on write and EPIPE signals\n");
	fprintf(stderr, "\t-i file -- read the data to send from the given file instead of stdin\n");
	fprintf(stderr, "\t-I num -- repeat the transfer 'num' times. In listen mode accepts num "
		"incoming connections, in client mode, disconnect and reconnect to the server\n");
	fprintf(stderr, "\t-j -- add additional sleep at connection start and tear down "
		"-- for MPJ tests\n");
	fprintf(stderr, "\t-l -- listens mode, accepts incoming connection\n");
	fprintf(stderr, "\t-m [poll|mmap|sendfile] -- use poll(default)/mmap+write/sendfile\n");
	fprintf(stderr, "\t-M mark -- set socket packet mark\n");
	fprintf(stderr, "\t-o option -- test sockopt <option>\n");
	fprintf(stderr, "\t-p num -- use port num\n");
	fprintf(stderr,
		"\t-P [saveWithPeek|saveAfterPeek] -- save data with/after MSG_PEEK form tcp socket\n");
	fprintf(stderr, "\t-r num -- enable slow mode, limiting each write to num bytes "
		"-- for remove addr tests\n");
	fprintf(stderr, "\t-R num -- set SO_RCVBUF to num\n");
	fprintf(stderr, "\t-s [MPTCP|TCP] -- use mptcp(default) or tcp sockets\n");
	fprintf(stderr, "\t-S num -- set SO_SNDBUF to num\n");
	fprintf(stderr, "\t-t num -- set poll timeout to num\n");
	fprintf(stderr, "\t-T num -- set expected runtime to num ms\n");
	fprintf(stderr, "\t-w num -- wait num sec before closing the socket\n");
	exit(1);
}

/* Print a printf-style message on stderr and exit with status 1. */
static void xerror(const char *fmt, ...)
{
	va_list ap;

	va_start(ap, fmt);
	vfprintf(stderr, fmt, ap);
	va_end(ap);
	exit(1);
}

/* Signal handler (SIGUSR1, and SIGPIPE with a negative -f): request a stop. */
static void handle_signal(int nr)
{
	(void)nr;
	quit = 1;
}

/* Map a getaddrinfo()/getnameinfo() error code to a printable string. */
static const char *getxinfo_strerr(int err)
{
	if (err == EAI_SYSTEM)
		return strerror(errno);

	return gai_strerror(err);
}

/* getnameinfo() in numeric host/service form; exits on failure. */
static void xgetnameinfo(const struct sockaddr *addr, socklen_t addrlen,
			 char *host, socklen_t hostlen,
			 char *serv, socklen_t servlen)
{
	int flags = NI_NUMERICHOST | NI_NUMERICSERV;
	int err = getnameinfo(addr, addrlen, host, hostlen, serv, servlen,
			      flags);

	if (err) {
		const char *errstr = getxinfo_strerr(err);

		fprintf(stderr, "Fatal: getnameinfo: %s\n", errstr);
		exit(1);
	}
}

/* getaddrinfo() wrapper; exits on failure. The caller frees *res. */
static void xgetaddrinfo(const char *node, const char *service,
			 const struct addrinfo *hints,
			 struct addrinfo **res)
{
	int err = getaddrinfo(node, service, hints, res);

	if (err) {
		const char *errstr = getxinfo_strerr(err);

		fprintf(stderr, "Fatal: getaddrinfo(%s:%s): %s\n",
			node ? node : "", service ? service : "", errstr);
		exit(1);
	}
}

/* Set SO_RCVBUF on fd; exits on failure. */
static void set_rcvbuf(int fd, unsigned int size)
{
	int err;

	err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
	if (err) {
		perror("set SO_RCVBUF");
		exit(1);
	}
}

/* Set SO_SNDBUF on fd; exits on failure. */
static void set_sndbuf(int fd, unsigned int size)
{
	int err;

	err = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
	if (err) {
		perror("set SO_SNDBUF");
		exit(1);
	}
}

/* Set SO_MARK on fd; exits on failure. */
static void set_mark(int fd, uint32_t mark)
{
	int err;

	err = setsockopt(fd, SOL_SOCKET, SO_MARK, &mark, sizeof(mark));
	if (err) {
		perror("set SO_MARK");
		exit(1);
	}
}

/* Enable IP(V6)_TRANSPARENT for the given family; failures are only reported. */
static void set_transparent(int fd, int pf)
{
	int one = 1;

	switch (pf) {
	case AF_INET:
		if (-1 == setsockopt(fd, SOL_IP, IP_TRANSPARENT, &one, sizeof(one)))
			perror("IP_TRANSPARENT");
		break;
	case AF_INET6:
		if (-1 == setsockopt(fd, IPPROTO_IPV6, IPV6_TRANSPARENT, &one, sizeof(one)))
			perror("IPV6_TRANSPARENT");
		break;
	}
}

/* Enable TCP_FASTOPEN with a fixed queue length; failures are only reported. */
static void set_mptfo(int fd, int pf)
{
	int qlen = 25;

	(void)pf;

	if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen)) == -1)
		perror("TCP_FASTOPEN");
}

/* Try to attach the ULP 'name' to the socket; returns the setsockopt() result. */
static int do_ulp_so(int sock, const char *name)
{
	return setsockopt(sock, IPPROTO_TCP, TCP_ULP, name, strlen(name));
}

#define X(m) xerror("%s:%u: %s: failed for proto %d at line %u", __FILE__, __LINE__, (m), proto, line)
/*
 * Check the TCP_ULP state of the socket: the only ULP name accepted from
 * getsockopt() is "mptcp", and attaching "tls" (when a ULP is reported or
 * the protocol is MPTCP) or "mptcp" must fail. Exits on any mismatch.
 */
static void sock_test_tcpulp(int sock, int proto, unsigned int line)
{
	socklen_t buflen = 8;
	char buf[8] = "";
	int ret = getsockopt(sock, IPPROTO_TCP, TCP_ULP, buf, &buflen);

	if (ret != 0)
		X("getsockopt");

	if (buflen > 0) {
		if (strcmp(buf, "mptcp") != 0)
			xerror("unexpected ULP '%s' for proto %d at line %u", buf, proto, line);
		ret = do_ulp_so(sock, "tls");
		if (ret == 0)
			X("setsockopt");
	} else if (proto == IPPROTO_MPTCP) {
		ret = do_ulp_so(sock, "tls");
		if (ret != -1)
			X("setsockopt");
	}

	ret = do_ulp_so(sock, "mptcp");
	if (ret != -1)
		X("setsockopt");

#undef X
}

#define SOCK_TEST_TCPULP(s, p) sock_test_tcpulp((s), (p), __LINE__)

/*
 * Create, bind and listen on a socket for listenaddr:port using
 * cfg_sock_proto. Returns the listening socket, or -1 on failure.
 */
static int sock_listen_mptcp(const char * const listenaddr,
			     const char * const port)
{
	struct addrinfo hints = {
		.ai_protocol = IPPROTO_TCP,
		.ai_socktype = SOCK_STREAM,
		.ai_flags = AI_PASSIVE | AI_NUMERICHOST
	};
	struct addrinfo *a, *addr;
	int sock = -1;
	int one = 1;

	hints.ai_family = pf;

	xgetaddrinfo(listenaddr, port, &hints, &addr);

	for (a = addr; a; a = a->ai_next) {
		sock = socket(a->ai_family, a->ai_socktype, cfg_sock_proto);
		if (sock < 0)
			continue;

		SOCK_TEST_TCPULP(sock, cfg_sock_proto);

		if (-1 == setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one,
				     sizeof(one)))
			perror("setsockopt");

		if (cfg_sockopt_types.transparent)
			set_transparent(sock, pf);

		if (cfg_sockopt_types.mptfo)
			set_mptfo(sock, pf);

		if (bind(sock, a->ai_addr, a->ai_addrlen) == 0)
			break; /* success */

		perror("bind");
		close(sock);
		sock = -1;
	}

	freeaddrinfo(addr);

	if (sock < 0) {
		fprintf(stderr, "Could not create listen socket\n");
		return sock;
	}

	SOCK_TEST_TCPULP(sock, cfg_sock_proto);

	if (listen(sock, 20)) {
		perror("listen");
		close(sock);
		return -1;
	}

	SOCK_TEST_TCPULP(sock, cfg_sock_proto);

	return sock;
}

/*
 * Copy the selected address into static storage and return it via *peer.
 * The getaddrinfo() list is freed before sock_connect_mptcp() returns, so
 * handing out a pointer into that list would leave the caller with a
 * dangling pointer.
 */
static void save_peer(const struct addrinfo *a, struct addrinfo **peer)
{
	static struct sockaddr_storage saved_addr;
	static struct addrinfo saved;

	if (a->ai_addrlen > sizeof(saved_addr))
		xerror("peer address too large: %u\n", (unsigned int)a->ai_addrlen);

	saved = *a;
	memcpy(&saved_addr, a->ai_addr, a->ai_addrlen);
	saved.ai_addr = (struct sockaddr *)&saved_addr;
	saved.ai_canonname = NULL;
	saved.ai_next = NULL;

	*peer = &saved;
}

/*
 * Connect to remoteaddr:port with the given protocol. With the MPTFO option
 * the first chunk of input is read from infd into winfo and sent with
 * sendto(MSG_FASTOPEN) instead of calling connect().
 *
 * Returns the connected socket or -1. On success *peer describes the
 * address used; it points to static storage and stays valid until the next
 * successful call.
 */
static int sock_connect_mptcp(const char * const remoteaddr,
			      const char * const port, int proto,
			      struct addrinfo **peer,
			      int infd, struct wstate *winfo)
{
	struct addrinfo hints = {
		.ai_protocol = IPPROTO_TCP,
		.ai_socktype = SOCK_STREAM,
	};
	struct addrinfo *a, *addr;
	int syn_copied = 0;
	int sock = -1;

	hints.ai_family = pf;

	xgetaddrinfo(remoteaddr, port, &hints, &addr);
	for (a = addr; a; a = a->ai_next) {
		sock = socket(a->ai_family, a->ai_socktype, proto);
		if (sock < 0) {
			perror("socket");
			continue;
		}

		SOCK_TEST_TCPULP(sock, proto);

		if (cfg_mark)
			set_mark(sock, cfg_mark);

		if (cfg_sockopt_types.mptfo) {
			if (!winfo->total_len) {
				ssize_t rd = read(infd, winfo->buf, sizeof(winfo->buf));

				/* never store -1 into the unsigned length */
				if (rd < 0) {
					perror("read");
					close(sock);
					sock = -1;
					break;
				}
				winfo->total_len = winfo->len = rd;
			}

			syn_copied = sendto(sock, winfo->buf, winfo->len, MSG_FASTOPEN,
					    a->ai_addr, a->ai_addrlen);
			if (syn_copied >= 0) {
				winfo->off = syn_copied;
				winfo->len -= syn_copied;
				save_peer(a, peer);
				break; /* success */
			}
			perror("sendto()");
		} else {
			if (connect(sock, a->ai_addr, a->ai_addrlen) == 0) {
				save_peer(a, peer);
				break; /* success */
			}
			perror("connect()");
		}

		close(sock);
		sock = -1;
	}

	freeaddrinfo(addr);
	if (sock != -1)
		SOCK_TEST_TCPULP(sock, proto);
	return sock;
}

/*
 * Write a random amount (at most len bytes) of buf to fd, honouring the
 * -j and -r write limits and their sleeps. Returns the write() result,
 * i.e. a negative value on error.
 */
static ssize_t do_rnd_write(const int fd, char *buf, const size_t len)
{
	static bool first = true;
	unsigned int do_w;
	ssize_t bw;

	do_w = rand() & 0xffff;
	if (do_w == 0 || do_w > len)
		do_w = len;

	if (cfg_join && first && do_w > 100)
		do_w = 100;

	if (cfg_remove && do_w > cfg_do_w)
		do_w = cfg_do_w;

	bw = write(fd, buf, do_w);
	if (bw < 0)
		return bw;

	/* let the join handshake complete, before going on */
	if (cfg_join && first) {
		usleep(200000);
		first = false;
	}

	if (cfg_remove)
		usleep(200000);

	return bw;
}

/* Write all len bytes of buf to fd. Returns len, or 0 after a write error. */
static size_t do_write(const int fd, char *buf, const size_t len)
{
	size_t offset = 0;

	while (offset < len) {
		ssize_t bw = write(fd, buf + offset, len - offset);

		if (bw < 0) {
			perror("write");
			return 0;
		}

		offset += (size_t)bw;
	}

	return offset;
}

/*
 * Walk the control messages of a received message and check that the
 * requested ones (timestampns, tcp_inq) are present. The TCP_INQ value is
 * stored in tcp_inq.last. Exits on any violation.
 */
static void process_cmsg(struct msghdr *msgh)
{
	struct __kernel_timespec ts;
	bool inq_found = false;
	bool ts_found = false;
	unsigned int inq = 0;
	struct cmsghdr *cmsg;

	for (cmsg = CMSG_FIRSTHDR(msgh); cmsg ; cmsg = CMSG_NXTHDR(msgh, cmsg)) {
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMPNS_NEW) {
			memcpy(&ts, CMSG_DATA(cmsg), sizeof(ts));
			ts_found = true;
			continue;
		}
		if (cmsg->cmsg_level == IPPROTO_TCP && cmsg->cmsg_type == TCP_CM_INQ) {
			memcpy(&inq, CMSG_DATA(cmsg), sizeof(inq));
			inq_found = true;
			continue;
		}
	}

	if (cfg_cmsg_types.timestampns) {
		if (!ts_found)
			xerror("TIMESTAMPNS not present\n");
	}

	if (cfg_cmsg_types.tcp_inq) {
		if (!inq_found)
			xerror("TCP_INQ not present\n");

		if (inq > 1024)
			xerror("tcp_inq %u is larger than one kbyte\n", inq);
		tcp_inq.last = inq;
	}
}

/*
 * recvmsg() up to len bytes into buf and validate the received control
 * data against the -c configuration, including the consistency of the
 * TCP_INQ hints across calls. Returns the recvmsg() result.
 */
static ssize_t do_recvmsg_cmsg(const int fd, char *buf, const size_t len)
{
	char msg_buf[8192];
	struct iovec iov = {
		.iov_base = buf,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = msg_buf,
		.msg_controllen = sizeof(msg_buf),
	};
	int flags = 0;
	unsigned int last_hint = tcp_inq.last;
	int ret = recvmsg(fd, &msg, flags);

	if (ret <= 0) {
		if (ret == 0 && tcp_inq.expect_eof)
			return ret;

		if (ret == 0 && cfg_cmsg_types.tcp_inq)
			if (last_hint != 1 && last_hint != 0)
				xerror("EOF but last tcp_inq hint was %u\n", last_hint);

		return ret;
	}

	if (tcp_inq.expect_eof)
		xerror("expected EOF, last_hint %u, now %u\n",
		       last_hint, tcp_inq.last);

	if (msg.msg_controllen && !cfg_cmsg_types.cmsg_enabled)
		xerror("got %lu bytes of cmsg data, expected 0\n",
		       (unsigned long)msg.msg_controllen);

	if (msg.msg_controllen == 0 && cfg_cmsg_types.cmsg_enabled)
		xerror("%s\n", "got no cmsg data");

	if (msg.msg_controllen)
		process_cmsg(&msg);

	if (cfg_cmsg_types.tcp_inq) {
		if ((size_t)ret < len && last_hint > (unsigned int)ret) {
			if (ret + 1 != (int)last_hint) {
				int next = read(fd, msg_buf, sizeof(msg_buf));

				xerror("read %u of %u, last_hint was %u tcp_inq hint now %u next_read returned %d/%m\n",
				       ret, (unsigned int)len, last_hint, tcp_inq.last, next);
			} else {
				tcp_inq.expect_eof = true;
			}
		}
	}

	return ret;
}

/*
 * Read a random amount (1..len bytes) from fd into buf, using the
 * MSG_PEEK or cmsg variants when configured. Returns the number of bytes
 * read, 0 on EOF or a negative value on error.
 */
static ssize_t do_rnd_read(const int fd, char *buf, const size_t len)
{
	int ret = 0;
	char tmp[16384];
	size_t cap = rand();

	cap &= 0xffff;

	if (cap == 0)
		cap = 1;
	else if (cap > len)
		cap = len;

	if (cfg_peek == CFG_WITH_PEEK) {
		/* keep the peeked data in buf, drain the same amount into tmp */
		ret = recv(fd, buf, cap, MSG_PEEK);
		ret = (ret < 0) ? ret : read(fd, tmp, ret);
	} else if (cfg_peek == CFG_AFTER_PEEK) {
		ret = recv(fd, buf, cap, MSG_PEEK);
		ret = (ret < 0) ? ret : read(fd, buf, cap);
	} else if (cfg_cmsg_types.cmsg_enabled) {
		ret = do_recvmsg_cmsg(fd, buf, cap);
	} else {
		ret = read(fd, buf, cap);
	}

	return ret;
}

/* Set or clear O_NONBLOCK on fd; errors are silently ignored. */
static void set_nonblock(int fd, bool nonblock)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1)
		return;

	if (nonblock)
		fcntl(fd, F_SETFL, flags | O_NONBLOCK);
	else
		fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
}

static void shut_wr(int fd)
{
	/* Close our write side, ev. give some time
	 * for address notification and/or checking
	 * the current status
	 */
	if (cfg_wait)
		usleep(cfg_wait);

	shutdown(fd, SHUT_WR);
}

/*
 * poll()-driven transfer: data from infd goes to peerfd, data from peerfd
 * goes to outfd, until both directions are done, 'quit' is set or the -f
 * truncation limit is reached.
 *
 * Returns 0 on success, a non-zero code on error. *in_closed_after_out is
 * set when the peer's EOF was seen after our side had finished sending.
 */
static int copyfd_io_poll(int infd, int peerfd, int outfd,
			  bool *in_closed_after_out, struct wstate *winfo)
{
	struct pollfd fds = {
		.fd = peerfd,
		.events = POLLIN | POLLOUT,
	};
	unsigned int total_wlen = 0, total_rlen = 0;

	set_nonblock(peerfd, true);

	for (;;) {
		char rbuf[8192];
		ssize_t len;

		if (fds.events == 0 || quit)
			break;

		switch (poll(&fds, 1, poll_timeout)) {
		case -1:
			if (errno == EINTR)
				continue;
			perror("poll");
			return 1;
		case 0:
			fprintf(stderr, "%s: poll timed out (events: "
				"POLLIN %u, POLLOUT %u)\n", __func__,
				fds.events & POLLIN, fds.events & POLLOUT);
			return 2;
		}

		if (fds.revents & POLLIN) {
			ssize_t rb = sizeof(rbuf);

			/* limit the total amount of read data to the trunc value*/
			if (cfg_truncate > 0) {
				if (rb + total_rlen > cfg_truncate)
					rb = cfg_truncate - total_rlen;
				len = read(peerfd, rbuf, rb);
			} else {
				len = do_rnd_read(peerfd, rbuf, sizeof(rbuf));
			}
			if (len == 0) {
				/* no more data to receive:
				 * peer has closed its write side
				 */
				fds.events &= ~POLLIN;

				if ((fds.events & POLLOUT) == 0) {
					*in_closed_after_out = true;
					/* and nothing more to send */
					break;
				}

				/* Else, still have data to transmit */
			} else if (len < 0) {
				if (cfg_rcv_trunc)
					return 0;
				perror("read");
				return 3;
			}

			total_rlen += len;
			do_write(outfd, rbuf, len);
		}

		if (fds.revents & POLLOUT) {
			if (winfo->len == 0) {
				ssize_t rd = read(infd, winfo->buf, sizeof(winfo->buf));

				/* check before storing: winfo->len is unsigned */
				if (rd < 0) {
					if (errno == EINTR)
						continue;
					perror("read");
					return 4;
				}

				winfo->off = 0;
				winfo->len = rd;
			}

			if (winfo->len > 0) {
				ssize_t bw;

				/* limit the total amount of written data to the trunc value */
				if (cfg_truncate > 0 &&
				    winfo->len + total_wlen > (unsigned int)cfg_truncate)
					winfo->len = cfg_truncate - total_wlen;

				bw = do_rnd_write(peerfd, winfo->buf + winfo->off, winfo->len);
				if (bw < 0) {
					if (cfg_rcv_trunc)
						return 0;
					perror("write");
					return 111;
				}

				winfo->off += bw;
				winfo->len -= bw;
				total_wlen += bw;
			} else {
				/* We have no more data to send. */
				fds.events &= ~POLLOUT;

				if ((fds.events & POLLIN) == 0)
					/* ... and peer also closed already */
					break;

				shut_wr(peerfd);
			}
		}

		if (fds.revents & (POLLERR | POLLNVAL)) {
			if (cfg_rcv_trunc)
				return 0;
			fprintf(stderr, "Unexpected revents: "
				"POLLERR/POLLNVAL(%x)\n", fds.revents);
			return 5;
		}

		if (cfg_truncate > 0 && total_wlen >= (unsigned int)cfg_truncate &&
		    total_rlen >= (unsigned int)cfg_truncate)
			break;
	}

	/* leave some time for late join/announce */
	if (cfg_remove && !quit)
		usleep(cfg_wait);

	return 0;
}

/*
 * Copy everything readable from infd to outfd. Returns 0 on EOF, a
 * negative value on read error, a positive one after a short/failed write.
 */
static int do_recvfile(int infd, int outfd)
{
	ssize_t r;

	do {
		char buf[16384];

		r = do_rnd_read(infd, buf, sizeof(buf));
		if (r > 0) {
			if (write(outfd, buf, r) != r)
				break;
		} else if (r < 0) {
			perror("read");
		}
	} while (r > 0);

	return (int)r;
}

/* Send the data still pending in winfo to fd. Returns 0, or 4 on error. */
static int spool_buf(int fd, struct wstate *winfo)
{
	while (winfo->len) {
		int ret = write(fd, winfo->buf + winfo->off, winfo->len);

		if (ret < 0) {
			perror("write");
			return 4;
		}
		winfo->off += ret;
		winfo->len -= ret;
	}
	return 0;
}

/*
 * Send 'size' bytes of the file infd to outfd through a read-only mapping,
 * after flushing what winfo already holds (winfo->total_len bytes of the
 * file are skipped in the mapping). Returns 0 on success, non-zero
 * otherwise.
 */
static int do_mmap(int infd, int outfd, unsigned int size,
		   struct wstate *winfo)
{
	char *inbuf = mmap(NULL, size, PROT_READ, MAP_SHARED, infd, 0);
	ssize_t ret = 0, off = winfo->total_len;
	size_t rem;

	if (inbuf == MAP_FAILED) {
		perror("mmap");
		return 1;
	}

	/* spool_buf() reports failures with a positive code */
	ret = spool_buf(outfd, winfo);
	if (ret) {
		munmap(inbuf, size);
		return ret;
	}

	rem = size - winfo->total_len;

	while (rem > 0) {
		ret = write(outfd, inbuf + off, rem);

		if (ret < 0) {
			perror("write");
			break;
		}

		off += ret;
		rem -= ret;
	}

	munmap(inbuf, size);
	return rem;
}

/*
 * Return the size of the regular file behind fd, or a negative value when
 * fstat() fails, fd is not a regular file or the size exceeds INT_MAX.
 */
static int get_infd_size(int fd)
{
	struct stat sb;
	ssize_t count;
	int err;

	err = fstat(fd, &sb);
	if (err < 0) {
		perror("fstat");
		return -1;
	}

	if ((sb.st_mode & S_IFMT) != S_IFREG) {
		fprintf(stderr, "%s: stdin is not a regular file\n", __func__);
		return -2;
	}

	count = sb.st_size;
	if (count > INT_MAX) {
		fprintf(stderr, "File too large: %zd\n", count);
		return -3;
	}

	return (int)count;
}

/*
 * Send 'count' bytes of the file infd to outfd with sendfile(), after
 * flushing what winfo already holds. Returns 0 on success, non-zero
 * otherwise.
 */
static int do_sendfile(int infd, int outfd, unsigned int count,
		       struct wstate *winfo)
{
	/* spool_buf() reports failures with a positive code */
	int ret = spool_buf(outfd, winfo);

	if (ret)
		return ret;

	count -= winfo->total_len;

	while (count > 0) {
		ssize_t r;

		r = sendfile(outfd, infd, NULL, count);
		if (r < 0) {
			perror("sendfile");
			return 3;
		}

		count -= r;
	}

	return 0;
}

/*
 * mmap mode: the listener first receives everything and then sends its
 * file, the client sends first, shuts down its write side and then
 * receives.
 */
static int copyfd_io_mmap(int infd, int peerfd, int outfd,
			  unsigned int size, bool *in_closed_after_out,
			  struct wstate *winfo)
{
	int err;

	if (listen_mode) {
		err = do_recvfile(peerfd, outfd);
		if (err)
			return err;

		err = do_mmap(infd, peerfd, size, winfo);
	} else {
		err = do_mmap(infd, peerfd, size, winfo);
		if (err)
			return err;

		shut_wr(peerfd);

		err = do_recvfile(peerfd, outfd);
		*in_closed_after_out = true;
	}

	return err;
}

/* sendfile mode: same ordering as copyfd_io_mmap(), using do_sendfile(). */
static int copyfd_io_sendfile(int infd, int peerfd, int outfd,
			      unsigned int size, bool *in_closed_after_out, struct wstate *winfo)
{
	int err;

	if (listen_mode) {
		err = do_recvfile(peerfd, outfd);
		if (err)
			return err;

		err = do_sendfile(infd, peerfd, size, winfo);
	} else {
		err = do_sendfile(infd, peerfd, size, winfo);
		if (err)
			return err;

		shut_wr(peerfd);

		err = do_recvfile(peerfd, outfd);
		*in_closed_after_out = true;
	}

	return err;
}

/*
 * Run one transfer between infd/outfd and peerfd in the configured mode.
 * On success peerfd is closed when close_peerfd is set, and with -T the
 * runtime is checked against the expected one. Returns 0 on success, the
 * mode-specific error code otherwise.
 */
static int copyfd_io(int infd, int peerfd, int outfd, bool close_peerfd, struct wstate *winfo)
{
	bool in_closed_after_out = false;
	struct timespec start, end;
	int file_size;
	int ret;

	if (cfg_time && (clock_gettime(CLOCK_MONOTONIC, &start) < 0))
		xerror("can not fetch start time %d", errno);

	switch (cfg_mode) {
	case CFG_MODE_POLL:
		ret = copyfd_io_poll(infd, peerfd, outfd, &in_closed_after_out,
				     winfo);
		break;

	case CFG_MODE_MMAP:
		file_size = get_infd_size(infd);
		if (file_size < 0)
			return file_size;
		ret = copyfd_io_mmap(infd, peerfd, outfd, file_size,
				     &in_closed_after_out, winfo);
		break;

	case CFG_MODE_SENDFILE:
		file_size = get_infd_size(infd);
		if (file_size < 0)
			return file_size;
		ret = copyfd_io_sendfile(infd, peerfd, outfd, file_size,
					 &in_closed_after_out, winfo);
		break;

	default:
		fprintf(stderr, "Invalid mode %d\n", cfg_mode);

		die_usage();
		return 1;
	}

	if (ret)
		return ret;

	if (close_peerfd)
		close(peerfd);

	if (cfg_time) {
		unsigned int delta_ms;

		if (clock_gettime(CLOCK_MONOTONIC, &end) < 0)
			xerror("can not fetch end time %d", errno);
		delta_ms = (end.tv_sec - start.tv_sec) * 1000 + (end.tv_nsec - start.tv_nsec) / 1000000;
		if (delta_ms > cfg_time) {
			xerror("transfer slower than expected! runtime %u ms, expected %u ms",
			       delta_ms, cfg_time);
		}

		/* show the runtime only if this end shutdown(wr) before receiving the EOF,
		 * (that is, if this end got the longer runtime)
		 */
		if (in_closed_after_out)
			fprintf(stderr, "%u", delta_ms);
	}

	return 0;
}

/*
 * Sanity-check the address returned by accept(): non-zero port, expected
 * size and family. Problems are only reported on stderr.
 */
static void check_sockaddr(int pf, struct sockaddr_storage *ss,
			   socklen_t salen)
{
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	socklen_t wanted_size = 0;

	switch (pf) {
	case AF_INET:
		wanted_size = sizeof(*sin);
		sin = (void *)ss;
		if (!sin->sin_port)
			fprintf(stderr, "accept: something wrong: ip connection from port 0\n");
		break;
	case AF_INET6:
		wanted_size = sizeof(*sin6);
		sin6 = (void *)ss;
		if (!sin6->sin6_port)
			fprintf(stderr, "accept: something wrong: ipv6 connection from port 0\n");
		break;
	default:
		fprintf(stderr, "accept: Unknown pf %d, salen %u\n", pf, salen);
		return;
	}

	if (salen != wanted_size)
		fprintf(stderr, "accept: size mismatch, got %d expected %d\n",
			(int)salen, (int)wanted_size);

	if (ss->ss_family != pf)
		fprintf(stderr, "accept: pf mismatch, expect %d, ss_family is %d\n",
			pf, (int)ss->ss_family);
}

/*
 * Compare the address returned by accept() with getpeername() on the same
 * socket. Mismatches are only reported on stderr.
 */
static void check_getpeername(int fd, struct sockaddr_storage *ss, socklen_t salen)
{
	struct sockaddr_storage peerss;
	socklen_t peersalen = sizeof(peerss);

	if (getpeername(fd, (struct sockaddr *)&peerss, &peersalen) < 0) {
		perror("getpeername");
		return;
	}

	if (peersalen != salen) {
		fprintf(stderr, "%s: %d vs %d\n", __func__, (int)peersalen, (int)salen);
		return;
	}

	if (memcmp(ss, &peerss, peersalen)) {
		char a[INET6_ADDRSTRLEN];
		char b[INET6_ADDRSTRLEN];
		char c[INET6_ADDRSTRLEN];
		char d[INET6_ADDRSTRLEN];

		xgetnameinfo((struct sockaddr *)ss, salen,
			     a, sizeof(a), b, sizeof(b));

		xgetnameinfo((struct sockaddr *)&peerss, peersalen,
			     c, sizeof(c), d, sizeof(d));

		fprintf(stderr, "%s: memcmp failure: accept %s vs peername %s, %s vs %s salen %d vs %d\n",
			__func__, a, c, b, d, (int)peersalen, (int)salen);
	}
}

/*
 * Check that getpeername() on a connected socket reports the configured
 * host and port. Mismatches are only reported on stderr.
 */
static void check_getpeername_connect(int fd)
{
	struct sockaddr_storage ss;
	socklen_t salen = sizeof(ss);
	char a[INET6_ADDRSTRLEN];
	char b[INET6_ADDRSTRLEN];

	if (getpeername(fd, (struct sockaddr *)&ss, &salen) < 0) {
		perror("getpeername");
		return;
	}

	xgetnameinfo((struct sockaddr *)&ss, salen,
		     a, sizeof(a), b, sizeof(b));

	if (strcmp(cfg_host, a) || strcmp(cfg_port, b))
		fprintf(stderr, "%s: %s vs %s, %s vs %s\n", __func__,
			cfg_host, a, cfg_port, b);
}

/*
 * Randomly close fd, unless the join, remove or repeat options are in use
 * (in which case fd is always left open).
 */
static void maybe_close(int fd)
{
	unsigned int r = rand();

	if (!(cfg_join || cfg_remove || cfg_repeat > 1) && (r & 1))
		close(fd);
}

/*
 * Listener loop: accept a connection, run the transfer and repeat
 * cfg_repeat times. Returns 0 on success, a non-zero code as soon as
 * poll(), accept() or the transfer fails.
 */
int main_loop_s(int listensock)
{
	struct sockaddr_storage ss;
	struct wstate winfo;
	struct pollfd polls;
	socklen_t salen;
	int remotesock;
	int err = 0;
	int fd = 0;

again:
	polls.fd = listensock;
	polls.events = POLLIN;

	switch (poll(&polls, 1, poll_timeout)) {
	case -1:
		perror("poll");
		return 1;
	case 0:
		fprintf(stderr, "%s: timed out\n", __func__);
		close(listensock);
		return 2;
	}

	salen = sizeof(ss);
	remotesock = accept(listensock, (struct sockaddr *)&ss, &salen);
	if (remotesock < 0) {
		perror("accept");
		return 1;
	}

	maybe_close(listensock);
	check_sockaddr(pf, &ss, salen);
	check_getpeername(remotesock, &ss, salen);

	if (cfg_input) {
		fd = open(cfg_input, O_RDONLY);
		if (fd < 0)
			xerror("can't open %s: %d", cfg_input, errno);
	}

	SOCK_TEST_TCPULP(remotesock, 0);

	memset(&winfo, 0, sizeof(winfo));
	/* propagate transfer failures instead of reporting success */
	err = copyfd_io(fd, remotesock, 1, true, &winfo);

	if (cfg_input)
		close(fd);

	if (!err && --cfg_repeat > 0)
		goto again;

	return err;
}

/* Seed rand() from getrandom(); exits on failure. */
static void init_rng(void)
{
	unsigned int foo;

	if (getrandom(&foo, sizeof(foo), 0) == -1) {
		perror("getrandom");
		exit(1);
	}

	srand(foo);
}

/* setsockopt() wrapper; exits on failure. */
static void xsetsockopt(int fd, int level, int optname, const void *optval, socklen_t optlen)
{
	int err;

	err = setsockopt(fd, level, optname, optval, optlen);
	if (err) {
		perror("setsockopt");
		exit(1);
	}
}

/* Enable on fd the control messages selected in *cmsg. */
static void apply_cmsg_types(int fd, const struct cfg_cmsg_types *cmsg)
{
	static const unsigned int on = 1;

	if (cmsg->timestampns)
		xsetsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, &on, sizeof(on));
	if (cmsg->tcp_inq)
		xsetsockopt(fd, IPPROTO_TCP, TCP_INQ, &on, sizeof(on));
}

/*
 * Parse the comma separated -c argument (TIMESTAMPNS, TCPINQ) into
 * cfg_cmsg_types; exits on an unknown name. Each item is compared with
 * strncmp() over the item length, so a prefix of a name is accepted.
 */
static void parse_cmsg_types(const char *type)
{
	char *next = strchr(type, ',');
	unsigned int len = 0;

	cfg_cmsg_types.cmsg_enabled = 1;

	if (next) {
		parse_cmsg_types(next + 1);
		len = next - type;
	} else {
		len = strlen(type);
	}

	if (strncmp(type, "TIMESTAMPNS", len) == 0) {
		cfg_cmsg_types.timestampns = 1;
		return;
	}

	if (strncmp(type, "TCPINQ", len) == 0) {
		cfg_cmsg_types.tcp_inq = 1;
		return;
	}

	fprintf(stderr, "Unrecognized cmsg option %s\n", type);
	exit(1);
}

/*
 * Parse the comma separated -o argument (TRANSPARENT, MPTFO) into
 * cfg_sockopt_types; exits on an unknown name. Same prefix matching as
 * parse_cmsg_types().
 */
static void parse_setsock_options(const char *name)
{
	char *next = strchr(name, ',');
	unsigned int len = 0;

	if (next) {
		parse_setsock_options(next + 1);
		len = next - name;
	} else {
		len = strlen(name);
	}

	if (strncmp(name, "TRANSPARENT", len) == 0) {
		cfg_sockopt_types.transparent = 1;
		return;
	}

	if (strncmp(name, "MPTFO", len) == 0) {
		cfg_sockopt_types.mptfo = 1;
		return;
	}

	fprintf(stderr, "Unrecognized setsockopt option %s\n", name);
	exit(1);
}

/*
 * Shut down the write side, wait until the output queue is empty and then
 * disconnect the socket by connecting it to an AF_UNSPEC address of
 * 'addrlen' bytes. Exits on failure or timeout.
 */
void xdisconnect(int fd, int addrlen)
{
	struct sockaddr_storage empty;
	int msec_sleep = 10;
	int queued = 1;
	int i;

	shutdown(fd, SHUT_WR);

	/* while until the pending data is completely flushed, the later
	 * disconnect will bypass/ignore/drop any pending data.
	 */
	for (i = 0; ; i += msec_sleep) {
		if (ioctl(fd, SIOCOUTQ, &queued) < 0)
			xerror("can't query out socket queue: %d", errno);

		if (!queued)
			break;

		if (i > poll_timeout)
			xerror("timeout while waiting for spool to complete");
		usleep(msec_sleep * 1000);
	}

	memset(&empty, 0, sizeof(empty));
	empty.ss_family = AF_UNSPEC;
	if (connect(fd, (struct sockaddr *)&empty, addrlen) < 0)
		xerror("can't disconnect: %d", errno);
}

/*
 * Client loop: connect, run the transfer and, with -I, disconnect and
 * reconnect the same socket cfg_repeat times. Returns 0 on success, a
 * non-zero code otherwise.
 */
int main_loop(void)
{
	int fd = 0, ret, fd_in = 0;
	struct addrinfo *peer;
	struct wstate winfo;

	if (cfg_input && cfg_sockopt_types.mptfo) {
		fd_in = open(cfg_input, O_RDONLY);
		if (fd_in < 0)
			xerror("can't open %s:%d", cfg_input, errno);
	}

	memset(&winfo, 0, sizeof(winfo));
	fd = sock_connect_mptcp(cfg_host, cfg_port, cfg_sock_proto, &peer, fd_in, &winfo);
	if (fd < 0)
		return 2;

again:
	check_getpeername_connect(fd);

	SOCK_TEST_TCPULP(fd, cfg_sock_proto);

	if (cfg_rcvbuf)
		set_rcvbuf(fd, cfg_rcvbuf);
	if (cfg_sndbuf)
		set_sndbuf(fd, cfg_sndbuf);
	if (cfg_cmsg_types.cmsg_enabled)
		apply_cmsg_types(fd, &cfg_cmsg_types);

	if (cfg_input && !cfg_sockopt_types.mptfo) {
		fd_in = open(cfg_input, O_RDONLY);
		if (fd_in < 0)
			xerror("can't open %s:%d", cfg_input, errno);
	}

	ret = copyfd_io(fd_in, fd, 1, 0, &winfo);
	if (ret)
		return ret;

	if (cfg_truncate > 0) {
		xdisconnect(fd, peer->ai_addrlen);
	} else if (--cfg_repeat > 0) {
		xdisconnect(fd, peer->ai_addrlen);

		/* the socket could be unblocking at this point, we need the
		 * connect to be blocking
		 */
		set_nonblock(fd, false);
		if (connect(fd, peer->ai_addr, peer->ai_addrlen))
			xerror("can't reconnect: %d", errno);
		if (cfg_input)
			close(fd_in);
		memset(&winfo, 0, sizeof(winfo));
		goto again;
	} else {
		close(fd);
	}

	return 0;
}

/* Map "MPTCP"/"TCP" (case insensitive) to the protocol number; exits otherwise. */
int parse_proto(const char *proto)
{
	if (!strcasecmp(proto, "MPTCP"))
		return IPPROTO_MPTCP;
	if (!strcasecmp(proto, "TCP"))
		return IPPROTO_TCP;

	fprintf(stderr, "Unknown protocol: %s\n.", proto);
	die_usage();

	/* silence compiler warning */
	return 0;
}

/* Map the -m argument to an enum cfg_mode value; exits on an unknown mode. */
int parse_mode(const char *mode)
{
	if (!strcasecmp(mode, "poll"))
		return CFG_MODE_POLL;
	if (!strcasecmp(mode, "mmap"))
		return CFG_MODE_MMAP;
	if (!strcasecmp(mode, "sendfile"))
		return CFG_MODE_SENDFILE;

	fprintf(stderr, "Unknown test mode: %s\n", mode);
	fprintf(stderr, "Supported modes are:\n");
	fprintf(stderr, "\t\t\"poll\" - interleaved read/write using poll()\n");
	fprintf(stderr, "\t\t\"mmap\" - send entire input file (mmap+write), then read response (-l will read input first)\n");
	fprintf(stderr, "\t\t\"sendfile\" - send entire input file (sendfile), then read response (-l will read input first)\n");

	die_usage();

	/* silence compiler warning */
	return 0;
}

/* Map the -P argument to an enum cfg_peek value; exits on an unknown mode. */
int parse_peek(const char *mode)
{
	if (!strcasecmp(mode, "saveWithPeek"))
		return CFG_WITH_PEEK;
	if (!strcasecmp(mode, "saveAfterPeek"))
		return CFG_AFTER_PEEK;

	fprintf(stderr, "Unknown: %s\n", mode);
	fprintf(stderr, "Supported MSG_PEEK mode are:\n");
	fprintf(stderr,
		"\t\t\"saveWithPeek\" - recv data with flags 'MSG_PEEK' and save the peek data into file\n");
	fprintf(stderr,
		"\t\t\"saveAfterPeek\" - read and save data into file after recv with flags 'MSG_PEEK'\n");

	die_usage();

	/* silence compiler warning */
	return 0;
}

/* Parse a non-negative integer not larger than INT_MAX; exits otherwise. */
static int parse_int(const char *size)
{
	unsigned long s;

	errno = 0;

	s = strtoul(size, NULL, 0);

	if (errno) {
		fprintf(stderr, "Invalid sndbuf size %s (%s)\n",
			size, strerror(errno));
		die_usage();
	}

	if (s > INT_MAX) {
		fprintf(stderr, "Invalid sndbuf size %s (%s)\n",
			size, strerror(ERANGE));
		die_usage();
	}

	return (int)s;
}

/*
 * Parse the command line into the cfg_* globals. Exactly one positional
 * argument (the address) is required; an address containing ':' selects
 * AF_INET6.
 */
static void parse_opts(int argc, char **argv)
{
	int max_write;
	int c;

	while ((c = getopt(argc, argv, "6c:f:hi:I:jlm:M:o:p:P:r:R:s:S:t:T:w:")) != -1) {
		switch (c) {
		case 'f':
			cfg_truncate = atoi(optarg);

			/* when receiving a fastclose, ignore PIPE signals and
			 * all the I/O errors later in the code
			 */
			if (cfg_truncate < 0) {
				cfg_rcv_trunc = true;
				signal(SIGPIPE, handle_signal);
			}
			break;
		case 'j':
			cfg_join = true;
			cfg_mode = CFG_MODE_POLL;
			break;
		case 'r':
			cfg_remove = true;
			cfg_mode = CFG_MODE_POLL;
			cfg_wait = 400000;
			/* validate as signed: cfg_do_w itself is unsigned */
			max_write = atoi(optarg);
			cfg_do_w = max_write > 0 ? (unsigned int)max_write : 50;
			break;
		case 'i':
			cfg_input = optarg;
			break;
		case 'I':
			cfg_repeat = atoi(optarg);
			break;
		case 'l':
			listen_mode = true;
			break;
		case 'p':
			cfg_port = optarg;
			break;
		case 's':
			cfg_sock_proto = parse_proto(optarg);
			break;
		case 'h':
			die_usage();
			break;
		case '6':
			pf = AF_INET6;
			break;
		case 't':
			poll_timeout = atoi(optarg) * 1000;
			if (poll_timeout <= 0)
				poll_timeout = -1;
			break;
		case 'T':
			cfg_time = atoi(optarg);
			break;
		case 'm':
			cfg_mode = parse_mode(optarg);
			break;
		case 'S':
			cfg_sndbuf = parse_int(optarg);
			break;
		case 'R':
			cfg_rcvbuf = parse_int(optarg);
			break;
		case 'w':
			cfg_wait = atoi(optarg)*1000000;
			break;
		case 'M':
			cfg_mark = strtol(optarg, NULL, 0);
			break;
		case 'P':
			cfg_peek = parse_peek(optarg);
			break;
		case 'c':
			parse_cmsg_types(optarg);
			break;
		case 'o':
			parse_setsock_options(optarg);
			break;
		}
	}

	if (optind + 1 != argc)
		die_usage();
	cfg_host = argv[optind];

	if (strchr(cfg_host, ':'))
		pf = AF_INET6;
}

int main(int argc, char *argv[])
{
	init_rng();

	signal(SIGUSR1, handle_signal);
	parse_opts(argc, argv);

	if (listen_mode) {
		int fd = sock_listen_mptcp(cfg_host, cfg_port);

		if (fd < 0)
			return 1;

		if (cfg_rcvbuf)
			set_rcvbuf(fd, cfg_rcvbuf);
		if (cfg_sndbuf)
			set_sndbuf(fd, cfg_sndbuf);
		if (cfg_mark)
			set_mark(fd, cfg_mark);
		if (cfg_cmsg_types.cmsg_enabled)
			apply_cmsg_types(fd, &cfg_cmsg_types);

		return main_loop_s(fd);
	}

	return main_loop();
}