/*
 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
 * (a.k.a. Fault Tolerance or Continuous Replication)
 *
 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
 * Copyright (c) 2016 FUJITSU LIMITED
 * Copyright (c) 2016 Intel Corporation
 *
 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later.  See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "qemu/error-report.h"
#include "trace.h"
#include "qapi/error.h"
#include "net/net.h"
#include "net/eth.h"
#include "qom/object_interfaces.h"
#include "qemu/iov.h"
#include "qom/object.h"
#include "net/queue.h"
#include "chardev/char-fe.h"
#include "qemu/sockets.h"
#include "colo.h"
#include "sysemu/iothread.h"
#include "net/colo-compare.h"
#include "migration/colo.h"
#include "migration/migration.h"
#include "util.h"

#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* Every CompareState that went through colo_compare_complete(). */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/* Notifiers fired when an inconsistency is found and no notify_dev is set. */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

#define COMPARE_READ_LEN_MAX NET_BUFSIZE
#define MAX_QUEUE_SIZE 1024

/* Bits reported through the 'mark' output of colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000

/*
 * event_mtx protects event_unhandled_count; event_complete_cond is
 * broadcast each time a compare instance finishes handling an event.
 */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;

/*
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
typedef struct CompareState {
    Object parent;

    /* Chardev names as set through the object properties */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    char *notify_dev;
    /* Front ends bound to the chardevs named above */
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    /* Per-input read state; each one has its own finalize callback */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    SocketReadState notify_rs;
    /* Value of the "vnet_hdr_support" property */
    bool vnet_hdr;

    /*
     * Record the connection that through the NIC
     * Element type: Connection
     */
    GQueue conn_list;
    /* Record the connection without repetition */
    GHashTable *connection_track_table;

    IOThread *iothread;
    GMainContext *worker_context;
    /* Periodic timer that runs check_old_packet_regular() */
    QEMUTimer *packet_check_timer;

    /* Bottom half that runs colo_compare_handle_event() for 'event' */
    QEMUBH *event_bh;
    enum colo_event event;

    QTAILQ_ENTRY(CompareState) next;
} CompareState;

typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Selects which input queue packet_enqueue() works on. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};


static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame);

/*
 * Return true when the packet_len bytes at buf are exactly the
 * characters of the NUL-terminated string str (terminator excluded).
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    if (packet_len != strlen(str)) {
        return false;
    }

    return !memcmp(str, buf, strlen(str));
}
141 static void notify_remote_frame(CompareState *s) 142 { 143 char msg[] = "DO_CHECKPOINT"; 144 int ret = 0; 145 146 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); 147 if (ret < 0) { 148 error_report("Notify Xen COLO-frame failed"); 149 } 150 } 151 152 static void colo_compare_inconsistency_notify(CompareState *s) 153 { 154 if (s->notify_dev) { 155 notify_remote_frame(s); 156 } else { 157 notifier_list_notify(&colo_compare_notifiers, 158 migrate_get_current()); 159 } 160 } 161 162 static gint seq_sorter(Packet *a, Packet *b, gpointer data) 163 { 164 struct tcp_hdr *atcp, *btcp; 165 166 atcp = (struct tcp_hdr *)(a->transport_header); 167 btcp = (struct tcp_hdr *)(b->transport_header); 168 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); 169 } 170 171 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack) 172 { 173 Packet *pkt = data; 174 struct tcp_hdr *tcphd; 175 176 tcphd = (struct tcp_hdr *)pkt->transport_header; 177 178 pkt->tcp_seq = ntohl(tcphd->th_seq); 179 pkt->tcp_ack = ntohl(tcphd->th_ack); 180 *max_ack = *max_ack > pkt->tcp_ack ? 
*max_ack : pkt->tcp_ack; 181 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data 182 + (tcphd->th_off << 2) - pkt->vnet_hdr_len; 183 pkt->payload_size = pkt->size - pkt->header_size; 184 pkt->seq_end = pkt->tcp_seq + pkt->payload_size; 185 pkt->flags = tcphd->th_flags; 186 } 187 188 /* 189 * Return 1 on success, if return 0 means the 190 * packet will be dropped 191 */ 192 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack) 193 { 194 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) { 195 if (pkt->ip->ip_p == IPPROTO_TCP) { 196 fill_pkt_tcp_info(pkt, max_ack); 197 g_queue_insert_sorted(queue, 198 pkt, 199 (GCompareDataFunc)seq_sorter, 200 NULL); 201 } else { 202 g_queue_push_tail(queue, pkt); 203 } 204 return 1; 205 } 206 return 0; 207 } 208 209 /* 210 * Return 0 on success, if return -1 means the pkt 211 * is unsupported(arp and ipv6) and will be sent later 212 */ 213 static int packet_enqueue(CompareState *s, int mode, Connection **con) 214 { 215 ConnectionKey key; 216 Packet *pkt = NULL; 217 Connection *conn; 218 219 if (mode == PRIMARY_IN) { 220 pkt = packet_new(s->pri_rs.buf, 221 s->pri_rs.packet_len, 222 s->pri_rs.vnet_hdr_len); 223 } else { 224 pkt = packet_new(s->sec_rs.buf, 225 s->sec_rs.packet_len, 226 s->sec_rs.vnet_hdr_len); 227 } 228 229 if (parse_packet_early(pkt)) { 230 packet_destroy(pkt, NULL); 231 pkt = NULL; 232 return -1; 233 } 234 fill_connection_key(pkt, &key); 235 236 conn = connection_get(s->connection_track_table, 237 &key, 238 &s->conn_list); 239 240 if (!conn->processing) { 241 g_queue_push_tail(&s->conn_list, conn); 242 conn->processing = true; 243 } 244 245 if (mode == PRIMARY_IN) { 246 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) { 247 error_report("colo compare primary queue size too big," 248 "drop packet"); 249 } 250 } else { 251 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) { 252 error_report("colo compare secondary queue size too big," 253 "drop packet"); 
254 } 255 } 256 *con = conn; 257 258 return 0; 259 } 260 261 static inline bool after(uint32_t seq1, uint32_t seq2) 262 { 263 return (int32_t)(seq1 - seq2) > 0; 264 } 265 266 static void colo_release_primary_pkt(CompareState *s, Packet *pkt) 267 { 268 int ret; 269 ret = compare_chr_send(s, 270 pkt->data, 271 pkt->size, 272 pkt->vnet_hdr_len, 273 false); 274 if (ret < 0) { 275 error_report("colo send primary packet failed"); 276 } 277 trace_colo_compare_main("packet same and release packet"); 278 packet_destroy(pkt, NULL); 279 } 280 281 /* 282 * The IP packets sent by primary and secondary 283 * will be compared in here 284 * TODO support ip fragment, Out-Of-Order 285 * return: 0 means packet same 286 * > 0 || < 0 means packet different 287 */ 288 static int colo_compare_packet_payload(Packet *ppkt, 289 Packet *spkt, 290 uint16_t poffset, 291 uint16_t soffset, 292 uint16_t len) 293 294 { 295 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 296 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 297 298 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 299 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 300 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 301 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 302 303 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 304 pri_ip_dst, spkt->size, 305 sec_ip_src, sec_ip_dst); 306 } 307 308 return memcmp(ppkt->data + poffset, spkt->data + soffset, len); 309 } 310 311 /* 312 * return true means that the payload is consist and 313 * need to make the next comparison, false means do 314 * the checkpoint 315 */ 316 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, 317 int8_t *mark, uint32_t max_ack) 318 { 319 *mark = 0; 320 321 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 322 if (colo_compare_packet_payload(ppkt, spkt, 323 ppkt->header_size, spkt->header_size, 324 ppkt->payload_size)) { 325 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 
326 return true; 327 } 328 } 329 330 /* one part of secondary packet payload still need to be compared */ 331 if (!after(ppkt->seq_end, spkt->seq_end)) { 332 if (colo_compare_packet_payload(ppkt, spkt, 333 ppkt->header_size + ppkt->offset, 334 spkt->header_size + spkt->offset, 335 ppkt->payload_size - ppkt->offset)) { 336 if (!after(ppkt->tcp_ack, max_ack)) { 337 *mark = COLO_COMPARE_FREE_PRIMARY; 338 spkt->offset += ppkt->payload_size - ppkt->offset; 339 return true; 340 } else { 341 /* secondary guest hasn't ack the data, don't send 342 * out this packet 343 */ 344 return false; 345 } 346 } 347 } else { 348 /* primary packet is longer than secondary packet, compare 349 * the same part and mark the primary packet offset 350 */ 351 if (colo_compare_packet_payload(ppkt, spkt, 352 ppkt->header_size + ppkt->offset, 353 spkt->header_size + spkt->offset, 354 spkt->payload_size - spkt->offset)) { 355 *mark = COLO_COMPARE_FREE_SECONDARY; 356 ppkt->offset += spkt->payload_size - spkt->offset; 357 return true; 358 } 359 } 360 361 return false; 362 } 363 364 static void colo_compare_tcp(CompareState *s, Connection *conn) 365 { 366 Packet *ppkt = NULL, *spkt = NULL; 367 int8_t mark; 368 369 /* 370 * If ppkt and spkt have the same payload, but ppkt's ACK 371 * is greater than spkt's ACK, in this case we can not 372 * send the ppkt because it will cause the secondary guest 373 * to miss sending some data in the next. Therefore, we 374 * record the maximum ACK in the current queue at both 375 * primary side and secondary side. Only when the ack is 376 * less than the smaller of the two maximum ack, then we 377 * can ensure that the packet's payload is acknowledged by 378 * primary and secondary. 379 */ 380 uint32_t min_ack = conn->pack > conn->sack ? 
conn->sack : conn->pack; 381 382 pri: 383 if (g_queue_is_empty(&conn->primary_list)) { 384 return; 385 } 386 ppkt = g_queue_pop_head(&conn->primary_list); 387 sec: 388 if (g_queue_is_empty(&conn->secondary_list)) { 389 g_queue_push_head(&conn->primary_list, ppkt); 390 return; 391 } 392 spkt = g_queue_pop_head(&conn->secondary_list); 393 394 if (ppkt->tcp_seq == ppkt->seq_end) { 395 colo_release_primary_pkt(s, ppkt); 396 ppkt = NULL; 397 } 398 399 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) { 400 trace_colo_compare_main("pri: this packet has compared"); 401 colo_release_primary_pkt(s, ppkt); 402 ppkt = NULL; 403 } 404 405 if (spkt->tcp_seq == spkt->seq_end) { 406 packet_destroy(spkt, NULL); 407 if (!ppkt) { 408 goto pri; 409 } else { 410 goto sec; 411 } 412 } else { 413 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) { 414 trace_colo_compare_main("sec: this packet has compared"); 415 packet_destroy(spkt, NULL); 416 if (!ppkt) { 417 goto pri; 418 } else { 419 goto sec; 420 } 421 } 422 if (!ppkt) { 423 g_queue_push_head(&conn->secondary_list, spkt); 424 goto pri; 425 } 426 } 427 428 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) { 429 trace_colo_compare_tcp_info("pri", 430 ppkt->tcp_seq, ppkt->tcp_ack, 431 ppkt->header_size, ppkt->payload_size, 432 ppkt->offset, ppkt->flags); 433 434 trace_colo_compare_tcp_info("sec", 435 spkt->tcp_seq, spkt->tcp_ack, 436 spkt->header_size, spkt->payload_size, 437 spkt->offset, spkt->flags); 438 439 if (mark == COLO_COMPARE_FREE_PRIMARY) { 440 conn->compare_seq = ppkt->seq_end; 441 colo_release_primary_pkt(s, ppkt); 442 g_queue_push_head(&conn->secondary_list, spkt); 443 goto pri; 444 } 445 if (mark == COLO_COMPARE_FREE_SECONDARY) { 446 conn->compare_seq = spkt->seq_end; 447 packet_destroy(spkt, NULL); 448 goto sec; 449 } 450 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) { 451 conn->compare_seq = ppkt->seq_end; 452 colo_release_primary_pkt(s, ppkt); 453 
packet_destroy(spkt, NULL); 454 goto pri; 455 } 456 } else { 457 g_queue_push_head(&conn->primary_list, ppkt); 458 g_queue_push_head(&conn->secondary_list, spkt); 459 460 qemu_hexdump((char *)ppkt->data, stderr, 461 "colo-compare ppkt", ppkt->size); 462 qemu_hexdump((char *)spkt->data, stderr, 463 "colo-compare spkt", spkt->size); 464 465 colo_compare_inconsistency_notify(s); 466 } 467 } 468 469 470 /* 471 * Called from the compare thread on the primary 472 * for compare udp packet 473 */ 474 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) 475 { 476 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 477 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 478 479 trace_colo_compare_main("compare udp"); 480 481 /* 482 * Because of ppkt and spkt are both in the same connection, 483 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 484 * same with spkt. In addition, IP header's Identification is a random 485 * field, we can handle it in IP fragmentation function later. 486 * COLO just concern the response net packet payload from primary guest 487 * and secondary guest are same or not, So we ignored all IP header include 488 * other field like TOS,TTL,IP Checksum. we only need to compare 489 * the ip payload here. 
490 */ 491 if (ppkt->size != spkt->size) { 492 trace_colo_compare_main("UDP: payload size of packets are different"); 493 return -1; 494 } 495 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 496 ppkt->size - offset)) { 497 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); 498 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); 499 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 500 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 501 ppkt->size); 502 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 503 spkt->size); 504 } 505 return -1; 506 } else { 507 return 0; 508 } 509 } 510 511 /* 512 * Called from the compare thread on the primary 513 * for compare icmp packet 514 */ 515 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) 516 { 517 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 518 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 519 520 trace_colo_compare_main("compare icmp"); 521 522 /* 523 * Because of ppkt and spkt are both in the same connection, 524 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 525 * same with spkt. In addition, IP header's Identification is a random 526 * field, we can handle it in IP fragmentation function later. 527 * COLO just concern the response net packet payload from primary guest 528 * and secondary guest are same or not, So we ignored all IP header include 529 * other field like TOS,TTL,IP Checksum. we only need to compare 530 * the ip payload here. 
531 */ 532 if (ppkt->size != spkt->size) { 533 trace_colo_compare_main("ICMP: payload size of packets are different"); 534 return -1; 535 } 536 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 537 ppkt->size - offset)) { 538 trace_colo_compare_icmp_miscompare("primary pkt size", 539 ppkt->size); 540 trace_colo_compare_icmp_miscompare("Secondary pkt size", 541 spkt->size); 542 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 543 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 544 ppkt->size); 545 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 546 spkt->size); 547 } 548 return -1; 549 } else { 550 return 0; 551 } 552 } 553 554 /* 555 * Called from the compare thread on the primary 556 * for compare other packet 557 */ 558 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) 559 { 560 uint16_t offset = ppkt->vnet_hdr_len; 561 562 trace_colo_compare_main("compare other"); 563 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 564 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 565 566 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 567 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 568 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 569 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 570 571 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 572 pri_ip_dst, spkt->size, 573 sec_ip_src, sec_ip_dst); 574 } 575 576 if (ppkt->size != spkt->size) { 577 trace_colo_compare_main("Other: payload size of packets are different"); 578 return -1; 579 } 580 return colo_compare_packet_payload(ppkt, spkt, offset, offset, 581 ppkt->size - offset); 582 } 583 584 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) 585 { 586 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); 587 588 if ((now - pkt->creation_ms) > (*check_time)) { 589 trace_colo_old_packet_check_found(pkt->creation_ms); 590 return 0; 591 } else { 592 return 1; 593 } 594 } 595 596 void 
colo_compare_register_notifier(Notifier *notify) 597 { 598 notifier_list_add(&colo_compare_notifiers, notify); 599 } 600 601 void colo_compare_unregister_notifier(Notifier *notify) 602 { 603 notifier_remove(notify); 604 } 605 606 static int colo_old_packet_check_one_conn(Connection *conn, 607 CompareState *s) 608 { 609 GList *result = NULL; 610 int64_t check_time = REGULAR_PACKET_CHECK_MS; 611 612 result = g_queue_find_custom(&conn->primary_list, 613 &check_time, 614 (GCompareFunc)colo_old_packet_check_one); 615 616 if (result) { 617 /* Do checkpoint will flush old packet */ 618 colo_compare_inconsistency_notify(s); 619 return 0; 620 } 621 622 return 1; 623 } 624 625 /* 626 * Look for old packets that the secondary hasn't matched, 627 * if we have some then we have to checkpoint to wake 628 * the secondary up. 629 */ 630 static void colo_old_packet_check(void *opaque) 631 { 632 CompareState *s = opaque; 633 634 /* 635 * If we find one old packet, stop finding job and notify 636 * COLO frame do checkpoint. 637 */ 638 g_queue_find_custom(&s->conn_list, s, 639 (GCompareFunc)colo_old_packet_check_one_conn); 640 } 641 642 static void colo_compare_packet(CompareState *s, Connection *conn, 643 int (*HandlePacket)(Packet *spkt, 644 Packet *ppkt)) 645 { 646 Packet *pkt = NULL; 647 GList *result = NULL; 648 649 while (!g_queue_is_empty(&conn->primary_list) && 650 !g_queue_is_empty(&conn->secondary_list)) { 651 pkt = g_queue_pop_head(&conn->primary_list); 652 result = g_queue_find_custom(&conn->secondary_list, 653 pkt, (GCompareFunc)HandlePacket); 654 655 if (result) { 656 colo_release_primary_pkt(s, pkt); 657 g_queue_remove(&conn->secondary_list, result->data); 658 } else { 659 /* 660 * If one packet arrive late, the secondary_list or 661 * primary_list will be empty, so we can't compare it 662 * until next comparison. If the packets in the list are 663 * timeout, it will trigger a checkpoint request. 
664 */ 665 trace_colo_compare_main("packet different"); 666 g_queue_push_head(&conn->primary_list, pkt); 667 668 colo_compare_inconsistency_notify(s); 669 break; 670 } 671 } 672 } 673 674 /* 675 * Called from the compare thread on the primary 676 * for compare packet with secondary list of the 677 * specified connection when a new packet was 678 * queued to it. 679 */ 680 static void colo_compare_connection(void *opaque, void *user_data) 681 { 682 CompareState *s = user_data; 683 Connection *conn = opaque; 684 685 switch (conn->ip_proto) { 686 case IPPROTO_TCP: 687 colo_compare_tcp(s, conn); 688 break; 689 case IPPROTO_UDP: 690 colo_compare_packet(s, conn, colo_packet_compare_udp); 691 break; 692 case IPPROTO_ICMP: 693 colo_compare_packet(s, conn, colo_packet_compare_icmp); 694 break; 695 default: 696 colo_compare_packet(s, conn, colo_packet_compare_other); 697 break; 698 } 699 } 700 701 static int compare_chr_send(CompareState *s, 702 const uint8_t *buf, 703 uint32_t size, 704 uint32_t vnet_hdr_len, 705 bool notify_remote_frame) 706 { 707 int ret = 0; 708 uint32_t len = htonl(size); 709 710 if (!size) { 711 return 0; 712 } 713 714 if (notify_remote_frame) { 715 ret = qemu_chr_fe_write_all(&s->chr_notify_dev, 716 (uint8_t *)&len, 717 sizeof(len)); 718 } else { 719 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 720 } 721 722 if (ret != sizeof(len)) { 723 goto err; 724 } 725 726 if (s->vnet_hdr) { 727 /* 728 * We send vnet header len make other module(like filter-redirector) 729 * know how to parse net packet correctly. 
730 */ 731 len = htonl(vnet_hdr_len); 732 733 if (!notify_remote_frame) { 734 ret = qemu_chr_fe_write_all(&s->chr_out, 735 (uint8_t *)&len, 736 sizeof(len)); 737 } 738 739 if (ret != sizeof(len)) { 740 goto err; 741 } 742 } 743 744 if (notify_remote_frame) { 745 ret = qemu_chr_fe_write_all(&s->chr_notify_dev, 746 (uint8_t *)buf, 747 size); 748 } else { 749 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); 750 } 751 752 if (ret != size) { 753 goto err; 754 } 755 756 return 0; 757 758 err: 759 return ret < 0 ? ret : -EIO; 760 } 761 762 static int compare_chr_can_read(void *opaque) 763 { 764 return COMPARE_READ_LEN_MAX; 765 } 766 767 /* 768 * Called from the main thread on the primary for packets 769 * arriving over the socket from the primary. 770 */ 771 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) 772 { 773 CompareState *s = COLO_COMPARE(opaque); 774 int ret; 775 776 ret = net_fill_rstate(&s->pri_rs, buf, size); 777 if (ret == -1) { 778 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, 779 NULL, NULL, true); 780 error_report("colo-compare primary_in error"); 781 } 782 } 783 784 /* 785 * Called from the main thread on the primary for packets 786 * arriving over the socket from the secondary. 
787 */ 788 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) 789 { 790 CompareState *s = COLO_COMPARE(opaque); 791 int ret; 792 793 ret = net_fill_rstate(&s->sec_rs, buf, size); 794 if (ret == -1) { 795 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, 796 NULL, NULL, true); 797 error_report("colo-compare secondary_in error"); 798 } 799 } 800 801 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size) 802 { 803 CompareState *s = COLO_COMPARE(opaque); 804 int ret; 805 806 ret = net_fill_rstate(&s->notify_rs, buf, size); 807 if (ret == -1) { 808 qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL, 809 NULL, NULL, true); 810 error_report("colo-compare notify_dev error"); 811 } 812 } 813 814 /* 815 * Check old packet regularly so it can watch for any packets 816 * that the secondary hasn't produced equivalents of. 817 */ 818 static void check_old_packet_regular(void *opaque) 819 { 820 CompareState *s = opaque; 821 822 /* if have old packet we will notify checkpoint */ 823 colo_old_packet_check(s); 824 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 825 REGULAR_PACKET_CHECK_MS); 826 } 827 828 /* Public API, Used for COLO frame to notify compare event */ 829 void colo_notify_compares_event(void *opaque, int event, Error **errp) 830 { 831 CompareState *s; 832 833 qemu_mutex_lock(&event_mtx); 834 QTAILQ_FOREACH(s, &net_compares, next) { 835 s->event = event; 836 qemu_bh_schedule(s->event_bh); 837 event_unhandled_count++; 838 } 839 /* Wait all compare threads to finish handling this event */ 840 while (event_unhandled_count > 0) { 841 qemu_cond_wait(&event_complete_cond, &event_mtx); 842 } 843 844 qemu_mutex_unlock(&event_mtx); 845 } 846 847 static void colo_compare_timer_init(CompareState *s) 848 { 849 AioContext *ctx = iothread_get_aio_context(s->iothread); 850 851 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, 852 SCALE_MS, check_old_packet_regular, 853 s); 
854 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 855 REGULAR_PACKET_CHECK_MS); 856 } 857 858 static void colo_compare_timer_del(CompareState *s) 859 { 860 if (s->packet_check_timer) { 861 timer_del(s->packet_check_timer); 862 timer_free(s->packet_check_timer); 863 s->packet_check_timer = NULL; 864 } 865 } 866 867 static void colo_flush_packets(void *opaque, void *user_data); 868 869 static void colo_compare_handle_event(void *opaque) 870 { 871 CompareState *s = opaque; 872 873 switch (s->event) { 874 case COLO_EVENT_CHECKPOINT: 875 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 876 break; 877 case COLO_EVENT_FAILOVER: 878 break; 879 default: 880 break; 881 } 882 883 qemu_mutex_lock(&event_mtx); 884 assert(event_unhandled_count > 0); 885 event_unhandled_count--; 886 qemu_cond_broadcast(&event_complete_cond); 887 qemu_mutex_unlock(&event_mtx); 888 } 889 890 static void colo_compare_iothread(CompareState *s) 891 { 892 object_ref(OBJECT(s->iothread)); 893 s->worker_context = iothread_get_g_main_context(s->iothread); 894 895 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, 896 compare_pri_chr_in, NULL, NULL, 897 s, s->worker_context, true); 898 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, 899 compare_sec_chr_in, NULL, NULL, 900 s, s->worker_context, true); 901 if (s->notify_dev) { 902 qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read, 903 compare_notify_chr, NULL, NULL, 904 s, s->worker_context, true); 905 } 906 907 colo_compare_timer_init(s); 908 s->event_bh = qemu_bh_new(colo_compare_handle_event, s); 909 } 910 911 static char *compare_get_pri_indev(Object *obj, Error **errp) 912 { 913 CompareState *s = COLO_COMPARE(obj); 914 915 return g_strdup(s->pri_indev); 916 } 917 918 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) 919 { 920 CompareState *s = COLO_COMPARE(obj); 921 922 g_free(s->pri_indev); 923 s->pri_indev = g_strdup(value); 924 } 925 926 static 
char *compare_get_sec_indev(Object *obj, Error **errp) 927 { 928 CompareState *s = COLO_COMPARE(obj); 929 930 return g_strdup(s->sec_indev); 931 } 932 933 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) 934 { 935 CompareState *s = COLO_COMPARE(obj); 936 937 g_free(s->sec_indev); 938 s->sec_indev = g_strdup(value); 939 } 940 941 static char *compare_get_outdev(Object *obj, Error **errp) 942 { 943 CompareState *s = COLO_COMPARE(obj); 944 945 return g_strdup(s->outdev); 946 } 947 948 static void compare_set_outdev(Object *obj, const char *value, Error **errp) 949 { 950 CompareState *s = COLO_COMPARE(obj); 951 952 g_free(s->outdev); 953 s->outdev = g_strdup(value); 954 } 955 956 static bool compare_get_vnet_hdr(Object *obj, Error **errp) 957 { 958 CompareState *s = COLO_COMPARE(obj); 959 960 return s->vnet_hdr; 961 } 962 963 static void compare_set_vnet_hdr(Object *obj, 964 bool value, 965 Error **errp) 966 { 967 CompareState *s = COLO_COMPARE(obj); 968 969 s->vnet_hdr = value; 970 } 971 972 static char *compare_get_notify_dev(Object *obj, Error **errp) 973 { 974 CompareState *s = COLO_COMPARE(obj); 975 976 return g_strdup(s->notify_dev); 977 } 978 979 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp) 980 { 981 CompareState *s = COLO_COMPARE(obj); 982 983 g_free(s->notify_dev); 984 s->notify_dev = g_strdup(value); 985 } 986 987 static void compare_pri_rs_finalize(SocketReadState *pri_rs) 988 { 989 CompareState *s = container_of(pri_rs, CompareState, pri_rs); 990 Connection *conn = NULL; 991 992 if (packet_enqueue(s, PRIMARY_IN, &conn)) { 993 trace_colo_compare_main("primary: unsupported packet in"); 994 compare_chr_send(s, 995 pri_rs->buf, 996 pri_rs->packet_len, 997 pri_rs->vnet_hdr_len, 998 false); 999 } else { 1000 /* compare packet in the specified connection */ 1001 colo_compare_connection(conn, s); 1002 } 1003 } 1004 1005 static void compare_sec_rs_finalize(SocketReadState *sec_rs) 1006 { 1007 
CompareState *s = container_of(sec_rs, CompareState, sec_rs); 1008 Connection *conn = NULL; 1009 1010 if (packet_enqueue(s, SECONDARY_IN, &conn)) { 1011 trace_colo_compare_main("secondary: unsupported packet in"); 1012 } else { 1013 /* compare packet in the specified connection */ 1014 colo_compare_connection(conn, s); 1015 } 1016 } 1017 1018 static void compare_notify_rs_finalize(SocketReadState *notify_rs) 1019 { 1020 CompareState *s = container_of(notify_rs, CompareState, notify_rs); 1021 1022 const char msg[] = "COLO_COMPARE_GET_XEN_INIT"; 1023 int ret; 1024 1025 if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", 1026 notify_rs->buf, 1027 notify_rs->packet_len)) { 1028 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); 1029 if (ret < 0) { 1030 error_report("Notify Xen COLO-frame INIT failed"); 1031 } 1032 } else if (packet_matches_str("COLO_CHECKPOINT", 1033 notify_rs->buf, 1034 notify_rs->packet_len)) { 1035 /* colo-compare do checkpoint, flush pri packet and remove sec packet */ 1036 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1037 } else { 1038 error_report("COLO compare got unsupported instruction"); 1039 } 1040 } 1041 1042 /* 1043 * Return 0 is success. 1044 * Return 1 is failed. 1045 */ 1046 static int find_and_check_chardev(Chardev **chr, 1047 char *chr_name, 1048 Error **errp) 1049 { 1050 *chr = qemu_chr_find(chr_name); 1051 if (*chr == NULL) { 1052 error_setg(errp, "Device '%s' not found", 1053 chr_name); 1054 return 1; 1055 } 1056 1057 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { 1058 error_setg(errp, "chardev \"%s\" is not reconnectable", 1059 chr_name); 1060 return 1; 1061 } 1062 1063 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) { 1064 error_setg(errp, "chardev \"%s\" cannot switch context", 1065 chr_name); 1066 return 1; 1067 } 1068 1069 return 0; 1070 } 1071 1072 /* 1073 * Called from the main thread on the primary 1074 * to setup colo-compare. 
1075 */ 1076 static void colo_compare_complete(UserCreatable *uc, Error **errp) 1077 { 1078 CompareState *s = COLO_COMPARE(uc); 1079 Chardev *chr; 1080 1081 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { 1082 error_setg(errp, "colo compare needs 'primary_in' ," 1083 "'secondary_in','outdev','iothread' property set"); 1084 return; 1085 } else if (!strcmp(s->pri_indev, s->outdev) || 1086 !strcmp(s->sec_indev, s->outdev) || 1087 !strcmp(s->pri_indev, s->sec_indev)) { 1088 error_setg(errp, "'indev' and 'outdev' could not be same " 1089 "for compare module"); 1090 return; 1091 } 1092 1093 if (find_and_check_chardev(&chr, s->pri_indev, errp) || 1094 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { 1095 return; 1096 } 1097 1098 if (find_and_check_chardev(&chr, s->sec_indev, errp) || 1099 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { 1100 return; 1101 } 1102 1103 if (find_and_check_chardev(&chr, s->outdev, errp) || 1104 !qemu_chr_fe_init(&s->chr_out, chr, errp)) { 1105 return; 1106 } 1107 1108 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); 1109 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); 1110 1111 /* Try to enable remote notify chardev, currently just for Xen COLO */ 1112 if (s->notify_dev) { 1113 if (find_and_check_chardev(&chr, s->notify_dev, errp) || 1114 !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) { 1115 return; 1116 } 1117 1118 net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize, 1119 s->vnet_hdr); 1120 } 1121 1122 QTAILQ_INSERT_TAIL(&net_compares, s, next); 1123 1124 g_queue_init(&s->conn_list); 1125 1126 qemu_mutex_init(&event_mtx); 1127 qemu_cond_init(&event_complete_cond); 1128 1129 s->connection_track_table = g_hash_table_new_full(connection_key_hash, 1130 connection_key_equal, 1131 g_free, 1132 connection_destroy); 1133 1134 colo_compare_iothread(s); 1135 return; 1136 } 1137 1138 static void colo_flush_packets(void *opaque, void *user_data) 1139 { 1140 CompareState *s = 
user_data; 1141 Connection *conn = opaque; 1142 Packet *pkt = NULL; 1143 1144 while (!g_queue_is_empty(&conn->primary_list)) { 1145 pkt = g_queue_pop_head(&conn->primary_list); 1146 compare_chr_send(s, 1147 pkt->data, 1148 pkt->size, 1149 pkt->vnet_hdr_len, 1150 false); 1151 packet_destroy(pkt, NULL); 1152 } 1153 while (!g_queue_is_empty(&conn->secondary_list)) { 1154 pkt = g_queue_pop_head(&conn->secondary_list); 1155 packet_destroy(pkt, NULL); 1156 } 1157 } 1158 1159 static void colo_compare_class_init(ObjectClass *oc, void *data) 1160 { 1161 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); 1162 1163 ucc->complete = colo_compare_complete; 1164 } 1165 1166 static void colo_compare_init(Object *obj) 1167 { 1168 CompareState *s = COLO_COMPARE(obj); 1169 1170 object_property_add_str(obj, "primary_in", 1171 compare_get_pri_indev, compare_set_pri_indev, 1172 NULL); 1173 object_property_add_str(obj, "secondary_in", 1174 compare_get_sec_indev, compare_set_sec_indev, 1175 NULL); 1176 object_property_add_str(obj, "outdev", 1177 compare_get_outdev, compare_set_outdev, 1178 NULL); 1179 object_property_add_link(obj, "iothread", TYPE_IOTHREAD, 1180 (Object **)&s->iothread, 1181 object_property_allow_set_link, 1182 OBJ_PROP_LINK_STRONG, NULL); 1183 /* This parameter just for Xen COLO */ 1184 object_property_add_str(obj, "notify_dev", 1185 compare_get_notify_dev, compare_set_notify_dev, 1186 NULL); 1187 1188 s->vnet_hdr = false; 1189 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, 1190 compare_set_vnet_hdr, NULL); 1191 } 1192 1193 static void colo_compare_finalize(Object *obj) 1194 { 1195 CompareState *s = COLO_COMPARE(obj); 1196 CompareState *tmp = NULL; 1197 1198 qemu_chr_fe_deinit(&s->chr_pri_in, false); 1199 qemu_chr_fe_deinit(&s->chr_sec_in, false); 1200 qemu_chr_fe_deinit(&s->chr_out, false); 1201 if (s->notify_dev) { 1202 qemu_chr_fe_deinit(&s->chr_notify_dev, false); 1203 } 1204 1205 if (s->iothread) { 1206 colo_compare_timer_del(s); 1207 } 
1208 1209 qemu_bh_delete(s->event_bh); 1210 1211 QTAILQ_FOREACH(tmp, &net_compares, next) { 1212 if (tmp == s) { 1213 QTAILQ_REMOVE(&net_compares, s, next); 1214 break; 1215 } 1216 } 1217 1218 /* Release all unhandled packets after compare thead exited */ 1219 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1220 1221 g_queue_clear(&s->conn_list); 1222 1223 if (s->connection_track_table) { 1224 g_hash_table_destroy(s->connection_track_table); 1225 } 1226 1227 if (s->iothread) { 1228 object_unref(OBJECT(s->iothread)); 1229 } 1230 1231 qemu_mutex_destroy(&event_mtx); 1232 qemu_cond_destroy(&event_complete_cond); 1233 1234 g_free(s->pri_indev); 1235 g_free(s->sec_indev); 1236 g_free(s->outdev); 1237 g_free(s->notify_dev); 1238 } 1239 1240 static const TypeInfo colo_compare_info = { 1241 .name = TYPE_COLO_COMPARE, 1242 .parent = TYPE_OBJECT, 1243 .instance_size = sizeof(CompareState), 1244 .instance_init = colo_compare_init, 1245 .instance_finalize = colo_compare_finalize, 1246 .class_size = sizeof(CompareClass), 1247 .class_init = colo_compare_class_init, 1248 .interfaces = (InterfaceInfo[]) { 1249 { TYPE_USER_CREATABLE }, 1250 { } 1251 } 1252 }; 1253 1254 static void register_types(void) 1255 { 1256 type_register_static(&colo_compare_info); 1257 } 1258 1259 type_init(register_types); 1260