1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2 or 12 * later. See the COPYING file in the top-level directory. 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qemu/error-report.h" 17 #include "trace.h" 18 #include "qemu-common.h" 19 #include "qapi/error.h" 20 #include "net/net.h" 21 #include "net/eth.h" 22 #include "qom/object_interfaces.h" 23 #include "qemu/iov.h" 24 #include "qom/object.h" 25 #include "net/queue.h" 26 #include "chardev/char-fe.h" 27 #include "qemu/sockets.h" 28 #include "net/colo.h" 29 #include "sysemu/iothread.h" 30 31 #define TYPE_COLO_COMPARE "colo-compare" 32 #define COLO_COMPARE(obj) \ 33 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) 34 35 #define COMPARE_READ_LEN_MAX NET_BUFSIZE 36 #define MAX_QUEUE_SIZE 1024 37 38 #define COLO_COMPARE_FREE_PRIMARY 0x01 39 #define COLO_COMPARE_FREE_SECONDARY 0x02 40 41 /* TODO: Should be configurable */ 42 #define REGULAR_PACKET_CHECK_MS 3000 43 44 /* 45 * + CompareState ++ 46 * | | 47 * +---------------+ +---------------+ +---------------+ 48 * | conn list + - > conn + ------- > conn + -- > ...... 49 * +---------------+ +---------------+ +---------------+ 50 * | | | | | | 51 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ 52 * |primary | |secondary |primary | |secondary 53 * |packet | |packet + |packet | |packet + 54 * +--------+ +--------+ +--------+ +--------+ 55 * | | | | 56 * +---v----+ +---v----+ +---v----+ +---v----+ 57 * |primary | |secondary |primary | |secondary 58 * |packet | |packet + |packet | |packet + 59 * +--------+ +--------+ +--------+ +--------+ 60 * | | | | 61 * +---v----+ +---v----+ +---v----+ +---v----+ 62 * |primary | |secondary |primary | |secondary 63 * |packet | |packet + |packet | |packet + 64 * +--------+ +--------+ +--------+ +--------+ 65 */ 66 typedef struct CompareState { 67 Object parent; 68 69 char *pri_indev; 70 char *sec_indev; 71 char *outdev; 72 CharBackend chr_pri_in; 73 CharBackend chr_sec_in; 74 CharBackend chr_out; 75 SocketReadState pri_rs; 76 SocketReadState sec_rs; 77 bool vnet_hdr; 78 79 /* 80 * Record the connection that through the NIC 81 * Element type: Connection 82 */ 83 GQueue conn_list; 84 /* Record the connection without repetition */ 85 GHashTable *connection_track_table; 86 87 IOThread *iothread; 88 GMainContext *worker_context; 89 QEMUTimer *packet_check_timer; 90 } CompareState; 91 92 typedef struct CompareClass { 93 ObjectClass parent_class; 94 } CompareClass; 95 96 enum { 97 PRIMARY_IN = 0, 98 SECONDARY_IN, 99 }; 100 101 static int compare_chr_send(CompareState *s, 102 const uint8_t *buf, 103 uint32_t size, 104 uint32_t vnet_hdr_len); 105 106 static gint seq_sorter(Packet *a, Packet *b, gpointer data) 107 { 108 struct tcphdr *atcp, *btcp; 109 110 atcp = (struct tcphdr *)(a->transport_header); 111 btcp = (struct tcphdr *)(b->transport_header); 112 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); 113 } 114 115 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack) 116 { 117 Packet *pkt = data; 118 struct tcphdr *tcphd; 119 120 tcphd = (struct tcphdr *)pkt->transport_header; 121 122 pkt->tcp_seq = ntohl(tcphd->th_seq); 123 pkt->tcp_ack = ntohl(tcphd->th_ack); 124 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack; 125 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data 126 + (tcphd->th_off << 2) - pkt->vnet_hdr_len; 127 pkt->payload_size = pkt->size - pkt->header_size; 128 pkt->seq_end = pkt->tcp_seq + pkt->payload_size; 129 pkt->flags = tcphd->th_flags; 130 } 131 132 /* 133 * Return 1 on success, if return 0 means the 134 * packet will be dropped 135 */ 136 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack) 137 { 138 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) { 139 if (pkt->ip->ip_p == IPPROTO_TCP) { 140 fill_pkt_tcp_info(pkt, max_ack); 141 g_queue_insert_sorted(queue, 142 pkt, 143 (GCompareDataFunc)seq_sorter, 144 NULL); 145 } else { 146 g_queue_push_tail(queue, pkt); 147 } 148 return 1; 149 } 150 return 0; 151 } 152 153 /* 154 * Return 0 on success, if return -1 means the pkt 155 * is unsupported(arp and ipv6) and will be sent later 156 */ 157 static int packet_enqueue(CompareState *s, int mode, Connection **con) 158 { 159 ConnectionKey key; 160 Packet *pkt = NULL; 161 Connection *conn; 162 163 if (mode == PRIMARY_IN) { 164 pkt = packet_new(s->pri_rs.buf, 165 s->pri_rs.packet_len, 166 s->pri_rs.vnet_hdr_len); 167 } else { 168 pkt = packet_new(s->sec_rs.buf, 169 s->sec_rs.packet_len, 170 s->sec_rs.vnet_hdr_len); 171 } 172 173 if (parse_packet_early(pkt)) { 174 packet_destroy(pkt, NULL); 175 pkt = NULL; 176 return -1; 177 } 178 fill_connection_key(pkt, &key); 179 180 conn = connection_get(s->connection_track_table, 181 &key, 182 &s->conn_list); 183 184 if (!conn->processing) { 185 g_queue_push_tail(&s->conn_list, conn); 186 conn->processing = true; 187 } 188 189 if (mode == PRIMARY_IN) { 190 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) { 191 error_report("colo compare primary queue size too big," 192 "drop packet"); 193 } 194 } else { 195 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) { 196 error_report("colo compare secondary queue size too big," 197 "drop packet"); 198 } 199 } 200 *con = conn; 201 202 return 0; 203 } 204 205 static inline bool after(uint32_t seq1, uint32_t seq2) 206 { 207 return (int32_t)(seq1 - seq2) > 0; 208 } 209 210 static void colo_release_primary_pkt(CompareState *s, Packet *pkt) 211 { 212 int ret; 213 ret = compare_chr_send(s, 214 pkt->data, 215 pkt->size, 216 pkt->vnet_hdr_len); 217 if (ret < 0) { 218 error_report("colo send primary packet failed"); 219 } 220 trace_colo_compare_main("packet same and release packet"); 221 packet_destroy(pkt, NULL); 222 } 223 224 /* 225 * The IP packets sent by primary and secondary 226 * will be compared in here 227 * TODO support ip fragment, Out-Of-Order 228 * return: 0 means packet same 229 * > 0 || < 0 means packet different 230 */ 231 static int colo_compare_packet_payload(Packet *ppkt, 232 Packet *spkt, 233 uint16_t poffset, 234 uint16_t soffset, 235 uint16_t len) 236 237 { 238 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 239 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 240 241 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 242 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 243 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 244 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 245 246 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 247 pri_ip_dst, spkt->size, 248 sec_ip_src, sec_ip_dst); 249 } 250 251 return memcmp(ppkt->data + poffset, spkt->data + soffset, len); 252 } 253 254 /* 255 * return true means that the payload is consist and 256 * need to make the next comparison, false means do 257 * the checkpoint 258 */ 259 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, 260 int8_t *mark, uint32_t max_ack) 261 { 262 *mark = 0; 263 264 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 265 if (colo_compare_packet_payload(ppkt, spkt, 266 ppkt->header_size, spkt->header_size, 267 ppkt->payload_size)) { 268 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 269 return true; 270 } 271 } 272 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 273 if (colo_compare_packet_payload(ppkt, spkt, 274 ppkt->header_size, spkt->header_size, 275 ppkt->payload_size)) { 276 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 277 return true; 278 } 279 } 280 281 /* one part of secondary packet payload still need to be compared */ 282 if (!after(ppkt->seq_end, spkt->seq_end)) { 283 if (colo_compare_packet_payload(ppkt, spkt, 284 ppkt->header_size + ppkt->offset, 285 spkt->header_size + spkt->offset, 286 ppkt->payload_size - ppkt->offset)) { 287 if (!after(ppkt->tcp_ack, max_ack)) { 288 *mark = COLO_COMPARE_FREE_PRIMARY; 289 spkt->offset += ppkt->payload_size - ppkt->offset; 290 return true; 291 } else { 292 /* secondary guest hasn't ack the data, don't send 293 * out this packet 294 */ 295 return false; 296 } 297 } 298 } else { 299 /* primary packet is longer than secondary packet, compare 300 * the same part and mark the primary packet offset 301 */ 302 if (colo_compare_packet_payload(ppkt, spkt, 303 ppkt->header_size + ppkt->offset, 304 spkt->header_size + spkt->offset, 305 spkt->payload_size - spkt->offset)) { 306 *mark = COLO_COMPARE_FREE_SECONDARY; 307 ppkt->offset += spkt->payload_size - spkt->offset; 308 return true; 309 } 310 } 311 312 return false; 313 } 314 315 static void colo_compare_tcp(CompareState *s, Connection *conn) 316 { 317 Packet *ppkt = NULL, *spkt = NULL; 318 int8_t mark; 319 320 /* 321 * If ppkt and spkt have the same payload, but ppkt's ACK 322 * is greater than spkt's ACK, in this case we can not 323 * send the ppkt because it will cause the secondary guest 324 * to miss sending some data in the next. Therefore, we 325 * record the maximum ACK in the current queue at both 326 * primary side and secondary side. Only when the ack is 327 * less than the smaller of the two maximum ack, then we 328 * can ensure that the packet's payload is acknowledged by 329 * primary and secondary. 330 */ 331 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack; 332 333 pri: 334 if (g_queue_is_empty(&conn->primary_list)) { 335 return; 336 } 337 ppkt = g_queue_pop_head(&conn->primary_list); 338 sec: 339 if (g_queue_is_empty(&conn->secondary_list)) { 340 g_queue_push_head(&conn->primary_list, ppkt); 341 return; 342 } 343 spkt = g_queue_pop_head(&conn->secondary_list); 344 345 if (ppkt->tcp_seq == ppkt->seq_end) { 346 colo_release_primary_pkt(s, ppkt); 347 ppkt = NULL; 348 } 349 350 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) { 351 trace_colo_compare_main("pri: this packet has compared"); 352 colo_release_primary_pkt(s, ppkt); 353 ppkt = NULL; 354 } 355 356 if (spkt->tcp_seq == spkt->seq_end) { 357 packet_destroy(spkt, NULL); 358 if (!ppkt) { 359 goto pri; 360 } else { 361 goto sec; 362 } 363 } else { 364 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) { 365 trace_colo_compare_main("sec: this packet has compared"); 366 packet_destroy(spkt, NULL); 367 if (!ppkt) { 368 goto pri; 369 } else { 370 goto sec; 371 } 372 } 373 if (!ppkt) { 374 g_queue_push_head(&conn->secondary_list, spkt); 375 goto pri; 376 } 377 } 378 379 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) { 380 trace_colo_compare_tcp_info("pri", 381 ppkt->tcp_seq, ppkt->tcp_ack, 382 ppkt->header_size, ppkt->payload_size, 383 ppkt->offset, ppkt->flags); 384 385 trace_colo_compare_tcp_info("sec", 386 spkt->tcp_seq, spkt->tcp_ack, 387 spkt->header_size, spkt->payload_size, 388 spkt->offset, spkt->flags); 389 390 if (mark == COLO_COMPARE_FREE_PRIMARY) { 391 conn->compare_seq = ppkt->seq_end; 392 colo_release_primary_pkt(s, ppkt); 393 g_queue_push_head(&conn->secondary_list, spkt); 394 goto pri; 395 } 396 if (mark == COLO_COMPARE_FREE_SECONDARY) { 397 conn->compare_seq = spkt->seq_end; 398 packet_destroy(spkt, NULL); 399 goto sec; 400 } 401 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) { 402 conn->compare_seq = ppkt->seq_end; 403 colo_release_primary_pkt(s, ppkt); 404 packet_destroy(spkt, NULL); 405 goto pri; 406 } 407 } else { 408 g_queue_push_head(&conn->primary_list, ppkt); 409 g_queue_push_head(&conn->secondary_list, spkt); 410 411 qemu_hexdump((char *)ppkt->data, stderr, 412 "colo-compare ppkt", ppkt->size); 413 qemu_hexdump((char *)spkt->data, stderr, 414 "colo-compare spkt", spkt->size); 415 416 /* 417 * colo_compare_inconsistent_notify(); 418 * TODO: notice to checkpoint(); 419 */ 420 } 421 } 422 423 424 /* 425 * Called from the compare thread on the primary 426 * for compare udp packet 427 */ 428 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) 429 { 430 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 431 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 432 433 trace_colo_compare_main("compare udp"); 434 435 /* 436 * Because of ppkt and spkt are both in the same connection, 437 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 438 * same with spkt. In addition, IP header's Identification is a random 439 * field, we can handle it in IP fragmentation function later. 440 * COLO just concern the response net packet payload from primary guest 441 * and secondary guest are same or not, So we ignored all IP header include 442 * other field like TOS,TTL,IP Checksum. we only need to compare 443 * the ip payload here. 444 */ 445 if (ppkt->size != spkt->size) { 446 trace_colo_compare_main("UDP: payload size of packets are different"); 447 return -1; 448 } 449 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 450 ppkt->size - offset)) { 451 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); 452 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); 453 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 454 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 455 ppkt->size); 456 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 457 spkt->size); 458 } 459 return -1; 460 } else { 461 return 0; 462 } 463 } 464 465 /* 466 * Called from the compare thread on the primary 467 * for compare icmp packet 468 */ 469 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) 470 { 471 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 472 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 473 474 trace_colo_compare_main("compare icmp"); 475 476 /* 477 * Because of ppkt and spkt are both in the same connection, 478 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 479 * same with spkt. In addition, IP header's Identification is a random 480 * field, we can handle it in IP fragmentation function later. 481 * COLO just concern the response net packet payload from primary guest 482 * and secondary guest are same or not, So we ignored all IP header include 483 * other field like TOS,TTL,IP Checksum. we only need to compare 484 * the ip payload here. 485 */ 486 if (ppkt->size != spkt->size) { 487 trace_colo_compare_main("ICMP: payload size of packets are different"); 488 return -1; 489 } 490 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 491 ppkt->size - offset)) { 492 trace_colo_compare_icmp_miscompare("primary pkt size", 493 ppkt->size); 494 trace_colo_compare_icmp_miscompare("Secondary pkt size", 495 spkt->size); 496 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 497 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 498 ppkt->size); 499 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 500 spkt->size); 501 } 502 return -1; 503 } else { 504 return 0; 505 } 506 } 507 508 /* 509 * Called from the compare thread on the primary 510 * for compare other packet 511 */ 512 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) 513 { 514 uint16_t offset = ppkt->vnet_hdr_len; 515 516 trace_colo_compare_main("compare other"); 517 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 518 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 519 520 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 521 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 522 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 523 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 524 525 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 526 pri_ip_dst, spkt->size, 527 sec_ip_src, sec_ip_dst); 528 } 529 530 if (ppkt->size != spkt->size) { 531 trace_colo_compare_main("Other: payload size of packets are different"); 532 return -1; 533 } 534 return colo_compare_packet_payload(ppkt, spkt, offset, offset, 535 ppkt->size - offset); 536 } 537 538 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) 539 { 540 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); 541 542 if ((now - pkt->creation_ms) > (*check_time)) { 543 trace_colo_old_packet_check_found(pkt->creation_ms); 544 return 0; 545 } else { 546 return 1; 547 } 548 } 549 550 static int colo_old_packet_check_one_conn(Connection *conn, 551 void *user_data) 552 { 553 GList *result = NULL; 554 int64_t check_time = REGULAR_PACKET_CHECK_MS; 555 556 result = g_queue_find_custom(&conn->primary_list, 557 &check_time, 558 (GCompareFunc)colo_old_packet_check_one); 559 560 if (result) { 561 /* Do checkpoint will flush old packet */ 562 /* 563 * TODO: Notify colo frame to do checkpoint. 564 * colo_compare_inconsistent_notify(); 565 */ 566 return 0; 567 } 568 569 return 1; 570 } 571 572 /* 573 * Look for old packets that the secondary hasn't matched, 574 * if we have some then we have to checkpoint to wake 575 * the secondary up. 576 */ 577 static void colo_old_packet_check(void *opaque) 578 { 579 CompareState *s = opaque; 580 581 /* 582 * If we find one old packet, stop finding job and notify 583 * COLO frame do checkpoint. 584 */ 585 g_queue_find_custom(&s->conn_list, NULL, 586 (GCompareFunc)colo_old_packet_check_one_conn); 587 } 588 589 static void colo_compare_packet(CompareState *s, Connection *conn, 590 int (*HandlePacket)(Packet *spkt, 591 Packet *ppkt)) 592 { 593 Packet *pkt = NULL; 594 GList *result = NULL; 595 596 while (!g_queue_is_empty(&conn->primary_list) && 597 !g_queue_is_empty(&conn->secondary_list)) { 598 pkt = g_queue_pop_head(&conn->primary_list); 599 result = g_queue_find_custom(&conn->secondary_list, 600 pkt, (GCompareFunc)HandlePacket); 601 602 if (result) { 603 colo_release_primary_pkt(s, pkt); 604 g_queue_remove(&conn->secondary_list, result->data); 605 } else { 606 /* 607 * If one packet arrive late, the secondary_list or 608 * primary_list will be empty, so we can't compare it 609 * until next comparison. 610 */ 611 trace_colo_compare_main("packet different"); 612 g_queue_push_head(&conn->primary_list, pkt); 613 /* TODO: colo_notify_checkpoint();*/ 614 break; 615 } 616 } 617 } 618 619 /* 620 * Called from the compare thread on the primary 621 * for compare packet with secondary list of the 622 * specified connection when a new packet was 623 * queued to it. 624 */ 625 static void colo_compare_connection(void *opaque, void *user_data) 626 { 627 CompareState *s = user_data; 628 Connection *conn = opaque; 629 630 switch (conn->ip_proto) { 631 case IPPROTO_TCP: 632 colo_compare_tcp(s, conn); 633 break; 634 case IPPROTO_UDP: 635 colo_compare_packet(s, conn, colo_packet_compare_udp); 636 break; 637 case IPPROTO_ICMP: 638 colo_compare_packet(s, conn, colo_packet_compare_icmp); 639 break; 640 default: 641 colo_compare_packet(s, conn, colo_packet_compare_other); 642 break; 643 } 644 } 645 646 static int compare_chr_send(CompareState *s, 647 const uint8_t *buf, 648 uint32_t size, 649 uint32_t vnet_hdr_len) 650 { 651 int ret = 0; 652 uint32_t len = htonl(size); 653 654 if (!size) { 655 return 0; 656 } 657 658 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 659 if (ret != sizeof(len)) { 660 goto err; 661 } 662 663 if (s->vnet_hdr) { 664 /* 665 * We send vnet header len make other module(like filter-redirector) 666 * know how to parse net packet correctly. 667 */ 668 len = htonl(vnet_hdr_len); 669 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 670 if (ret != sizeof(len)) { 671 goto err; 672 } 673 } 674 675 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); 676 if (ret != size) { 677 goto err; 678 } 679 680 return 0; 681 682 err: 683 return ret < 0 ? ret : -EIO; 684 } 685 686 static int compare_chr_can_read(void *opaque) 687 { 688 return COMPARE_READ_LEN_MAX; 689 } 690 691 /* 692 * Called from the main thread on the primary for packets 693 * arriving over the socket from the primary. 694 */ 695 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) 696 { 697 CompareState *s = COLO_COMPARE(opaque); 698 int ret; 699 700 ret = net_fill_rstate(&s->pri_rs, buf, size); 701 if (ret == -1) { 702 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, 703 NULL, NULL, true); 704 error_report("colo-compare primary_in error"); 705 } 706 } 707 708 /* 709 * Called from the main thread on the primary for packets 710 * arriving over the socket from the secondary. 711 */ 712 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) 713 { 714 CompareState *s = COLO_COMPARE(opaque); 715 int ret; 716 717 ret = net_fill_rstate(&s->sec_rs, buf, size); 718 if (ret == -1) { 719 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, 720 NULL, NULL, true); 721 error_report("colo-compare secondary_in error"); 722 } 723 } 724 725 /* 726 * Check old packet regularly so it can watch for any packets 727 * that the secondary hasn't produced equivalents of. 728 */ 729 static void check_old_packet_regular(void *opaque) 730 { 731 CompareState *s = opaque; 732 733 /* if have old packet we will notify checkpoint */ 734 colo_old_packet_check(s); 735 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 736 REGULAR_PACKET_CHECK_MS); 737 } 738 739 static void colo_compare_timer_init(CompareState *s) 740 { 741 AioContext *ctx = iothread_get_aio_context(s->iothread); 742 743 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, 744 SCALE_MS, check_old_packet_regular, 745 s); 746 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 747 REGULAR_PACKET_CHECK_MS); 748 } 749 750 static void colo_compare_timer_del(CompareState *s) 751 { 752 if (s->packet_check_timer) { 753 timer_del(s->packet_check_timer); 754 timer_free(s->packet_check_timer); 755 s->packet_check_timer = NULL; 756 } 757 } 758 759 static void colo_compare_iothread(CompareState *s) 760 { 761 object_ref(OBJECT(s->iothread)); 762 s->worker_context = iothread_get_g_main_context(s->iothread); 763 764 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, 765 compare_pri_chr_in, NULL, NULL, 766 s, s->worker_context, true); 767 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, 768 compare_sec_chr_in, NULL, NULL, 769 s, s->worker_context, true); 770 771 colo_compare_timer_init(s); 772 } 773 774 static char *compare_get_pri_indev(Object *obj, Error **errp) 775 { 776 CompareState *s = COLO_COMPARE(obj); 777 778 return g_strdup(s->pri_indev); 779 } 780 781 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) 782 { 783 CompareState *s = COLO_COMPARE(obj); 784 785 g_free(s->pri_indev); 786 s->pri_indev = g_strdup(value); 787 } 788 789 static char *compare_get_sec_indev(Object *obj, Error **errp) 790 { 791 CompareState *s = COLO_COMPARE(obj); 792 793 return g_strdup(s->sec_indev); 794 } 795 796 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) 797 { 798 CompareState *s = COLO_COMPARE(obj); 799 800 g_free(s->sec_indev); 801 s->sec_indev = g_strdup(value); 802 } 803 804 static char *compare_get_outdev(Object *obj, Error **errp) 805 { 806 CompareState *s = COLO_COMPARE(obj); 807 808 return g_strdup(s->outdev); 809 } 810 811 static void compare_set_outdev(Object *obj, const char *value, Error **errp) 812 { 813 CompareState *s = COLO_COMPARE(obj); 814 815 g_free(s->outdev); 816 s->outdev = g_strdup(value); 817 } 818 819 static bool compare_get_vnet_hdr(Object *obj, Error **errp) 820 { 821 CompareState *s = COLO_COMPARE(obj); 822 823 return s->vnet_hdr; 824 } 825 826 static void compare_set_vnet_hdr(Object *obj, 827 bool value, 828 Error **errp) 829 { 830 CompareState *s = COLO_COMPARE(obj); 831 832 s->vnet_hdr = value; 833 } 834 835 static void compare_pri_rs_finalize(SocketReadState *pri_rs) 836 { 837 CompareState *s = container_of(pri_rs, CompareState, pri_rs); 838 Connection *conn = NULL; 839 840 if (packet_enqueue(s, PRIMARY_IN, &conn)) { 841 trace_colo_compare_main("primary: unsupported packet in"); 842 compare_chr_send(s, 843 pri_rs->buf, 844 pri_rs->packet_len, 845 pri_rs->vnet_hdr_len); 846 } else { 847 /* compare packet in the specified connection */ 848 colo_compare_connection(conn, s); 849 } 850 } 851 852 static void compare_sec_rs_finalize(SocketReadState *sec_rs) 853 { 854 CompareState *s = container_of(sec_rs, CompareState, sec_rs); 855 Connection *conn = NULL; 856 857 if (packet_enqueue(s, SECONDARY_IN, &conn)) { 858 trace_colo_compare_main("secondary: unsupported packet in"); 859 } else { 860 /* compare packet in the specified connection */ 861 colo_compare_connection(conn, s); 862 } 863 } 864 865 866 /* 867 * Return 0 is success. 868 * Return 1 is failed. 869 */ 870 static int find_and_check_chardev(Chardev **chr, 871 char *chr_name, 872 Error **errp) 873 { 874 *chr = qemu_chr_find(chr_name); 875 if (*chr == NULL) { 876 error_setg(errp, "Device '%s' not found", 877 chr_name); 878 return 1; 879 } 880 881 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { 882 error_setg(errp, "chardev \"%s\" is not reconnectable", 883 chr_name); 884 return 1; 885 } 886 887 return 0; 888 } 889 890 /* 891 * Called from the main thread on the primary 892 * to setup colo-compare. 893 */ 894 static void colo_compare_complete(UserCreatable *uc, Error **errp) 895 { 896 CompareState *s = COLO_COMPARE(uc); 897 Chardev *chr; 898 899 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { 900 error_setg(errp, "colo compare needs 'primary_in' ," 901 "'secondary_in','outdev','iothread' property set"); 902 return; 903 } else if (!strcmp(s->pri_indev, s->outdev) || 904 !strcmp(s->sec_indev, s->outdev) || 905 !strcmp(s->pri_indev, s->sec_indev)) { 906 error_setg(errp, "'indev' and 'outdev' could not be same " 907 "for compare module"); 908 return; 909 } 910 911 if (find_and_check_chardev(&chr, s->pri_indev, errp) || 912 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { 913 return; 914 } 915 916 if (find_and_check_chardev(&chr, s->sec_indev, errp) || 917 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { 918 return; 919 } 920 921 if (find_and_check_chardev(&chr, s->outdev, errp) || 922 !qemu_chr_fe_init(&s->chr_out, chr, errp)) { 923 return; 924 } 925 926 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); 927 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); 928 929 g_queue_init(&s->conn_list); 930 931 s->connection_track_table = g_hash_table_new_full(connection_key_hash, 932 connection_key_equal, 933 g_free, 934 connection_destroy); 935 936 colo_compare_iothread(s); 937 return; 938 } 939 940 static void colo_flush_packets(void *opaque, void *user_data) 941 { 942 CompareState *s = user_data; 943 Connection *conn = opaque; 944 Packet *pkt = NULL; 945 946 while (!g_queue_is_empty(&conn->primary_list)) { 947 pkt = g_queue_pop_head(&conn->primary_list); 948 compare_chr_send(s, 949 pkt->data, 950 pkt->size, 951 pkt->vnet_hdr_len); 952 packet_destroy(pkt, NULL); 953 } 954 while (!g_queue_is_empty(&conn->secondary_list)) { 955 pkt = g_queue_pop_head(&conn->secondary_list); 956 packet_destroy(pkt, NULL); 957 } 958 } 959 960 static void colo_compare_class_init(ObjectClass *oc, void *data) 961 { 962 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); 963 964 ucc->complete = colo_compare_complete; 965 } 966 967 static void colo_compare_init(Object *obj) 968 { 969 CompareState *s = COLO_COMPARE(obj); 970 971 object_property_add_str(obj, "primary_in", 972 compare_get_pri_indev, compare_set_pri_indev, 973 NULL); 974 object_property_add_str(obj, "secondary_in", 975 compare_get_sec_indev, compare_set_sec_indev, 976 NULL); 977 object_property_add_str(obj, "outdev", 978 compare_get_outdev, compare_set_outdev, 979 NULL); 980 object_property_add_link(obj, "iothread", TYPE_IOTHREAD, 981 (Object **)&s->iothread, 982 object_property_allow_set_link, 983 OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL); 984 985 s->vnet_hdr = false; 986 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, 987 compare_set_vnet_hdr, NULL); 988 } 989 990 static void colo_compare_finalize(Object *obj) 991 { 992 CompareState *s = COLO_COMPARE(obj); 993 994 qemu_chr_fe_deinit(&s->chr_pri_in, false); 995 qemu_chr_fe_deinit(&s->chr_sec_in, false); 996 qemu_chr_fe_deinit(&s->chr_out, false); 997 if (s->iothread) { 998 colo_compare_timer_del(s); 999 } 1000 /* Release all unhandled packets after compare thead exited */ 1001 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1002 1003 g_queue_clear(&s->conn_list); 1004 1005 if (s->connection_track_table) { 1006 g_hash_table_destroy(s->connection_track_table); 1007 } 1008 1009 if (s->iothread) { 1010 object_unref(OBJECT(s->iothread)); 1011 } 1012 g_free(s->pri_indev); 1013 g_free(s->sec_indev); 1014 g_free(s->outdev); 1015 } 1016 1017 static const TypeInfo colo_compare_info = { 1018 .name = TYPE_COLO_COMPARE, 1019 .parent = TYPE_OBJECT, 1020 .instance_size = sizeof(CompareState), 1021 .instance_init = colo_compare_init, 1022 .instance_finalize = colo_compare_finalize, 1023 .class_size = sizeof(CompareClass), 1024 .class_init = colo_compare_class_init, 1025 .interfaces = (InterfaceInfo[]) { 1026 { TYPE_USER_CREATABLE }, 1027 { } 1028 } 1029 }; 1030 1031 static void register_types(void) 1032 { 1033 type_register_static(&colo_compare_info); 1034 } 1035 1036 type_init(register_types); 1037