1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2 or 12 * later. See the COPYING file in the top-level directory. 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qemu/error-report.h" 17 #include "trace.h" 18 #include "qemu-common.h" 19 #include "qapi/error.h" 20 #include "net/net.h" 21 #include "net/eth.h" 22 #include "qom/object_interfaces.h" 23 #include "qemu/iov.h" 24 #include "qom/object.h" 25 #include "net/queue.h" 26 #include "chardev/char-fe.h" 27 #include "qemu/sockets.h" 28 #include "colo.h" 29 #include "sysemu/iothread.h" 30 #include "net/colo-compare.h" 31 #include "migration/colo.h" 32 #include "migration/migration.h" 33 #include "util.h" 34 35 #define TYPE_COLO_COMPARE "colo-compare" 36 #define COLO_COMPARE(obj) \ 37 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) 38 39 static QTAILQ_HEAD(, CompareState) net_compares = 40 QTAILQ_HEAD_INITIALIZER(net_compares); 41 42 static NotifierList colo_compare_notifiers = 43 NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers); 44 45 #define COMPARE_READ_LEN_MAX NET_BUFSIZE 46 #define MAX_QUEUE_SIZE 1024 47 48 #define COLO_COMPARE_FREE_PRIMARY 0x01 49 #define COLO_COMPARE_FREE_SECONDARY 0x02 50 51 /* TODO: Should be configurable */ 52 #define REGULAR_PACKET_CHECK_MS 3000 53 54 static QemuMutex event_mtx; 55 static QemuCond event_complete_cond; 56 static int event_unhandled_count; 57 58 /* 59 * + CompareState ++ 60 * | | 61 * +---------------+ +---------------+ +---------------+ 62 * | conn list + - > conn + ------- > conn + -- > ...... 63 * +---------------+ +---------------+ +---------------+ 64 * | | | | | | 65 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ 66 * |primary | |secondary |primary | |secondary 67 * |packet | |packet + |packet | |packet + 68 * +--------+ +--------+ +--------+ +--------+ 69 * | | | | 70 * +---v----+ +---v----+ +---v----+ +---v----+ 71 * |primary | |secondary |primary | |secondary 72 * |packet | |packet + |packet | |packet + 73 * +--------+ +--------+ +--------+ +--------+ 74 * | | | | 75 * +---v----+ +---v----+ +---v----+ +---v----+ 76 * |primary | |secondary |primary | |secondary 77 * |packet | |packet + |packet | |packet + 78 * +--------+ +--------+ +--------+ +--------+ 79 */ 80 typedef struct CompareState { 81 Object parent; 82 83 char *pri_indev; 84 char *sec_indev; 85 char *outdev; 86 CharBackend chr_pri_in; 87 CharBackend chr_sec_in; 88 CharBackend chr_out; 89 SocketReadState pri_rs; 90 SocketReadState sec_rs; 91 bool vnet_hdr; 92 93 /* 94 * Record the connection that through the NIC 95 * Element type: Connection 96 */ 97 GQueue conn_list; 98 /* Record the connection without repetition */ 99 GHashTable *connection_track_table; 100 101 IOThread *iothread; 102 GMainContext *worker_context; 103 QEMUTimer *packet_check_timer; 104 105 QEMUBH *event_bh; 106 enum colo_event event; 107 108 QTAILQ_ENTRY(CompareState) next; 109 } CompareState; 110 111 typedef struct CompareClass { 112 ObjectClass parent_class; 113 } CompareClass; 114 115 enum { 116 PRIMARY_IN = 0, 117 SECONDARY_IN, 118 }; 119 120 static void colo_compare_inconsistency_notify(void) 121 { 122 notifier_list_notify(&colo_compare_notifiers, 123 migrate_get_current()); 124 } 125 126 static int compare_chr_send(CompareState *s, 127 const uint8_t *buf, 128 uint32_t size, 129 uint32_t vnet_hdr_len); 130 131 static gint seq_sorter(Packet *a, Packet *b, gpointer data) 132 { 133 struct tcp_hdr *atcp, *btcp; 134 135 atcp = (struct tcp_hdr *)(a->transport_header); 136 btcp = (struct tcp_hdr *)(b->transport_header); 137 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); 138 } 139 140 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack) 141 { 142 Packet *pkt = data; 143 struct tcp_hdr *tcphd; 144 145 tcphd = (struct tcp_hdr *)pkt->transport_header; 146 147 pkt->tcp_seq = ntohl(tcphd->th_seq); 148 pkt->tcp_ack = ntohl(tcphd->th_ack); 149 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack; 150 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data 151 + (tcphd->th_off << 2) - pkt->vnet_hdr_len; 152 pkt->payload_size = pkt->size - pkt->header_size; 153 pkt->seq_end = pkt->tcp_seq + pkt->payload_size; 154 pkt->flags = tcphd->th_flags; 155 } 156 157 /* 158 * Return 1 on success, if return 0 means the 159 * packet will be dropped 160 */ 161 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack) 162 { 163 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) { 164 if (pkt->ip->ip_p == IPPROTO_TCP) { 165 fill_pkt_tcp_info(pkt, max_ack); 166 g_queue_insert_sorted(queue, 167 pkt, 168 (GCompareDataFunc)seq_sorter, 169 NULL); 170 } else { 171 g_queue_push_tail(queue, pkt); 172 } 173 return 1; 174 } 175 return 0; 176 } 177 178 /* 179 * Return 0 on success, if return -1 means the pkt 180 * is unsupported(arp and ipv6) and will be sent later 181 */ 182 static int packet_enqueue(CompareState *s, int mode, Connection **con) 183 { 184 ConnectionKey key; 185 Packet *pkt = NULL; 186 Connection *conn; 187 188 if (mode == PRIMARY_IN) { 189 pkt = packet_new(s->pri_rs.buf, 190 s->pri_rs.packet_len, 191 s->pri_rs.vnet_hdr_len); 192 } else { 193 pkt = packet_new(s->sec_rs.buf, 194 s->sec_rs.packet_len, 195 s->sec_rs.vnet_hdr_len); 196 } 197 198 if (parse_packet_early(pkt)) { 199 packet_destroy(pkt, NULL); 200 pkt = NULL; 201 return -1; 202 } 203 fill_connection_key(pkt, &key); 204 205 conn = connection_get(s->connection_track_table, 206 &key, 207 &s->conn_list); 208 209 if (!conn->processing) { 210 g_queue_push_tail(&s->conn_list, conn); 211 conn->processing = true; 212 } 213 214 if (mode == PRIMARY_IN) { 215 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) { 216 error_report("colo compare primary queue size too big," 217 "drop packet"); 218 } 219 } else { 220 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) { 221 error_report("colo compare secondary queue size too big," 222 "drop packet"); 223 } 224 } 225 *con = conn; 226 227 return 0; 228 } 229 230 static inline bool after(uint32_t seq1, uint32_t seq2) 231 { 232 return (int32_t)(seq1 - seq2) > 0; 233 } 234 235 static void colo_release_primary_pkt(CompareState *s, Packet *pkt) 236 { 237 int ret; 238 ret = compare_chr_send(s, 239 pkt->data, 240 pkt->size, 241 pkt->vnet_hdr_len); 242 if (ret < 0) { 243 error_report("colo send primary packet failed"); 244 } 245 trace_colo_compare_main("packet same and release packet"); 246 packet_destroy(pkt, NULL); 247 } 248 249 /* 250 * The IP packets sent by primary and secondary 251 * will be compared in here 252 * TODO support ip fragment, Out-Of-Order 253 * return: 0 means packet same 254 * > 0 || < 0 means packet different 255 */ 256 static int colo_compare_packet_payload(Packet *ppkt, 257 Packet *spkt, 258 uint16_t poffset, 259 uint16_t soffset, 260 uint16_t len) 261 262 { 263 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 264 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 265 266 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 267 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 268 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 269 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 270 271 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 272 pri_ip_dst, spkt->size, 273 sec_ip_src, sec_ip_dst); 274 } 275 276 return memcmp(ppkt->data + poffset, spkt->data + soffset, len); 277 } 278 279 /* 280 * return true means that the payload is consist and 281 * need to make the next comparison, false means do 282 * the checkpoint 283 */ 284 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt, 285 int8_t *mark, uint32_t max_ack) 286 { 287 *mark = 0; 288 289 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 290 if (colo_compare_packet_payload(ppkt, spkt, 291 ppkt->header_size, spkt->header_size, 292 ppkt->payload_size)) { 293 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 294 return true; 295 } 296 } 297 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) { 298 if (colo_compare_packet_payload(ppkt, spkt, 299 ppkt->header_size, spkt->header_size, 300 ppkt->payload_size)) { 301 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY; 302 return true; 303 } 304 } 305 306 /* one part of secondary packet payload still need to be compared */ 307 if (!after(ppkt->seq_end, spkt->seq_end)) { 308 if (colo_compare_packet_payload(ppkt, spkt, 309 ppkt->header_size + ppkt->offset, 310 spkt->header_size + spkt->offset, 311 ppkt->payload_size - ppkt->offset)) { 312 if (!after(ppkt->tcp_ack, max_ack)) { 313 *mark = COLO_COMPARE_FREE_PRIMARY; 314 spkt->offset += ppkt->payload_size - ppkt->offset; 315 return true; 316 } else { 317 /* secondary guest hasn't ack the data, don't send 318 * out this packet 319 */ 320 return false; 321 } 322 } 323 } else { 324 /* primary packet is longer than secondary packet, compare 325 * the same part and mark the primary packet offset 326 */ 327 if (colo_compare_packet_payload(ppkt, spkt, 328 ppkt->header_size + ppkt->offset, 329 spkt->header_size + spkt->offset, 330 spkt->payload_size - spkt->offset)) { 331 *mark = COLO_COMPARE_FREE_SECONDARY; 332 ppkt->offset += spkt->payload_size - spkt->offset; 333 return true; 334 } 335 } 336 337 return false; 338 } 339 340 static void colo_compare_tcp(CompareState *s, Connection *conn) 341 { 342 Packet *ppkt = NULL, *spkt = NULL; 343 int8_t mark; 344 345 /* 346 * If ppkt and spkt have the same payload, but ppkt's ACK 347 * is greater than spkt's ACK, in this case we can not 348 * send the ppkt because it will cause the secondary guest 349 * to miss sending some data in the next. Therefore, we 350 * record the maximum ACK in the current queue at both 351 * primary side and secondary side. Only when the ack is 352 * less than the smaller of the two maximum ack, then we 353 * can ensure that the packet's payload is acknowledged by 354 * primary and secondary. 355 */ 356 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack; 357 358 pri: 359 if (g_queue_is_empty(&conn->primary_list)) { 360 return; 361 } 362 ppkt = g_queue_pop_head(&conn->primary_list); 363 sec: 364 if (g_queue_is_empty(&conn->secondary_list)) { 365 g_queue_push_head(&conn->primary_list, ppkt); 366 return; 367 } 368 spkt = g_queue_pop_head(&conn->secondary_list); 369 370 if (ppkt->tcp_seq == ppkt->seq_end) { 371 colo_release_primary_pkt(s, ppkt); 372 ppkt = NULL; 373 } 374 375 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) { 376 trace_colo_compare_main("pri: this packet has compared"); 377 colo_release_primary_pkt(s, ppkt); 378 ppkt = NULL; 379 } 380 381 if (spkt->tcp_seq == spkt->seq_end) { 382 packet_destroy(spkt, NULL); 383 if (!ppkt) { 384 goto pri; 385 } else { 386 goto sec; 387 } 388 } else { 389 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) { 390 trace_colo_compare_main("sec: this packet has compared"); 391 packet_destroy(spkt, NULL); 392 if (!ppkt) { 393 goto pri; 394 } else { 395 goto sec; 396 } 397 } 398 if (!ppkt) { 399 g_queue_push_head(&conn->secondary_list, spkt); 400 goto pri; 401 } 402 } 403 404 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) { 405 trace_colo_compare_tcp_info("pri", 406 ppkt->tcp_seq, ppkt->tcp_ack, 407 ppkt->header_size, ppkt->payload_size, 408 ppkt->offset, ppkt->flags); 409 410 trace_colo_compare_tcp_info("sec", 411 spkt->tcp_seq, spkt->tcp_ack, 412 spkt->header_size, spkt->payload_size, 413 spkt->offset, spkt->flags); 414 415 if (mark == COLO_COMPARE_FREE_PRIMARY) { 416 conn->compare_seq = ppkt->seq_end; 417 colo_release_primary_pkt(s, ppkt); 418 g_queue_push_head(&conn->secondary_list, spkt); 419 goto pri; 420 } 421 if (mark == COLO_COMPARE_FREE_SECONDARY) { 422 conn->compare_seq = spkt->seq_end; 423 packet_destroy(spkt, NULL); 424 goto sec; 425 } 426 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) { 427 conn->compare_seq = ppkt->seq_end; 428 colo_release_primary_pkt(s, ppkt); 429 packet_destroy(spkt, NULL); 430 goto pri; 431 } 432 } else { 433 g_queue_push_head(&conn->primary_list, ppkt); 434 g_queue_push_head(&conn->secondary_list, spkt); 435 436 qemu_hexdump((char *)ppkt->data, stderr, 437 "colo-compare ppkt", ppkt->size); 438 qemu_hexdump((char *)spkt->data, stderr, 439 "colo-compare spkt", spkt->size); 440 441 colo_compare_inconsistency_notify(); 442 } 443 } 444 445 446 /* 447 * Called from the compare thread on the primary 448 * for compare udp packet 449 */ 450 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) 451 { 452 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 453 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 454 455 trace_colo_compare_main("compare udp"); 456 457 /* 458 * Because of ppkt and spkt are both in the same connection, 459 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 460 * same with spkt. In addition, IP header's Identification is a random 461 * field, we can handle it in IP fragmentation function later. 462 * COLO just concern the response net packet payload from primary guest 463 * and secondary guest are same or not, So we ignored all IP header include 464 * other field like TOS,TTL,IP Checksum. we only need to compare 465 * the ip payload here. 466 */ 467 if (ppkt->size != spkt->size) { 468 trace_colo_compare_main("UDP: payload size of packets are different"); 469 return -1; 470 } 471 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 472 ppkt->size - offset)) { 473 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); 474 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); 475 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 476 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 477 ppkt->size); 478 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 479 spkt->size); 480 } 481 return -1; 482 } else { 483 return 0; 484 } 485 } 486 487 /* 488 * Called from the compare thread on the primary 489 * for compare icmp packet 490 */ 491 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) 492 { 493 uint16_t network_header_length = ppkt->ip->ip_hl << 2; 494 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len; 495 496 trace_colo_compare_main("compare icmp"); 497 498 /* 499 * Because of ppkt and spkt are both in the same connection, 500 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 501 * same with spkt. In addition, IP header's Identification is a random 502 * field, we can handle it in IP fragmentation function later. 503 * COLO just concern the response net packet payload from primary guest 504 * and secondary guest are same or not, So we ignored all IP header include 505 * other field like TOS,TTL,IP Checksum. we only need to compare 506 * the ip payload here. 507 */ 508 if (ppkt->size != spkt->size) { 509 trace_colo_compare_main("ICMP: payload size of packets are different"); 510 return -1; 511 } 512 if (colo_compare_packet_payload(ppkt, spkt, offset, offset, 513 ppkt->size - offset)) { 514 trace_colo_compare_icmp_miscompare("primary pkt size", 515 ppkt->size); 516 trace_colo_compare_icmp_miscompare("Secondary pkt size", 517 spkt->size); 518 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 519 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 520 ppkt->size); 521 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 522 spkt->size); 523 } 524 return -1; 525 } else { 526 return 0; 527 } 528 } 529 530 /* 531 * Called from the compare thread on the primary 532 * for compare other packet 533 */ 534 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) 535 { 536 uint16_t offset = ppkt->vnet_hdr_len; 537 538 trace_colo_compare_main("compare other"); 539 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 540 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 541 542 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 543 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 544 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 545 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 546 547 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 548 pri_ip_dst, spkt->size, 549 sec_ip_src, sec_ip_dst); 550 } 551 552 if (ppkt->size != spkt->size) { 553 trace_colo_compare_main("Other: payload size of packets are different"); 554 return -1; 555 } 556 return colo_compare_packet_payload(ppkt, spkt, offset, offset, 557 ppkt->size - offset); 558 } 559 560 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) 561 { 562 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); 563 564 if ((now - pkt->creation_ms) > (*check_time)) { 565 trace_colo_old_packet_check_found(pkt->creation_ms); 566 return 0; 567 } else { 568 return 1; 569 } 570 } 571 572 void colo_compare_register_notifier(Notifier *notify) 573 { 574 notifier_list_add(&colo_compare_notifiers, notify); 575 } 576 577 void colo_compare_unregister_notifier(Notifier *notify) 578 { 579 notifier_remove(notify); 580 } 581 582 static int colo_old_packet_check_one_conn(Connection *conn, 583 void *user_data) 584 { 585 GList *result = NULL; 586 int64_t check_time = REGULAR_PACKET_CHECK_MS; 587 588 result = g_queue_find_custom(&conn->primary_list, 589 &check_time, 590 (GCompareFunc)colo_old_packet_check_one); 591 592 if (result) { 593 /* Do checkpoint will flush old packet */ 594 colo_compare_inconsistency_notify(); 595 return 0; 596 } 597 598 return 1; 599 } 600 601 /* 602 * Look for old packets that the secondary hasn't matched, 603 * if we have some then we have to checkpoint to wake 604 * the secondary up. 605 */ 606 static void colo_old_packet_check(void *opaque) 607 { 608 CompareState *s = opaque; 609 610 /* 611 * If we find one old packet, stop finding job and notify 612 * COLO frame do checkpoint. 613 */ 614 g_queue_find_custom(&s->conn_list, NULL, 615 (GCompareFunc)colo_old_packet_check_one_conn); 616 } 617 618 static void colo_compare_packet(CompareState *s, Connection *conn, 619 int (*HandlePacket)(Packet *spkt, 620 Packet *ppkt)) 621 { 622 Packet *pkt = NULL; 623 GList *result = NULL; 624 625 while (!g_queue_is_empty(&conn->primary_list) && 626 !g_queue_is_empty(&conn->secondary_list)) { 627 pkt = g_queue_pop_head(&conn->primary_list); 628 result = g_queue_find_custom(&conn->secondary_list, 629 pkt, (GCompareFunc)HandlePacket); 630 631 if (result) { 632 colo_release_primary_pkt(s, pkt); 633 g_queue_remove(&conn->secondary_list, result->data); 634 } else { 635 /* 636 * If one packet arrive late, the secondary_list or 637 * primary_list will be empty, so we can't compare it 638 * until next comparison. If the packets in the list are 639 * timeout, it will trigger a checkpoint request. 640 */ 641 trace_colo_compare_main("packet different"); 642 g_queue_push_head(&conn->primary_list, pkt); 643 colo_compare_inconsistency_notify(); 644 break; 645 } 646 } 647 } 648 649 /* 650 * Called from the compare thread on the primary 651 * for compare packet with secondary list of the 652 * specified connection when a new packet was 653 * queued to it. 654 */ 655 static void colo_compare_connection(void *opaque, void *user_data) 656 { 657 CompareState *s = user_data; 658 Connection *conn = opaque; 659 660 switch (conn->ip_proto) { 661 case IPPROTO_TCP: 662 colo_compare_tcp(s, conn); 663 break; 664 case IPPROTO_UDP: 665 colo_compare_packet(s, conn, colo_packet_compare_udp); 666 break; 667 case IPPROTO_ICMP: 668 colo_compare_packet(s, conn, colo_packet_compare_icmp); 669 break; 670 default: 671 colo_compare_packet(s, conn, colo_packet_compare_other); 672 break; 673 } 674 } 675 676 static int compare_chr_send(CompareState *s, 677 const uint8_t *buf, 678 uint32_t size, 679 uint32_t vnet_hdr_len) 680 { 681 int ret = 0; 682 uint32_t len = htonl(size); 683 684 if (!size) { 685 return 0; 686 } 687 688 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 689 if (ret != sizeof(len)) { 690 goto err; 691 } 692 693 if (s->vnet_hdr) { 694 /* 695 * We send vnet header len make other module(like filter-redirector) 696 * know how to parse net packet correctly. 697 */ 698 len = htonl(vnet_hdr_len); 699 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 700 if (ret != sizeof(len)) { 701 goto err; 702 } 703 } 704 705 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); 706 if (ret != size) { 707 goto err; 708 } 709 710 return 0; 711 712 err: 713 return ret < 0 ? ret : -EIO; 714 } 715 716 static int compare_chr_can_read(void *opaque) 717 { 718 return COMPARE_READ_LEN_MAX; 719 } 720 721 /* 722 * Called from the main thread on the primary for packets 723 * arriving over the socket from the primary. 724 */ 725 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) 726 { 727 CompareState *s = COLO_COMPARE(opaque); 728 int ret; 729 730 ret = net_fill_rstate(&s->pri_rs, buf, size); 731 if (ret == -1) { 732 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, 733 NULL, NULL, true); 734 error_report("colo-compare primary_in error"); 735 } 736 } 737 738 /* 739 * Called from the main thread on the primary for packets 740 * arriving over the socket from the secondary. 741 */ 742 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) 743 { 744 CompareState *s = COLO_COMPARE(opaque); 745 int ret; 746 747 ret = net_fill_rstate(&s->sec_rs, buf, size); 748 if (ret == -1) { 749 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, 750 NULL, NULL, true); 751 error_report("colo-compare secondary_in error"); 752 } 753 } 754 755 /* 756 * Check old packet regularly so it can watch for any packets 757 * that the secondary hasn't produced equivalents of. 758 */ 759 static void check_old_packet_regular(void *opaque) 760 { 761 CompareState *s = opaque; 762 763 /* if have old packet we will notify checkpoint */ 764 colo_old_packet_check(s); 765 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 766 REGULAR_PACKET_CHECK_MS); 767 } 768 769 /* Public API, Used for COLO frame to notify compare event */ 770 void colo_notify_compares_event(void *opaque, int event, Error **errp) 771 { 772 CompareState *s; 773 774 qemu_mutex_lock(&event_mtx); 775 QTAILQ_FOREACH(s, &net_compares, next) { 776 s->event = event; 777 qemu_bh_schedule(s->event_bh); 778 event_unhandled_count++; 779 } 780 /* Wait all compare threads to finish handling this event */ 781 while (event_unhandled_count > 0) { 782 qemu_cond_wait(&event_complete_cond, &event_mtx); 783 } 784 785 qemu_mutex_unlock(&event_mtx); 786 } 787 788 static void colo_compare_timer_init(CompareState *s) 789 { 790 AioContext *ctx = iothread_get_aio_context(s->iothread); 791 792 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, 793 SCALE_MS, check_old_packet_regular, 794 s); 795 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 796 REGULAR_PACKET_CHECK_MS); 797 } 798 799 static void colo_compare_timer_del(CompareState *s) 800 { 801 if (s->packet_check_timer) { 802 timer_del(s->packet_check_timer); 803 timer_free(s->packet_check_timer); 804 s->packet_check_timer = NULL; 805 } 806 } 807 808 static void colo_flush_packets(void *opaque, void *user_data); 809 810 static void colo_compare_handle_event(void *opaque) 811 { 812 CompareState *s = opaque; 813 814 switch (s->event) { 815 case COLO_EVENT_CHECKPOINT: 816 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 817 break; 818 case COLO_EVENT_FAILOVER: 819 break; 820 default: 821 break; 822 } 823 824 assert(event_unhandled_count > 0); 825 826 qemu_mutex_lock(&event_mtx); 827 event_unhandled_count--; 828 qemu_cond_broadcast(&event_complete_cond); 829 qemu_mutex_unlock(&event_mtx); 830 } 831 832 static void colo_compare_iothread(CompareState *s) 833 { 834 object_ref(OBJECT(s->iothread)); 835 s->worker_context = iothread_get_g_main_context(s->iothread); 836 837 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, 838 compare_pri_chr_in, NULL, NULL, 839 s, s->worker_context, true); 840 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, 841 compare_sec_chr_in, NULL, NULL, 842 s, s->worker_context, true); 843 844 colo_compare_timer_init(s); 845 s->event_bh = qemu_bh_new(colo_compare_handle_event, s); 846 } 847 848 static char *compare_get_pri_indev(Object *obj, Error **errp) 849 { 850 CompareState *s = COLO_COMPARE(obj); 851 852 return g_strdup(s->pri_indev); 853 } 854 855 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) 856 { 857 CompareState *s = COLO_COMPARE(obj); 858 859 g_free(s->pri_indev); 860 s->pri_indev = g_strdup(value); 861 } 862 863 static char *compare_get_sec_indev(Object *obj, Error **errp) 864 { 865 CompareState *s = COLO_COMPARE(obj); 866 867 return g_strdup(s->sec_indev); 868 } 869 870 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) 871 { 872 CompareState *s = COLO_COMPARE(obj); 873 874 g_free(s->sec_indev); 875 s->sec_indev = g_strdup(value); 876 } 877 878 static char *compare_get_outdev(Object *obj, Error **errp) 879 { 880 CompareState *s = COLO_COMPARE(obj); 881 882 return g_strdup(s->outdev); 883 } 884 885 static void compare_set_outdev(Object *obj, const char *value, Error **errp) 886 { 887 CompareState *s = COLO_COMPARE(obj); 888 889 g_free(s->outdev); 890 s->outdev = g_strdup(value); 891 } 892 893 static bool compare_get_vnet_hdr(Object *obj, Error **errp) 894 { 895 CompareState *s = COLO_COMPARE(obj); 896 897 return s->vnet_hdr; 898 } 899 900 static void compare_set_vnet_hdr(Object *obj, 901 bool value, 902 Error **errp) 903 { 904 CompareState *s = COLO_COMPARE(obj); 905 906 s->vnet_hdr = value; 907 } 908 909 static void compare_pri_rs_finalize(SocketReadState *pri_rs) 910 { 911 CompareState *s = container_of(pri_rs, CompareState, pri_rs); 912 Connection *conn = NULL; 913 914 if (packet_enqueue(s, PRIMARY_IN, &conn)) { 915 trace_colo_compare_main("primary: unsupported packet in"); 916 compare_chr_send(s, 917 pri_rs->buf, 918 pri_rs->packet_len, 919 pri_rs->vnet_hdr_len); 920 } else { 921 /* compare packet in the specified connection */ 922 colo_compare_connection(conn, s); 923 } 924 } 925 926 static void compare_sec_rs_finalize(SocketReadState *sec_rs) 927 { 928 CompareState *s = container_of(sec_rs, CompareState, sec_rs); 929 Connection *conn = NULL; 930 931 if (packet_enqueue(s, SECONDARY_IN, &conn)) { 932 trace_colo_compare_main("secondary: unsupported packet in"); 933 } else { 934 /* compare packet in the specified connection */ 935 colo_compare_connection(conn, s); 936 } 937 } 938 939 940 /* 941 * Return 0 is success. 942 * Return 1 is failed. 943 */ 944 static int find_and_check_chardev(Chardev **chr, 945 char *chr_name, 946 Error **errp) 947 { 948 *chr = qemu_chr_find(chr_name); 949 if (*chr == NULL) { 950 error_setg(errp, "Device '%s' not found", 951 chr_name); 952 return 1; 953 } 954 955 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { 956 error_setg(errp, "chardev \"%s\" is not reconnectable", 957 chr_name); 958 return 1; 959 } 960 961 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) { 962 error_setg(errp, "chardev \"%s\" cannot switch context", 963 chr_name); 964 return 1; 965 } 966 967 return 0; 968 } 969 970 /* 971 * Called from the main thread on the primary 972 * to setup colo-compare. 973 */ 974 static void colo_compare_complete(UserCreatable *uc, Error **errp) 975 { 976 CompareState *s = COLO_COMPARE(uc); 977 Chardev *chr; 978 979 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { 980 error_setg(errp, "colo compare needs 'primary_in' ," 981 "'secondary_in','outdev','iothread' property set"); 982 return; 983 } else if (!strcmp(s->pri_indev, s->outdev) || 984 !strcmp(s->sec_indev, s->outdev) || 985 !strcmp(s->pri_indev, s->sec_indev)) { 986 error_setg(errp, "'indev' and 'outdev' could not be same " 987 "for compare module"); 988 return; 989 } 990 991 if (find_and_check_chardev(&chr, s->pri_indev, errp) || 992 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { 993 return; 994 } 995 996 if (find_and_check_chardev(&chr, s->sec_indev, errp) || 997 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { 998 return; 999 } 1000 1001 if (find_and_check_chardev(&chr, s->outdev, errp) || 1002 !qemu_chr_fe_init(&s->chr_out, chr, errp)) { 1003 return; 1004 } 1005 1006 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); 1007 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); 1008 1009 QTAILQ_INSERT_TAIL(&net_compares, s, next); 1010 1011 g_queue_init(&s->conn_list); 1012 1013 qemu_mutex_init(&event_mtx); 1014 qemu_cond_init(&event_complete_cond); 1015 1016 s->connection_track_table = g_hash_table_new_full(connection_key_hash, 1017 connection_key_equal, 1018 g_free, 1019 connection_destroy); 1020 1021 colo_compare_iothread(s); 1022 return; 1023 } 1024 1025 static void colo_flush_packets(void *opaque, void *user_data) 1026 { 1027 CompareState *s = user_data; 1028 Connection *conn = opaque; 1029 Packet *pkt = NULL; 1030 1031 while (!g_queue_is_empty(&conn->primary_list)) { 1032 pkt = g_queue_pop_head(&conn->primary_list); 1033 compare_chr_send(s, 1034 pkt->data, 1035 pkt->size, 1036 pkt->vnet_hdr_len); 1037 packet_destroy(pkt, NULL); 1038 } 1039 while (!g_queue_is_empty(&conn->secondary_list)) { 1040 pkt = g_queue_pop_head(&conn->secondary_list); 1041 packet_destroy(pkt, NULL); 1042 } 1043 } 1044 1045 static void colo_compare_class_init(ObjectClass *oc, void *data) 1046 { 1047 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); 1048 1049 ucc->complete = colo_compare_complete; 1050 } 1051 1052 static void colo_compare_init(Object *obj) 1053 { 1054 CompareState *s = COLO_COMPARE(obj); 1055 1056 object_property_add_str(obj, "primary_in", 1057 compare_get_pri_indev, compare_set_pri_indev, 1058 NULL); 1059 object_property_add_str(obj, "secondary_in", 1060 compare_get_sec_indev, compare_set_sec_indev, 1061 NULL); 1062 object_property_add_str(obj, "outdev", 1063 compare_get_outdev, compare_set_outdev, 1064 NULL); 1065 object_property_add_link(obj, "iothread", TYPE_IOTHREAD, 1066 (Object **)&s->iothread, 1067 object_property_allow_set_link, 1068 OBJ_PROP_LINK_STRONG, NULL); 1069 1070 s->vnet_hdr = false; 1071 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, 1072 compare_set_vnet_hdr, NULL); 1073 } 1074 1075 static void colo_compare_finalize(Object *obj) 1076 { 1077 CompareState *s = COLO_COMPARE(obj); 1078 CompareState *tmp = NULL; 1079 1080 qemu_chr_fe_deinit(&s->chr_pri_in, false); 1081 qemu_chr_fe_deinit(&s->chr_sec_in, false); 1082 qemu_chr_fe_deinit(&s->chr_out, false); 1083 if (s->iothread) { 1084 colo_compare_timer_del(s); 1085 } 1086 1087 qemu_bh_delete(s->event_bh); 1088 1089 QTAILQ_FOREACH(tmp, &net_compares, next) { 1090 if (tmp == s) { 1091 QTAILQ_REMOVE(&net_compares, s, next); 1092 break; 1093 } 1094 } 1095 1096 /* Release all unhandled packets after compare thead exited */ 1097 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 1098 1099 g_queue_clear(&s->conn_list); 1100 1101 if (s->connection_track_table) { 1102 g_hash_table_destroy(s->connection_track_table); 1103 } 1104 1105 if (s->iothread) { 1106 object_unref(OBJECT(s->iothread)); 1107 } 1108 1109 qemu_mutex_destroy(&event_mtx); 1110 qemu_cond_destroy(&event_complete_cond); 1111 1112 g_free(s->pri_indev); 1113 g_free(s->sec_indev); 1114 g_free(s->outdev); 1115 } 1116 1117 static const TypeInfo colo_compare_info = { 1118 .name = TYPE_COLO_COMPARE, 1119 .parent = TYPE_OBJECT, 1120 .instance_size = sizeof(CompareState), 1121 .instance_init = colo_compare_init, 1122 .instance_finalize = colo_compare_finalize, 1123 .class_size = sizeof(CompareClass), 1124 .class_init = colo_compare_class_init, 1125 .interfaces = (InterfaceInfo[]) { 1126 { TYPE_USER_CREATABLE }, 1127 { } 1128 } 1129 }; 1130 1131 static void register_types(void) 1132 { 1133 type_register_static(&colo_compare_info); 1134 } 1135 1136 type_init(register_types); 1137