1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2 or 12 * later. See the COPYING file in the top-level directory. 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qemu-common.h" 17 #include "qemu/error-report.h" 18 #include "trace.h" 19 #include "qapi/error.h" 20 #include "net/net.h" 21 #include "net/eth.h" 22 #include "qom/object_interfaces.h" 23 #include "qemu/iov.h" 24 #include "qom/object.h" 25 #include "net/queue.h" 26 #include "chardev/char-fe.h" 27 #include "qemu/sockets.h" 28 #include "colo.h" 29 #include "sysemu/iothread.h" 30 #include "net/colo-compare.h" 31 #include "migration/colo.h" 32 #include "migration/migration.h" 33 #include "util.h" 34 35 #define TYPE_COLO_COMPARE "colo-compare" 36 #define COLO_COMPARE(obj) \ 37 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) 38 39 static QTAILQ_HEAD(, CompareState) net_compares = 40 QTAILQ_HEAD_INITIALIZER(net_compares); 41 42 static NotifierList colo_compare_notifiers = 43 NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers); 44 45 #define COMPARE_READ_LEN_MAX NET_BUFSIZE 46 #define MAX_QUEUE_SIZE 1024 47 48 #define COLO_COMPARE_FREE_PRIMARY 0x01 49 #define COLO_COMPARE_FREE_SECONDARY 0x02 50 51 #define REGULAR_PACKET_CHECK_MS 3000 52 #define DEFAULT_TIME_OUT_MS 3000 53 54 static QemuMutex event_mtx; 55 static QemuCond event_complete_cond; 56 static int event_unhandled_count; 57 58 /* 59 * + CompareState ++ 60 * | | 61 * +---------------+ +---------------+ +---------------+ 62 * | conn list + - > conn + ------- > conn + -- > ...... 63 * +---------------+ +---------------+ +---------------+ 64 * | | | | | | 65 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ 66 * |primary | |secondary |primary | |secondary 67 * |packet | |packet + |packet | |packet + 68 * +--------+ +--------+ +--------+ +--------+ 69 * | | | | 70 * +---v----+ +---v----+ +---v----+ +---v----+ 71 * |primary | |secondary |primary | |secondary 72 * |packet | |packet + |packet | |packet + 73 * +--------+ +--------+ +--------+ +--------+ 74 * | | | | 75 * +---v----+ +---v----+ +---v----+ +---v----+ 76 * |primary | |secondary |primary | |secondary 77 * |packet | |packet + |packet | |packet + 78 * +--------+ +--------+ +--------+ +--------+ 79 */ 80 typedef struct CompareState { 81 Object parent; 82 83 char *pri_indev; 84 char *sec_indev; 85 char *outdev; 86 char *notify_dev; 87 CharBackend chr_pri_in; 88 CharBackend chr_sec_in; 89 CharBackend chr_out; 90 CharBackend chr_notify_dev; 91 SocketReadState pri_rs; 92 SocketReadState sec_rs; 93 SocketReadState notify_rs; 94 bool vnet_hdr; 95 uint32_t compare_timeout; 96 uint32_t expired_scan_cycle; 97 98 /* 99 * Record the connection that through the NIC 100 * Element type: Connection 101 */ 102 GQueue conn_list; 103 /* Record the connection without repetition */ 104 GHashTable *connection_track_table; 105 106 IOThread *iothread; 107 GMainContext *worker_context; 108 QEMUTimer *packet_check_timer; 109 110 QEMUBH *event_bh; 111 enum colo_event event; 112 113 QTAILQ_ENTRY(CompareState) next; 114 } CompareState; 115 116 typedef struct CompareClass { 117 ObjectClass parent_class; 118 } CompareClass; 119 120 enum { 121 PRIMARY_IN = 0, 122 SECONDARY_IN, 123 }; 124 125 126 static int compare_chr_send(CompareState *s, 127 const uint8_t *buf, 128 uint32_t size, 129 uint32_t vnet_hdr_len, 130 bool notify_remote_frame); 131 132 static bool packet_matches_str(const char *str, 133 const uint8_t *buf, 134 uint32_t packet_len) 135 { 136 if (packet_len != strlen(str)) { 137 return false; 138 } 139 140 return !memcmp(str, buf, strlen(str)); 141 } 142 143 static void notify_remote_frame(CompareState *s) 144 { 145 char msg[] = "DO_CHECKPOINT"; 146 int ret = 0; 147 148 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); 149 if (ret < 0) { 150 error_report("Notify Xen COLO-frame failed"); 151 } 152 } 153 154 static void colo_compare_inconsistency_notify(CompareState *s) 155 { 156 if (s->notify_dev) { 157 notify_remote_frame(s); 158 } else { 159 notifier_list_notify(&colo_compare_notifiers, 160 migrate_get_current()); 161 } 162 } 163 164 static gint seq_sorter(Packet *a, Packet *b, gpointer data) 165 { 166 struct tcp_hdr *atcp, *btcp; 167 168 atcp = (struct tcp_hdr *)(a->transport_header); 169 btcp = (struct tcp_hdr *)(b->transport_header); 170 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); 171 } 172 173 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack) 174 { 175 Packet *pkt = data; 176 struct tcp_hdr *tcphd; 177 178 tcphd = (struct tcp_hdr *)pkt->transport_header; 179 180 pkt->tcp_seq = ntohl(tcphd->th_seq); 181 pkt->tcp_ack = ntohl(tcphd->th_ack); 182 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack; 183 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data 184 + (tcphd->th_off << 2) - pkt->vnet_hdr_len; 185 pkt->payload_size = pkt->size - pkt->header_size; 186 pkt->seq_end = pkt->tcp_seq + pkt->payload_size; 187 pkt->flags = tcphd->th_flags; 188 } 189 190 /* 191 * Return 1 on success, if return 0 means the 192 * packet will be dropped 193 */ 194 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack) 195 { 196 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) { 197 if (pkt->ip->ip_p == IPPROTO_TCP) { 198 fill_pkt_tcp_info(pkt, max_ack); 199 g_queue_insert_sorted(queue, 200 pkt, 201 (GCompareDataFunc)seq_sorter, 202 NULL); 203 } else { 204 g_queue_push_tail(queue, pkt); 205 } 206 return 1; 207 } 208 return 0; 209 } 210 211 /* 212 * Return 0 on success, if return -1 means the pkt 213 * is unsupported(arp and ipv6) and will be sent later 214 */ 215 static int packet_enqueue(CompareState *s, int mode, Connection **con) 216 { 217 ConnectionKey key; 218 Packet *pkt = NULL; 219 Connection *conn; 220 221 if (mode == PRIMARY_IN) { 222 pkt = packet_new(s->pri_rs.buf, 223 s->pri_rs.packet_len, 224 s->pri_rs.vnet_hdr_len); 225 } else { 226 pkt = packet_new(s->sec_rs.buf, 227 s->sec_rs.packet_len, 228 s->sec_rs.vnet_hdr_len); 229 } 230 231 if (parse_packet_early(pkt)) { 232 packet_destroy(pkt, NULL); 233 pkt = NULL; 234 return -1; 235 } 236 fill_connection_key(pkt, &key); 237 238 conn = connection_get(s->connection_track_table, 239 &key, 240 &s->conn_list); 241 242 if (!conn->processing) { 243 g_queue_push_tail(&s->conn_list, conn); 244 conn->processing = true; 245 } 246 247 if (mode == PRIMARY_IN) { 248 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) { 249 error_report("colo compare primary queue size too big," 250 "drop packet"); 251 } 252 } else { 253 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) { 254 error_report("colo compare secondary queue size too big," 255 "drop packet"); 256 } 257 } 258 *con = conn; 259 260 return 0; 261 } 262 263 static inline bool after(uint32_t seq1, uint32_t seq2) 264 { 265 return (int32_t)(seq1 - seq2) > 0; 266 } 267 268 static void colo_release_primary_pkt(CompareState *s, Packet *pkt) 269 { 270 int ret; 271 ret = compare_chr_send(s, 272 pkt->data, 273 pkt->size, 274 pkt->vnet_hdr_len, 275 false); 276 if (ret < 0) { 277 error_report("colo send primary packet failed"); 278 } 279 trace_colo_compare_main("packet same and release packet"); 280 packet_destroy(pkt, NULL); 281 } 282 283 /* 284 * The IP packets sent by primary and secondary 285 * will be compared in here 286 * TODO support ip fragment, Out-Of-Order 287 * return: 0 means packet same 288 * > 0 || < 0 means packet different 289 */ 290 static int colo_compare_packet_payload(Packet *ppkt, 291 Packet *spkt, 292 uint16_t poffset, 293 uint16_t soffset, 294 uint16_t len) 295 296 { 297 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 298 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 299 300 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 301 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 302 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 303 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 304 305 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 306 pri_ip_dst, spkt->size, 307 sec_ip_src, sec_ip_dst); 308 } 309 310 return memcmp(ppkt->data + poffset, spkt->data + soffset, len); 311 } 312 313 /* 314 * return true means that the payload is consist and 315 * need to make the next comparison, false means do 316 * the checkpoint 317 */ 318 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, 319 int8_t *mark, uint32_t max_ack) 320 { 321 *mark = 0; 322 323 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 324 if (!colo_compare_packet_payload(ppkt, spkt, 325 ppkt->header_size, spkt->header_size, 326 ppkt->payload_size)) { 327 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 328 return true; 329 } 330 } 331 332 /* one part of secondary packet payload still need to be compared */ 333 if (!after(ppkt->seq_end, spkt->seq_end)) { 334 if (!colo_compare_packet_payload(ppkt, spkt, 335 ppkt->header_size + ppkt->offset, 336 spkt->header_size + spkt->offset, 337 ppkt->payload_size - ppkt->offset)) { 338 if (!after(ppkt->tcp_ack, max_ack)) { 339 *mark = COLO_COMPARE_FREE_PRIMARY; 340 spkt->offset += ppkt->payload_size - ppkt->offset; 341 return true; 342 } else { 343 /* secondary guest hasn't ack the data, don't send 344 * out this packet 345 */ 346 return false; 347 } 348 } 349 } else { 350 /* primary packet is longer than secondary packet, compare 351 * the same part and mark the primary packet offset 352 */ 353 if (!colo_compare_packet_payload(ppkt, spkt, 354 ppkt->header_size + ppkt->offset, 355 spkt->header_size + spkt->offset, 356 spkt->payload_size - spkt->offset)) { 357 *mark = COLO_COMPARE_FREE_SECONDARY; 358 ppkt->offset += spkt->payload_size - spkt->offset; 359 return true; 360 } 361 } 362 363 return false; 364 } 365 366 static void colo_compare_tcp(CompareState *s, Connection *conn) 367 { 368 Packet *ppkt = NULL, *spkt = NULL; 369 int8_t mark; 370 371 /* 372 * If ppkt and spkt have the same payload, but ppkt's ACK 373 * is greater than spkt's ACK, in this case we can not 374 * send the ppkt because it will cause the secondary guest 375 * to miss sending some data in the next. Therefore, we 376 * record the maximum ACK in the current queue at both 377 * primary side and secondary side. Only when the ack is 378 * less than the smaller of the two maximum ack, then we 379 * can ensure that the packet's payload is acknowledged by 380 * primary and secondary. 381 */ 382 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack; 383 384 pri: 385 if (g_queue_is_empty(&conn->primary_list)) { 386 return; 387 } 388 ppkt = g_queue_pop_head(&conn->primary_list); 389 sec: 390 if (g_queue_is_empty(&conn->secondary_list)) { 391 g_queue_push_head(&conn->primary_list, ppkt); 392 return; 393 } 394 spkt = g_queue_pop_head(&conn->secondary_list); 395 396 if (ppkt->tcp_seq == ppkt->seq_end) { 397 colo_release_primary_pkt(s, ppkt); 398 ppkt = NULL; 399 } 400 401 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) { 402 trace_colo_compare_main("pri: this packet has compared"); 403 colo_release_primary_pkt(s, ppkt); 404 ppkt = NULL; 405 } 406 407 if (spkt->tcp_seq == spkt->seq_end) { 408 packet_destroy(spkt, NULL); 409 if (!ppkt) { 410 goto pri; 411 } else { 412 goto sec; 413 } 414 } else { 415 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) { 416 trace_colo_compare_main("sec: this packet has compared"); 417 packet_destroy(spkt, NULL); 418 if (!ppkt) { 419 goto pri; 420 } else { 421 goto sec; 422 } 423 } 424 if (!ppkt) { 425 g_queue_push_head(&conn->secondary_list, spkt); 426 goto pri; 427 } 428 } 429 430 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) { 431 trace_colo_compare_tcp_info("pri", 432 ppkt->tcp_seq, ppkt->tcp_ack, 433 ppkt->header_size, ppkt->payload_size, 434 ppkt->offset, ppkt->flags); 435 436 trace_colo_compare_tcp_info("sec", 437 spkt->tcp_seq, spkt->tcp_ack, 438 spkt->header_size, spkt->payload_size, 439 spkt->offset, spkt->flags); 440 441 if (mark == COLO_COMPARE_FREE_PRIMARY) { 442 conn->compare_seq = ppkt->seq_end; 443 colo_release_primary_pkt(s, ppkt); 444 g_queue_push_head(&conn->secondary_list, spkt); 445 goto pri; 446 } 447 if (mark == COLO_COMPARE_FREE_SECONDARY) { 448 conn->compare_seq = spkt->seq_end; 449 packet_destroy(spkt, NULL); 450 goto sec; 451 } 452 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) { 453 conn->compare_seq = ppkt->seq_end; 454 colo_release_primary_pkt(s, ppkt); 455 packet_destroy(spkt, NULL); 456 goto pri; 457 } 458 } else { 459 g_queue_push_head(&conn->primary_list, ppkt); 460 g_queue_push_head(&conn->secondary_list, spkt); 461 462 qemu_hexdump((char *)ppkt->data, stderr, 463 "colo-compare ppkt", ppkt->size); 464 qemu_hexdump((char *)spkt->data, stderr, 465 "colo-compare spkt", spkt->size); 466 467 colo_compare_inconsistency_notify(s); 468 } 469 } 470 471 472 /* 473 * Called from the compare thread on the primary 474 * for compare udp packet 475 */ 476 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) 477 { 478 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 479 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 480 481 trace_colo_compare_main("compare udp"); 482 483 /* 484 * Because of ppkt and spkt are both in the same connection, 485 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 486 * same with spkt. In addition, IP header's Identification is a random 487 * field, we can handle it in IP fragmentation function later. 488 * COLO just concern the response net packet payload from primary guest 489 * and secondary guest are same or not, So we ignored all IP header include 490 * other field like TOS,TTL,IP Checksum. we only need to compare 491 * the ip payload here. 492 */ 493 if (ppkt->size != spkt->size) { 494 trace_colo_compare_main("UDP: payload size of packets are different"); 495 return -1; 496 } 497 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 498 ppkt->size - offset)) { 499 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); 500 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); 501 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 502 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 503 ppkt->size); 504 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 505 spkt->size); 506 } 507 return -1; 508 } else { 509 return 0; 510 } 511 } 512 513 /* 514 * Called from the compare thread on the primary 515 * for compare icmp packet 516 */ 517 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) 518 { 519 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 520 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 521 522 trace_colo_compare_main("compare icmp"); 523 524 /* 525 * Because of ppkt and spkt are both in the same connection, 526 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 527 * same with spkt. In addition, IP header's Identification is a random 528 * field, we can handle it in IP fragmentation function later. 529 * COLO just concern the response net packet payload from primary guest 530 * and secondary guest are same or not, So we ignored all IP header include 531 * other field like TOS,TTL,IP Checksum. we only need to compare 532 * the ip payload here. 533 */ 534 if (ppkt->size != spkt->size) { 535 trace_colo_compare_main("ICMP: payload size of packets are different"); 536 return -1; 537 } 538 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 539 ppkt->size - offset)) { 540 trace_colo_compare_icmp_miscompare("primary pkt size", 541 ppkt->size); 542 trace_colo_compare_icmp_miscompare("Secondary pkt size", 543 spkt->size); 544 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 545 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 546 ppkt->size); 547 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 548 spkt->size); 549 } 550 return -1; 551 } else { 552 return 0; 553 } 554 } 555 556 /* 557 * Called from the compare thread on the primary 558 * for compare other packet 559 */ 560 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) 561 { 562 uint16_t offset = ppkt->vnet_hdr_len; 563 564 trace_colo_compare_main("compare other"); 565 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 566 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 567 568 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 569 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 570 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 571 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 572 573 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 574 pri_ip_dst, spkt->size, 575 sec_ip_src, sec_ip_dst); 576 } 577 578 if (ppkt->size != spkt->size) { 579 trace_colo_compare_main("Other: payload size of packets are different"); 580 return -1; 581 } 582 return colo_compare_packet_payload(ppkt, spkt, offset, offset, 583 ppkt->size - offset); 584 } 585 586 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) 587 { 588 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); 589 590 if ((now - pkt->creation_ms) > (*check_time)) { 591 trace_colo_old_packet_check_found(pkt->creation_ms); 592 return 0; 593 } else { 594 return 1; 595 } 596 } 597 598 void colo_compare_register_notifier(Notifier *notify) 599 { 600 notifier_list_add(&colo_compare_notifiers, notify); 601 } 602 603 void colo_compare_unregister_notifier(Notifier *notify) 604 { 605 notifier_remove(notify); 606 } 607 608 static int colo_old_packet_check_one_conn(Connection *conn, 609 CompareState *s) 610 { 611 GList *result = NULL; 612 613 result = g_queue_find_custom(&conn->primary_list, 614 &s->compare_timeout, 615 (GCompareFunc)colo_old_packet_check_one); 616 617 if (result) { 618 /* Do checkpoint will flush old packet */ 619 colo_compare_inconsistency_notify(s); 620 return 0; 621 } 622 623 return 1; 624 } 625 626 /* 627 * Look for old packets that the secondary hasn't matched, 628 * if we have some then we have to checkpoint to wake 629 * the secondary up. 630 */ 631 static void colo_old_packet_check(void *opaque) 632 { 633 CompareState *s = opaque; 634 635 /* 636 * If we find one old packet, stop finding job and notify 637 * COLO frame do checkpoint. 638 */ 639 g_queue_find_custom(&s->conn_list, s, 640 (GCompareFunc)colo_old_packet_check_one_conn); 641 } 642 643 static void colo_compare_packet(CompareState *s, Connection *conn, 644 int (*HandlePacket)(Packet *spkt, 645 Packet *ppkt)) 646 { 647 Packet *pkt = NULL; 648 GList *result = NULL; 649 650 while (!g_queue_is_empty(&conn->primary_list) && 651 !g_queue_is_empty(&conn->secondary_list)) { 652 pkt = g_queue_pop_head(&conn->primary_list); 653 result = g_queue_find_custom(&conn->secondary_list, 654 pkt, (GCompareFunc)HandlePacket); 655 656 if (result) { 657 colo_release_primary_pkt(s, pkt); 658 g_queue_remove(&conn->secondary_list, result->data); 659 } else { 660 /* 661 * If one packet arrive late, the secondary_list or 662 * primary_list will be empty, so we can't compare it 663 * until next comparison. If the packets in the list are 664 * timeout, it will trigger a checkpoint request. 665 */ 666 trace_colo_compare_main("packet different"); 667 g_queue_push_head(&conn->primary_list, pkt); 668 669 colo_compare_inconsistency_notify(s); 670 break; 671 } 672 } 673 } 674 675 /* 676 * Called from the compare thread on the primary 677 * for compare packet with secondary list of the 678 * specified connection when a new packet was 679 * queued to it. 680 */ 681 static void colo_compare_connection(void *opaque, void *user_data) 682 { 683 CompareState *s = user_data; 684 Connection *conn = opaque; 685 686 switch (conn->ip_proto) { 687 case IPPROTO_TCP: 688 colo_compare_tcp(s, conn); 689 break; 690 case IPPROTO_UDP: 691 colo_compare_packet(s, conn, colo_packet_compare_udp); 692 break; 693 case IPPROTO_ICMP: 694 colo_compare_packet(s, conn, colo_packet_compare_icmp); 695 break; 696 default: 697 colo_compare_packet(s, conn, colo_packet_compare_other); 698 break; 699 } 700 } 701 702 static int compare_chr_send(CompareState *s, 703 const uint8_t *buf, 704 uint32_t size, 705 uint32_t vnet_hdr_len, 706 bool notify_remote_frame) 707 { 708 int ret = 0; 709 uint32_t len = htonl(size); 710 711 if (!size) { 712 return 0; 713 } 714 715 if (notify_remote_frame) { 716 ret = qemu_chr_fe_write_all(&s->chr_notify_dev, 717 (uint8_t *)&len, 718 sizeof(len)); 719 } else { 720 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 721 } 722 723 if (ret != sizeof(len)) { 724 goto err; 725 } 726 727 if (s->vnet_hdr) { 728 /* 729 * We send vnet header len make other module(like filter-redirector) 730 * know how to parse net packet correctly. 731 */ 732 len = htonl(vnet_hdr_len); 733 734 if (!notify_remote_frame) { 735 ret = qemu_chr_fe_write_all(&s->chr_out, 736 (uint8_t *)&len, 737 sizeof(len)); 738 } 739 740 if (ret != sizeof(len)) { 741 goto err; 742 } 743 } 744 745 if (notify_remote_frame) { 746 ret = qemu_chr_fe_write_all(&s->chr_notify_dev, 747 (uint8_t *)buf, 748 size); 749 } else { 750 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); 751 } 752 753 if (ret != size) { 754 goto err; 755 } 756 757 return 0; 758 759 err: 760 return ret < 0 ? ret : -EIO; 761 } 762 763 static int compare_chr_can_read(void *opaque) 764 { 765 return COMPARE_READ_LEN_MAX; 766 } 767 768 /* 769 * Called from the main thread on the primary for packets 770 * arriving over the socket from the primary. 771 */ 772 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) 773 { 774 CompareState *s = COLO_COMPARE(opaque); 775 int ret; 776 777 ret = net_fill_rstate(&s->pri_rs, buf, size); 778 if (ret == -1) { 779 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, 780 NULL, NULL, true); 781 error_report("colo-compare primary_in error"); 782 } 783 } 784 785 /* 786 * Called from the main thread on the primary for packets 787 * arriving over the socket from the secondary. 788 */ 789 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) 790 { 791 CompareState *s = COLO_COMPARE(opaque); 792 int ret; 793 794 ret = net_fill_rstate(&s->sec_rs, buf, size); 795 if (ret == -1) { 796 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, 797 NULL, NULL, true); 798 error_report("colo-compare secondary_in error"); 799 } 800 } 801 802 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size) 803 { 804 CompareState *s = COLO_COMPARE(opaque); 805 int ret; 806 807 ret = net_fill_rstate(&s->notify_rs, buf, size); 808 if (ret == -1) { 809 qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL, 810 NULL, NULL, true); 811 error_report("colo-compare notify_dev error"); 812 } 813 } 814 815 /* 816 * Check old packet regularly so it can watch for any packets 817 * that the secondary hasn't produced equivalents of. 818 */ 819 static void check_old_packet_regular(void *opaque) 820 { 821 CompareState *s = opaque; 822 823 /* if have old packet we will notify checkpoint */ 824 colo_old_packet_check(s); 825 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 826 s->expired_scan_cycle); 827 } 828 829 /* Public API, Used for COLO frame to notify compare event */ 830 void colo_notify_compares_event(void *opaque, int event, Error **errp) 831 { 832 CompareState *s; 833 834 qemu_mutex_lock(&event_mtx); 835 QTAILQ_FOREACH(s, &net_compares, next) { 836 s->event = event; 837 qemu_bh_schedule(s->event_bh); 838 event_unhandled_count++; 839 } 840 /* Wait all compare threads to finish handling this event */ 841 while (event_unhandled_count > 0) { 842 qemu_cond_wait(&event_complete_cond, &event_mtx); 843 } 844 845 qemu_mutex_unlock(&event_mtx); 846 } 847 848 static void colo_compare_timer_init(CompareState *s) 849 { 850 AioContext *ctx = iothread_get_aio_context(s->iothread); 851 852 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, 853 SCALE_MS, check_old_packet_regular, 854 s); 855 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 856 s->expired_scan_cycle); 857 } 858 859 static void colo_compare_timer_del(CompareState *s) 860 { 861 if (s->packet_check_timer) { 862 timer_del(s->packet_check_timer); 863 timer_free(s->packet_check_timer); 864 s->packet_check_timer = NULL; 865 } 866 } 867 868 static void colo_flush_packets(void *opaque, void *user_data); 869 870 static void colo_compare_handle_event(void *opaque) 871 { 872 CompareState *s = opaque; 873 874 switch (s->event) { 875 case COLO_EVENT_CHECKPOINT: 876 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 877 break; 878 case COLO_EVENT_FAILOVER: 879 break; 880 default: 881 break; 882 } 883 884 qemu_mutex_lock(&event_mtx); 885 assert(event_unhandled_count > 0); 886 event_unhandled_count--; 887 qemu_cond_broadcast(&event_complete_cond); 888 qemu_mutex_unlock(&event_mtx); 889 } 890 891 static void colo_compare_iothread(CompareState *s) 892 { 893 object_ref(OBJECT(s->iothread)); 894 s->worker_context = iothread_get_g_main_context(s->iothread); 895 896 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, 897 compare_pri_chr_in, NULL, NULL, 898 s, s->worker_context, true); 899 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, 900 compare_sec_chr_in, NULL, NULL, 901 s, s->worker_context, true); 902 if (s->notify_dev) { 903 qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read, 904 compare_notify_chr, NULL, NULL, 905 s, s->worker_context, true); 906 } 907 908 colo_compare_timer_init(s); 909 s->event_bh = qemu_bh_new(colo_compare_handle_event, s); 910 } 911 912 static char *compare_get_pri_indev(Object *obj, Error **errp) 913 { 914 CompareState *s = COLO_COMPARE(obj); 915 916 return g_strdup(s->pri_indev); 917 } 918 919 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) 920 { 921 CompareState *s = COLO_COMPARE(obj); 922 923 g_free(s->pri_indev); 924 s->pri_indev = g_strdup(value); 925 } 926 927 static char *compare_get_sec_indev(Object *obj, Error **errp) 928 { 929 CompareState *s = COLO_COMPARE(obj); 930 931 return g_strdup(s->sec_indev); 932 } 933 934 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) 935 { 936 CompareState *s = COLO_COMPARE(obj); 937 938 g_free(s->sec_indev); 939 s->sec_indev = g_strdup(value); 940 } 941 942 static char *compare_get_outdev(Object *obj, Error **errp) 943 { 944 CompareState *s = COLO_COMPARE(obj); 945 946 return g_strdup(s->outdev); 947 } 948 949 static void compare_set_outdev(Object *obj, const char *value, Error **errp) 950 { 951 CompareState *s = COLO_COMPARE(obj); 952 953 g_free(s->outdev); 954 s->outdev = g_strdup(value); 955 } 956 957 static bool compare_get_vnet_hdr(Object *obj, Error **errp) 958 { 959 CompareState *s = COLO_COMPARE(obj); 960 961 return s->vnet_hdr; 962 } 963 964 static void compare_set_vnet_hdr(Object *obj, 965 bool value, 966 Error **errp) 967 { 968 CompareState *s = COLO_COMPARE(obj); 969 970 s->vnet_hdr = value; 971 } 972 973 static char *compare_get_notify_dev(Object *obj, Error **errp) 974 { 975 CompareState *s = COLO_COMPARE(obj); 976 977 return g_strdup(s->notify_dev); 978 } 979 980 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp) 981 { 982 CompareState *s = COLO_COMPARE(obj); 983 984 g_free(s->notify_dev); 985 s->notify_dev = g_strdup(value); 986 } 987 988 static void compare_get_timeout(Object *obj, Visitor *v, 989 const char *name, void *opaque, 990 Error **errp) 991 { 992 CompareState *s = COLO_COMPARE(obj); 993 uint32_t value = s->compare_timeout; 994 995 visit_type_uint32(v, name, &value, errp); 996 } 997 998 static void compare_set_timeout(Object *obj, Visitor *v, 999 const char *name, void *opaque, 1000 Error **errp) 1001 { 1002 CompareState *s = COLO_COMPARE(obj); 1003 Error *local_err = NULL; 1004 uint32_t value; 1005 1006 visit_type_uint32(v, name, &value, &local_err); 1007 if (local_err) { 1008 goto out; 1009 } 1010 if (!value) { 1011 error_setg(&local_err, "Property '%s.%s' requires a positive value", 1012 object_get_typename(obj), name); 1013 goto out; 1014 } 1015 s->compare_timeout = value; 1016 1017 out: 1018 error_propagate(errp, local_err); 1019 } 1020 1021 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v, 1022 const char *name, void *opaque, 1023 Error **errp) 1024 { 1025 CompareState *s = COLO_COMPARE(obj); 1026 uint32_t value = s->expired_scan_cycle; 1027 1028 visit_type_uint32(v, name, &value, errp); 1029 } 1030 1031 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v, 1032 const char *name, void *opaque, 1033 Error **errp) 1034 { 1035 CompareState *s = COLO_COMPARE(obj); 1036 Error *local_err = NULL; 1037 uint32_t value; 1038 1039 visit_type_uint32(v, name, &value, &local_err); 1040 if (local_err) { 1041 goto out; 1042 } 1043 if (!value) { 1044 error_setg(&local_err, "Property '%s.%s' requires a positive value", 1045 object_get_typename(obj), name); 1046 goto out; 1047 } 1048 s->expired_scan_cycle = value; 1049 1050 out: 1051 error_propagate(errp, local_err); 1052 } 1053 1054 static void compare_pri_rs_finalize(SocketReadState *pri_rs) 1055 { 1056 CompareState *s = container_of(pri_rs, CompareState, pri_rs); 1057 Connection *conn = NULL; 1058 1059 if (packet_enqueue(s, PRIMARY_IN, &conn)) { 1060 trace_colo_compare_main("primary: unsupported packet in"); 1061 compare_chr_send(s, 1062 pri_rs->buf, 1063 pri_rs->packet_len, 1064 pri_rs->vnet_hdr_len, 1065 false); 1066 } else { 1067 /* compare packet in the specified connection */ 1068 colo_compare_connection(conn, s); 1069 } 1070 } 1071 1072 static void compare_sec_rs_finalize(SocketReadState *sec_rs) 1073 { 1074 CompareState *s = container_of(sec_rs, CompareState, sec_rs); 1075 Connection *conn = NULL; 1076 1077 if (packet_enqueue(s, SECONDARY_IN, &conn)) { 1078 trace_colo_compare_main("secondary: unsupported packet in"); 1079 } else { 1080 /* compare packet in the specified connection */ 1081 colo_compare_connection(conn, s); 1082 } 1083 } 1084 1085 static void compare_notify_rs_finalize(SocketReadState *notify_rs) 1086 { 1087 CompareState *s = container_of(notify_rs, CompareState, notify_rs); 1088 1089 const char msg[] = "COLO_COMPARE_GET_XEN_INIT"; 1090 int ret; 1091 1092 if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", 1093 notify_rs->buf, 1094 notify_rs->packet_len)) { 1095 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); 1096 if (ret < 0) { 1097 error_report("Notify Xen COLO-frame INIT failed"); 1098 } 1099 } else if (packet_matches_str("COLO_CHECKPOINT", 1100 notify_rs->buf, 1101 notify_rs->packet_len)) { 1102 /* colo-compare do checkpoint, flush pri packet and remove sec packet */ 1103 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1104 } else { 1105 error_report("COLO compare got unsupported instruction"); 1106 } 1107 } 1108 1109 /* 1110 * Return 0 is success. 1111 * Return 1 is failed. 1112 */ 1113 static int find_and_check_chardev(Chardev **chr, 1114 char *chr_name, 1115 Error **errp) 1116 { 1117 *chr = qemu_chr_find(chr_name); 1118 if (*chr == NULL) { 1119 error_setg(errp, "Device '%s' not found", 1120 chr_name); 1121 return 1; 1122 } 1123 1124 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { 1125 error_setg(errp, "chardev \"%s\" is not reconnectable", 1126 chr_name); 1127 return 1; 1128 } 1129 1130 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) { 1131 error_setg(errp, "chardev \"%s\" cannot switch context", 1132 chr_name); 1133 return 1; 1134 } 1135 1136 return 0; 1137 } 1138 1139 /* 1140 * Called from the main thread on the primary 1141 * to setup colo-compare. 1142 */ 1143 static void colo_compare_complete(UserCreatable *uc, Error **errp) 1144 { 1145 CompareState *s = COLO_COMPARE(uc); 1146 Chardev *chr; 1147 1148 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { 1149 error_setg(errp, "colo compare needs 'primary_in' ," 1150 "'secondary_in','outdev','iothread' property set"); 1151 return; 1152 } else if (!strcmp(s->pri_indev, s->outdev) || 1153 !strcmp(s->sec_indev, s->outdev) || 1154 !strcmp(s->pri_indev, s->sec_indev)) { 1155 error_setg(errp, "'indev' and 'outdev' could not be same " 1156 "for compare module"); 1157 return; 1158 } 1159 1160 if (!s->compare_timeout) { 1161 /* Set default value to 3000 MS */ 1162 s->compare_timeout = DEFAULT_TIME_OUT_MS; 1163 } 1164 1165 if (!s->expired_scan_cycle) { 1166 /* Set default value to 3000 MS */ 1167 s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS; 1168 } 1169 1170 if (find_and_check_chardev(&chr, s->pri_indev, errp) || 1171 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { 1172 return; 1173 } 1174 1175 if (find_and_check_chardev(&chr, s->sec_indev, errp) || 1176 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { 1177 return; 1178 } 1179 1180 if (find_and_check_chardev(&chr, s->outdev, errp) || 1181 !qemu_chr_fe_init(&s->chr_out, chr, errp)) { 1182 return; 1183 } 1184 1185 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); 1186 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); 1187 1188 /* Try to enable remote notify chardev, currently just for Xen COLO */ 1189 if (s->notify_dev) { 1190 if (find_and_check_chardev(&chr, s->notify_dev, errp) || 1191 !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) { 1192 return; 1193 } 1194 1195 net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize, 1196 s->vnet_hdr); 1197 } 1198 1199 QTAILQ_INSERT_TAIL(&net_compares, s, next); 1200 1201 g_queue_init(&s->conn_list); 1202 1203 qemu_mutex_init(&event_mtx); 1204 qemu_cond_init(&event_complete_cond); 1205 1206 s->connection_track_table = g_hash_table_new_full(connection_key_hash, 1207 connection_key_equal, 1208 g_free, 1209 connection_destroy); 1210 1211 colo_compare_iothread(s); 1212 return; 1213 } 1214 1215 static void colo_flush_packets(void *opaque, void *user_data) 1216 { 1217 CompareState *s = user_data; 1218 Connection *conn = opaque; 1219 Packet *pkt = NULL; 1220 1221 while (!g_queue_is_empty(&conn->primary_list)) { 1222 pkt = g_queue_pop_head(&conn->primary_list); 1223 compare_chr_send(s, 1224 pkt->data, 1225 pkt->size, 1226 pkt->vnet_hdr_len, 1227 false); 1228 packet_destroy(pkt, NULL); 1229 } 1230 while (!g_queue_is_empty(&conn->secondary_list)) { 1231 pkt = g_queue_pop_head(&conn->secondary_list); 1232 packet_destroy(pkt, NULL); 1233 } 1234 } 1235 1236 static void colo_compare_class_init(ObjectClass *oc, void *data) 1237 { 1238 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); 1239 1240 ucc->complete = colo_compare_complete; 1241 } 1242 1243 static void colo_compare_init(Object *obj) 1244 { 1245 CompareState *s = COLO_COMPARE(obj); 1246 1247 object_property_add_str(obj, "primary_in", 1248 compare_get_pri_indev, compare_set_pri_indev); 1249 object_property_add_str(obj, "secondary_in", 1250 compare_get_sec_indev, compare_set_sec_indev); 1251 object_property_add_str(obj, "outdev", 1252 compare_get_outdev, compare_set_outdev); 1253 object_property_add_link(obj, "iothread", TYPE_IOTHREAD, 1254 (Object **)&s->iothread, 1255 object_property_allow_set_link, 1256 OBJ_PROP_LINK_STRONG); 1257 /* This parameter just for Xen COLO */ 1258 object_property_add_str(obj, "notify_dev", 1259 compare_get_notify_dev, compare_set_notify_dev); 1260 1261 object_property_add(obj, "compare_timeout", "uint32", 1262 compare_get_timeout, 1263 compare_set_timeout, NULL, NULL); 1264 1265 object_property_add(obj, "expired_scan_cycle", "uint32", 1266 compare_get_expired_scan_cycle, 1267 compare_set_expired_scan_cycle, NULL, NULL); 1268 1269 s->vnet_hdr = false; 1270 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, 1271 compare_set_vnet_hdr); 1272 } 1273 1274 static void colo_compare_finalize(Object *obj) 1275 { 1276 CompareState *s = COLO_COMPARE(obj); 1277 CompareState *tmp = NULL; 1278 1279 qemu_chr_fe_deinit(&s->chr_pri_in, false); 1280 qemu_chr_fe_deinit(&s->chr_sec_in, false); 1281 qemu_chr_fe_deinit(&s->chr_out, false); 1282 if (s->notify_dev) { 1283 qemu_chr_fe_deinit(&s->chr_notify_dev, false); 1284 } 1285 1286 if (s->iothread) { 1287 colo_compare_timer_del(s); 1288 } 1289 1290 qemu_bh_delete(s->event_bh); 1291 1292 QTAILQ_FOREACH(tmp, &net_compares, next) { 1293 if (tmp == s) { 1294 QTAILQ_REMOVE(&net_compares, s, next); 1295 break; 1296 } 1297 } 1298 1299 /* Release all unhandled packets after compare thead exited */ 1300 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1301 1302 g_queue_clear(&s->conn_list); 1303 1304 if (s->connection_track_table) { 1305 g_hash_table_destroy(s->connection_track_table); 1306 } 1307 1308 if (s->iothread) { 1309 object_unref(OBJECT(s->iothread)); 1310 } 1311 1312 qemu_mutex_destroy(&event_mtx); 1313 qemu_cond_destroy(&event_complete_cond); 1314 1315 g_free(s->pri_indev); 1316 g_free(s->sec_indev); 1317 g_free(s->outdev); 1318 g_free(s->notify_dev); 1319 } 1320 1321 static const TypeInfo colo_compare_info = { 1322 .name = TYPE_COLO_COMPARE, 1323 .parent = TYPE_OBJECT, 1324 .instance_size = sizeof(CompareState), 1325 .instance_init = colo_compare_init, 1326 .instance_finalize = colo_compare_finalize, 1327 .class_size = sizeof(CompareClass), 1328 .class_init = colo_compare_class_init, 1329 .interfaces = (InterfaceInfo[]) { 1330 { TYPE_USER_CREATABLE }, 1331 { } 1332 } 1333 }; 1334 1335 static void register_types(void) 1336 { 1337 type_register_static(&colo_compare_info); 1338 } 1339 1340 type_init(register_types); 1341