1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2 or 12 * later. See the COPYING file in the top-level directory. 13 */ 14 15 #include "qemu/osdep.h" 16 #include "qemu/error-report.h" 17 #include "trace.h" 18 #include "qemu-common.h" 19 #include "qapi/qmp/qerror.h" 20 #include "qapi/error.h" 21 #include "net/net.h" 22 #include "net/eth.h" 23 #include "qom/object_interfaces.h" 24 #include "qemu/iov.h" 25 #include "qom/object.h" 26 #include "qemu/typedefs.h" 27 #include "net/queue.h" 28 #include "chardev/char-fe.h" 29 #include "qemu/sockets.h" 30 #include "qapi-visit.h" 31 #include "net/colo.h" 32 #include "sysemu/iothread.h" 33 34 #define TYPE_COLO_COMPARE "colo-compare" 35 #define COLO_COMPARE(obj) \ 36 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) 37 38 #define COMPARE_READ_LEN_MAX NET_BUFSIZE 39 #define MAX_QUEUE_SIZE 1024 40 41 /* TODO: Should be configurable */ 42 #define REGULAR_PACKET_CHECK_MS 3000 43 44 /* 45 * + CompareState ++ 46 * | | 47 * +---------------+ +---------------+ +---------------+ 48 * | conn list + - > conn + ------- > conn + -- > ...... 49 * +---------------+ +---------------+ +---------------+ 50 * | | | | | | 51 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+ 52 * |primary | |secondary |primary | |secondary 53 * |packet | |packet + |packet | |packet + 54 * +--------+ +--------+ +--------+ +--------+ 55 * | | | | 56 * +---v----+ +---v----+ +---v----+ +---v----+ 57 * |primary | |secondary |primary | |secondary 58 * |packet | |packet + |packet | |packet + 59 * +--------+ +--------+ +--------+ +--------+ 60 * | | | | 61 * +---v----+ +---v----+ +---v----+ +---v----+ 62 * |primary | |secondary |primary | |secondary 63 * |packet | |packet + |packet | |packet + 64 * +--------+ +--------+ +--------+ +--------+ 65 */ 66 typedef struct CompareState { 67 Object parent; 68 69 char *pri_indev; 70 char *sec_indev; 71 char *outdev; 72 CharBackend chr_pri_in; 73 CharBackend chr_sec_in; 74 CharBackend chr_out; 75 SocketReadState pri_rs; 76 SocketReadState sec_rs; 77 bool vnet_hdr; 78 79 /* 80 * Record the connection that through the NIC 81 * Element type: Connection 82 */ 83 GQueue conn_list; 84 /* Record the connection without repetition */ 85 GHashTable *connection_track_table; 86 87 IOThread *iothread; 88 GMainContext *worker_context; 89 QEMUTimer *packet_check_timer; 90 } CompareState; 91 92 typedef struct CompareClass { 93 ObjectClass parent_class; 94 } CompareClass; 95 96 enum { 97 PRIMARY_IN = 0, 98 SECONDARY_IN, 99 }; 100 101 static int compare_chr_send(CompareState *s, 102 const uint8_t *buf, 103 uint32_t size, 104 uint32_t vnet_hdr_len); 105 106 static gint seq_sorter(Packet *a, Packet *b, gpointer data) 107 { 108 struct tcphdr *atcp, *btcp; 109 110 atcp = (struct tcphdr *)(a->transport_header); 111 btcp = (struct tcphdr *)(b->transport_header); 112 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq); 113 } 114 115 /* 116 * Return 0 on success, if return -1 means the pkt 117 * is unsupported(arp and ipv6) and will be sent later 118 */ 119 static int packet_enqueue(CompareState *s, int mode) 120 { 121 ConnectionKey key; 122 Packet *pkt = NULL; 123 Connection *conn; 124 125 if (mode == PRIMARY_IN) { 126 pkt = packet_new(s->pri_rs.buf, 127 s->pri_rs.packet_len, 128 s->pri_rs.vnet_hdr_len); 129 } else { 130 pkt = packet_new(s->sec_rs.buf, 131 s->sec_rs.packet_len, 132 s->sec_rs.vnet_hdr_len); 133 } 134 135 if (parse_packet_early(pkt)) { 136 packet_destroy(pkt, NULL); 137 pkt = NULL; 138 return -1; 139 } 140 fill_connection_key(pkt, &key); 141 142 conn = connection_get(s->connection_track_table, 143 &key, 144 &s->conn_list); 145 146 if (!conn->processing) { 147 g_queue_push_tail(&s->conn_list, conn); 148 conn->processing = true; 149 } 150 151 if (mode == PRIMARY_IN) { 152 if (g_queue_get_length(&conn->primary_list) <= 153 MAX_QUEUE_SIZE) { 154 g_queue_push_tail(&conn->primary_list, pkt); 155 if (conn->ip_proto == IPPROTO_TCP) { 156 g_queue_sort(&conn->primary_list, 157 (GCompareDataFunc)seq_sorter, 158 NULL); 159 } 160 } else { 161 error_report("colo compare primary queue size too big," 162 "drop packet"); 163 } 164 } else { 165 if (g_queue_get_length(&conn->secondary_list) <= 166 MAX_QUEUE_SIZE) { 167 g_queue_push_tail(&conn->secondary_list, pkt); 168 if (conn->ip_proto == IPPROTO_TCP) { 169 g_queue_sort(&conn->secondary_list, 170 (GCompareDataFunc)seq_sorter, 171 NULL); 172 } 173 } else { 174 error_report("colo compare secondary queue size too big," 175 "drop packet"); 176 } 177 } 178 179 return 0; 180 } 181 182 /* 183 * The IP packets sent by primary and secondary 184 * will be compared in here 185 * TODO support ip fragment, Out-Of-Order 186 * return: 0 means packet same 187 * > 0 || < 0 means packet different 188 */ 189 static int colo_packet_compare_common(Packet *ppkt, 190 Packet *spkt, 191 int poffset, 192 int soffset) 193 { 194 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 195 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 196 197 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 198 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 199 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 200 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 201 202 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 203 pri_ip_dst, spkt->size, 204 sec_ip_src, sec_ip_dst); 205 } 206 207 poffset = ppkt->vnet_hdr_len + poffset; 208 soffset = ppkt->vnet_hdr_len + soffset; 209 210 if (ppkt->size - poffset == spkt->size - soffset) { 211 return memcmp(ppkt->data + poffset, 212 spkt->data + soffset, 213 spkt->size - soffset); 214 } else { 215 trace_colo_compare_main("Net packet size are not the same"); 216 return -1; 217 } 218 } 219 220 /* 221 * Called from the compare thread on the primary 222 * for compare tcp packet 223 * compare_tcp copied from Dr. David Alan Gilbert's branch 224 */ 225 static int colo_packet_compare_tcp(Packet *spkt, Packet *ppkt) 226 { 227 struct tcphdr *ptcp, *stcp; 228 int res; 229 230 trace_colo_compare_main("compare tcp"); 231 232 ptcp = (struct tcphdr *)ppkt->transport_header; 233 stcp = (struct tcphdr *)spkt->transport_header; 234 235 /* 236 * The 'identification' field in the IP header is *very* random 237 * it almost never matches. Fudge this by ignoring differences in 238 * unfragmented packets; they'll normally sort themselves out if different 239 * anyway, and it should recover at the TCP level. 240 * An alternative would be to get both the primary and secondary to rewrite 241 * somehow; but that would need some sync traffic to sync the state 242 */ 243 if (ntohs(ppkt->ip->ip_off) & IP_DF) { 244 spkt->ip->ip_id = ppkt->ip->ip_id; 245 /* and the sum will be different if the IDs were different */ 246 spkt->ip->ip_sum = ppkt->ip->ip_sum; 247 } 248 249 /* 250 * Check tcp header length for tcp option field. 251 * th_off > 5 means this tcp packet have options field. 252 * The tcp options maybe always different. 253 * for example: 254 * From RFC 7323. 255 * TCP Timestamps option (TSopt): 256 * Kind: 8 257 * 258 * Length: 10 bytes 259 * 260 * +-------+-------+---------------------+---------------------+ 261 * |Kind=8 | 10 | TS Value (TSval) |TS Echo Reply (TSecr)| 262 * +-------+-------+---------------------+---------------------+ 263 * 1 1 4 4 264 * 265 * In this case the primary guest's timestamp always different with 266 * the secondary guest's timestamp. COLO just focus on payload, 267 * so we just need skip this field. 268 */ 269 if (ptcp->th_off > 5) { 270 ptrdiff_t ptcp_offset, stcp_offset; 271 272 ptcp_offset = ppkt->transport_header - (uint8_t *)ppkt->data 273 + (ptcp->th_off * 4) - ppkt->vnet_hdr_len; 274 stcp_offset = spkt->transport_header - (uint8_t *)spkt->data 275 + (stcp->th_off * 4) - spkt->vnet_hdr_len; 276 277 /* 278 * When network is busy, some tcp options(like sack) will unpredictable 279 * occur in primary side or secondary side. it will make packet size 280 * not same, but the two packet's payload is identical. colo just 281 * care about packet payload, so we skip the option field. 282 */ 283 res = colo_packet_compare_common(ppkt, spkt, ptcp_offset, stcp_offset); 284 } else if (ptcp->th_sum == stcp->th_sum) { 285 res = colo_packet_compare_common(ppkt, spkt, ETH_HLEN, ETH_HLEN); 286 } else { 287 res = -1; 288 } 289 290 if (res != 0 && 291 trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 292 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 293 294 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 295 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 296 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 297 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 298 299 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 300 pri_ip_dst, spkt->size, 301 sec_ip_src, sec_ip_dst); 302 303 trace_colo_compare_tcp_info("pri tcp packet", 304 ntohl(ptcp->th_seq), 305 ntohl(ptcp->th_ack), 306 res, ptcp->th_flags, 307 ppkt->size); 308 309 trace_colo_compare_tcp_info("sec tcp packet", 310 ntohl(stcp->th_seq), 311 ntohl(stcp->th_ack), 312 res, stcp->th_flags, 313 spkt->size); 314 315 qemu_hexdump((char *)ppkt->data, stderr, 316 "colo-compare ppkt", ppkt->size); 317 qemu_hexdump((char *)spkt->data, stderr, 318 "colo-compare spkt", spkt->size); 319 } 320 321 return res; 322 } 323 324 /* 325 * Called from the compare thread on the primary 326 * for compare udp packet 327 */ 328 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt) 329 { 330 int ret; 331 int network_header_length = ppkt->ip->ip_hl * 4; 332 333 trace_colo_compare_main("compare udp"); 334 335 /* 336 * Because of ppkt and spkt are both in the same connection, 337 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 338 * same with spkt. In addition, IP header's Identification is a random 339 * field, we can handle it in IP fragmentation function later. 340 * COLO just concern the response net packet payload from primary guest 341 * and secondary guest are same or not, So we ignored all IP header include 342 * other field like TOS,TTL,IP Checksum. we only need to compare 343 * the ip payload here. 344 */ 345 ret = colo_packet_compare_common(ppkt, spkt, 346 network_header_length + ETH_HLEN, 347 network_header_length + ETH_HLEN); 348 349 if (ret) { 350 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size); 351 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size); 352 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 353 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 354 ppkt->size); 355 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 356 spkt->size); 357 } 358 } 359 360 return ret; 361 } 362 363 /* 364 * Called from the compare thread on the primary 365 * for compare icmp packet 366 */ 367 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt) 368 { 369 int network_header_length = ppkt->ip->ip_hl * 4; 370 371 trace_colo_compare_main("compare icmp"); 372 373 /* 374 * Because of ppkt and spkt are both in the same connection, 375 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are 376 * same with spkt. In addition, IP header's Identification is a random 377 * field, we can handle it in IP fragmentation function later. 378 * COLO just concern the response net packet payload from primary guest 379 * and secondary guest are same or not, So we ignored all IP header include 380 * other field like TOS,TTL,IP Checksum. we only need to compare 381 * the ip payload here. 382 */ 383 if (colo_packet_compare_common(ppkt, spkt, 384 network_header_length + ETH_HLEN, 385 network_header_length + ETH_HLEN)) { 386 trace_colo_compare_icmp_miscompare("primary pkt size", 387 ppkt->size); 388 trace_colo_compare_icmp_miscompare("Secondary pkt size", 389 spkt->size); 390 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 391 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt", 392 ppkt->size); 393 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt", 394 spkt->size); 395 } 396 return -1; 397 } else { 398 return 0; 399 } 400 } 401 402 /* 403 * Called from the compare thread on the primary 404 * for compare other packet 405 */ 406 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt) 407 { 408 trace_colo_compare_main("compare other"); 409 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) { 410 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20]; 411 412 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src)); 413 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst)); 414 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src)); 415 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst)); 416 417 trace_colo_compare_ip_info(ppkt->size, pri_ip_src, 418 pri_ip_dst, spkt->size, 419 sec_ip_src, sec_ip_dst); 420 } 421 422 return colo_packet_compare_common(ppkt, spkt, 0, 0); 423 } 424 425 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) 426 { 427 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST); 428 429 if ((now - pkt->creation_ms) > (*check_time)) { 430 trace_colo_old_packet_check_found(pkt->creation_ms); 431 return 0; 432 } else { 433 return 1; 434 } 435 } 436 437 static int colo_old_packet_check_one_conn(Connection *conn, 438 void *user_data) 439 { 440 GList *result = NULL; 441 int64_t check_time = REGULAR_PACKET_CHECK_MS; 442 443 result = g_queue_find_custom(&conn->primary_list, 444 &check_time, 445 (GCompareFunc)colo_old_packet_check_one); 446 447 if (result) { 448 /* Do checkpoint will flush old packet */ 449 /* 450 * TODO: Notify colo frame to do checkpoint. 451 * colo_compare_inconsistent_notify(); 452 */ 453 return 0; 454 } 455 456 return 1; 457 } 458 459 /* 460 * Look for old packets that the secondary hasn't matched, 461 * if we have some then we have to checkpoint to wake 462 * the secondary up. 463 */ 464 static void colo_old_packet_check(void *opaque) 465 { 466 CompareState *s = opaque; 467 468 /* 469 * If we find one old packet, stop finding job and notify 470 * COLO frame do checkpoint. 471 */ 472 g_queue_find_custom(&s->conn_list, NULL, 473 (GCompareFunc)colo_old_packet_check_one_conn); 474 } 475 476 /* 477 * Called from the compare thread on the primary 478 * for compare connection 479 */ 480 static void colo_compare_connection(void *opaque, void *user_data) 481 { 482 CompareState *s = user_data; 483 Connection *conn = opaque; 484 Packet *pkt = NULL; 485 GList *result = NULL; 486 int ret; 487 488 while (!g_queue_is_empty(&conn->primary_list) && 489 !g_queue_is_empty(&conn->secondary_list)) { 490 pkt = g_queue_pop_head(&conn->primary_list); 491 switch (conn->ip_proto) { 492 case IPPROTO_TCP: 493 result = g_queue_find_custom(&conn->secondary_list, 494 pkt, (GCompareFunc)colo_packet_compare_tcp); 495 break; 496 case IPPROTO_UDP: 497 result = g_queue_find_custom(&conn->secondary_list, 498 pkt, (GCompareFunc)colo_packet_compare_udp); 499 break; 500 case IPPROTO_ICMP: 501 result = g_queue_find_custom(&conn->secondary_list, 502 pkt, (GCompareFunc)colo_packet_compare_icmp); 503 break; 504 default: 505 result = g_queue_find_custom(&conn->secondary_list, 506 pkt, (GCompareFunc)colo_packet_compare_other); 507 break; 508 } 509 510 if (result) { 511 ret = compare_chr_send(s, 512 pkt->data, 513 pkt->size, 514 pkt->vnet_hdr_len); 515 if (ret < 0) { 516 error_report("colo_send_primary_packet failed"); 517 } 518 trace_colo_compare_main("packet same and release packet"); 519 g_queue_remove(&conn->secondary_list, result->data); 520 packet_destroy(pkt, NULL); 521 } else { 522 /* 523 * If one packet arrive late, the secondary_list or 524 * primary_list will be empty, so we can't compare it 525 * until next comparison. 526 */ 527 trace_colo_compare_main("packet different"); 528 g_queue_push_head(&conn->primary_list, pkt); 529 /* TODO: colo_notify_checkpoint();*/ 530 break; 531 } 532 } 533 } 534 535 static int compare_chr_send(CompareState *s, 536 const uint8_t *buf, 537 uint32_t size, 538 uint32_t vnet_hdr_len) 539 { 540 int ret = 0; 541 uint32_t len = htonl(size); 542 543 if (!size) { 544 return 0; 545 } 546 547 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 548 if (ret != sizeof(len)) { 549 goto err; 550 } 551 552 if (s->vnet_hdr) { 553 /* 554 * We send vnet header len make other module(like filter-redirector) 555 * know how to parse net packet correctly. 556 */ 557 len = htonl(vnet_hdr_len); 558 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); 559 if (ret != sizeof(len)) { 560 goto err; 561 } 562 } 563 564 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); 565 if (ret != size) { 566 goto err; 567 } 568 569 return 0; 570 571 err: 572 return ret < 0 ? ret : -EIO; 573 } 574 575 static int compare_chr_can_read(void *opaque) 576 { 577 return COMPARE_READ_LEN_MAX; 578 } 579 580 /* 581 * Called from the main thread on the primary for packets 582 * arriving over the socket from the primary. 583 */ 584 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size) 585 { 586 CompareState *s = COLO_COMPARE(opaque); 587 int ret; 588 589 ret = net_fill_rstate(&s->pri_rs, buf, size); 590 if (ret == -1) { 591 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL, 592 NULL, NULL, true); 593 error_report("colo-compare primary_in error"); 594 } 595 } 596 597 /* 598 * Called from the main thread on the primary for packets 599 * arriving over the socket from the secondary. 600 */ 601 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) 602 { 603 CompareState *s = COLO_COMPARE(opaque); 604 int ret; 605 606 ret = net_fill_rstate(&s->sec_rs, buf, size); 607 if (ret == -1) { 608 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL, 609 NULL, NULL, true); 610 error_report("colo-compare secondary_in error"); 611 } 612 } 613 614 /* 615 * Check old packet regularly so it can watch for any packets 616 * that the secondary hasn't produced equivalents of. 617 */ 618 static void check_old_packet_regular(void *opaque) 619 { 620 CompareState *s = opaque; 621 622 /* if have old packet we will notify checkpoint */ 623 colo_old_packet_check(s); 624 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 625 REGULAR_PACKET_CHECK_MS); 626 } 627 628 static void colo_compare_timer_init(CompareState *s) 629 { 630 AioContext *ctx = iothread_get_aio_context(s->iothread); 631 632 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL, 633 SCALE_MS, check_old_packet_regular, 634 s); 635 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) + 636 REGULAR_PACKET_CHECK_MS); 637 } 638 639 static void colo_compare_timer_del(CompareState *s) 640 { 641 if (s->packet_check_timer) { 642 timer_del(s->packet_check_timer); 643 timer_free(s->packet_check_timer); 644 s->packet_check_timer = NULL; 645 } 646 } 647 648 static void colo_compare_iothread(CompareState *s) 649 { 650 object_ref(OBJECT(s->iothread)); 651 s->worker_context = iothread_get_g_main_context(s->iothread); 652 653 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read, 654 compare_pri_chr_in, NULL, NULL, 655 s, s->worker_context, true); 656 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, 657 compare_sec_chr_in, NULL, NULL, 658 s, s->worker_context, true); 659 660 colo_compare_timer_init(s); 661 } 662 663 static char *compare_get_pri_indev(Object *obj, Error **errp) 664 { 665 CompareState *s = COLO_COMPARE(obj); 666 667 return g_strdup(s->pri_indev); 668 } 669 670 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp) 671 { 672 CompareState *s = COLO_COMPARE(obj); 673 674 g_free(s->pri_indev); 675 s->pri_indev = g_strdup(value); 676 } 677 678 static char *compare_get_sec_indev(Object *obj, Error **errp) 679 { 680 CompareState *s = COLO_COMPARE(obj); 681 682 return g_strdup(s->sec_indev); 683 } 684 685 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp) 686 { 687 CompareState *s = COLO_COMPARE(obj); 688 689 g_free(s->sec_indev); 690 s->sec_indev = g_strdup(value); 691 } 692 693 static char *compare_get_outdev(Object *obj, Error **errp) 694 { 695 CompareState *s = COLO_COMPARE(obj); 696 697 return g_strdup(s->outdev); 698 } 699 700 static void compare_set_outdev(Object *obj, const char *value, Error **errp) 701 { 702 CompareState *s = COLO_COMPARE(obj); 703 704 g_free(s->outdev); 705 s->outdev = g_strdup(value); 706 } 707 708 static bool compare_get_vnet_hdr(Object *obj, Error **errp) 709 { 710 CompareState *s = COLO_COMPARE(obj); 711 712 return s->vnet_hdr; 713 } 714 715 static void compare_set_vnet_hdr(Object *obj, 716 bool value, 717 Error **errp) 718 { 719 CompareState *s = COLO_COMPARE(obj); 720 721 s->vnet_hdr = value; 722 } 723 724 static void compare_pri_rs_finalize(SocketReadState *pri_rs) 725 { 726 CompareState *s = container_of(pri_rs, CompareState, pri_rs); 727 728 if (packet_enqueue(s, PRIMARY_IN)) { 729 trace_colo_compare_main("primary: unsupported packet in"); 730 compare_chr_send(s, 731 pri_rs->buf, 732 pri_rs->packet_len, 733 pri_rs->vnet_hdr_len); 734 } else { 735 /* compare connection */ 736 g_queue_foreach(&s->conn_list, colo_compare_connection, s); 737 } 738 } 739 740 static void compare_sec_rs_finalize(SocketReadState *sec_rs) 741 { 742 CompareState *s = container_of(sec_rs, CompareState, sec_rs); 743 744 if (packet_enqueue(s, SECONDARY_IN)) { 745 trace_colo_compare_main("secondary: unsupported packet in"); 746 } else { 747 /* compare connection */ 748 g_queue_foreach(&s->conn_list, colo_compare_connection, s); 749 } 750 } 751 752 753 /* 754 * Return 0 is success. 755 * Return 1 is failed. 756 */ 757 static int find_and_check_chardev(Chardev **chr, 758 char *chr_name, 759 Error **errp) 760 { 761 *chr = qemu_chr_find(chr_name); 762 if (*chr == NULL) { 763 error_setg(errp, "Device '%s' not found", 764 chr_name); 765 return 1; 766 } 767 768 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) { 769 error_setg(errp, "chardev \"%s\" is not reconnectable", 770 chr_name); 771 return 1; 772 } 773 774 return 0; 775 } 776 777 /* 778 * Called from the main thread on the primary 779 * to setup colo-compare. 780 */ 781 static void colo_compare_complete(UserCreatable *uc, Error **errp) 782 { 783 CompareState *s = COLO_COMPARE(uc); 784 Chardev *chr; 785 786 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) { 787 error_setg(errp, "colo compare needs 'primary_in' ," 788 "'secondary_in','outdev','iothread' property set"); 789 return; 790 } else if (!strcmp(s->pri_indev, s->outdev) || 791 !strcmp(s->sec_indev, s->outdev) || 792 !strcmp(s->pri_indev, s->sec_indev)) { 793 error_setg(errp, "'indev' and 'outdev' could not be same " 794 "for compare module"); 795 return; 796 } 797 798 if (find_and_check_chardev(&chr, s->pri_indev, errp) || 799 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) { 800 return; 801 } 802 803 if (find_and_check_chardev(&chr, s->sec_indev, errp) || 804 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) { 805 return; 806 } 807 808 if (find_and_check_chardev(&chr, s->outdev, errp) || 809 !qemu_chr_fe_init(&s->chr_out, chr, errp)) { 810 return; 811 } 812 813 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); 814 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); 815 816 g_queue_init(&s->conn_list); 817 818 s->connection_track_table = g_hash_table_new_full(connection_key_hash, 819 connection_key_equal, 820 g_free, 821 connection_destroy); 822 823 colo_compare_iothread(s); 824 return; 825 } 826 827 static void colo_flush_packets(void *opaque, void *user_data) 828 { 829 CompareState *s = user_data; 830 Connection *conn = opaque; 831 Packet *pkt = NULL; 832 833 while (!g_queue_is_empty(&conn->primary_list)) { 834 pkt = g_queue_pop_head(&conn->primary_list); 835 compare_chr_send(s, 836 pkt->data, 837 pkt->size, 838 pkt->vnet_hdr_len); 839 packet_destroy(pkt, NULL); 840 } 841 while (!g_queue_is_empty(&conn->secondary_list)) { 842 pkt = g_queue_pop_head(&conn->secondary_list); 843 packet_destroy(pkt, NULL); 844 } 845 } 846 847 static void colo_compare_class_init(ObjectClass *oc, void *data) 848 { 849 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); 850 851 ucc->complete = colo_compare_complete; 852 } 853 854 static void colo_compare_init(Object *obj) 855 { 856 CompareState *s = COLO_COMPARE(obj); 857 858 object_property_add_str(obj, "primary_in", 859 compare_get_pri_indev, compare_set_pri_indev, 860 NULL); 861 object_property_add_str(obj, "secondary_in", 862 compare_get_sec_indev, compare_set_sec_indev, 863 NULL); 864 object_property_add_str(obj, "outdev", 865 compare_get_outdev, compare_set_outdev, 866 NULL); 867 object_property_add_link(obj, "iothread", TYPE_IOTHREAD, 868 (Object **)&s->iothread, 869 object_property_allow_set_link, 870 OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL); 871 872 s->vnet_hdr = false; 873 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, 874 compare_set_vnet_hdr, NULL); 875 } 876 877 static void colo_compare_finalize(Object *obj) 878 { 879 CompareState *s = COLO_COMPARE(obj); 880 881 qemu_chr_fe_deinit(&s->chr_pri_in, false); 882 qemu_chr_fe_deinit(&s->chr_sec_in, false); 883 qemu_chr_fe_deinit(&s->chr_out, false); 884 if (s->iothread) { 885 colo_compare_timer_del(s); 886 } 887 /* Release all unhandled packets after compare thead exited */ 888 g_queue_foreach(&s->conn_list, colo_flush_packets, s); 889 890 g_queue_clear(&s->conn_list); 891 892 if (s->connection_track_table) { 893 g_hash_table_destroy(s->connection_track_table); 894 } 895 896 if (s->iothread) { 897 object_unref(OBJECT(s->iothread)); 898 } 899 g_free(s->pri_indev); 900 g_free(s->sec_indev); 901 g_free(s->outdev); 902 } 903 904 static const TypeInfo colo_compare_info = { 905 .name = TYPE_COLO_COMPARE, 906 .parent = TYPE_OBJECT, 907 .instance_size = sizeof(CompareState), 908 .instance_init = colo_compare_init, 909 .instance_finalize = colo_compare_finalize, 910 .class_size = sizeof(CompareClass), 911 .class_init = colo_compare_class_init, 912 .interfaces = (InterfaceInfo[]) { 913 { TYPE_USER_CREATABLE }, 914 { } 915 } 916 }; 917 918 static void register_types(void) 919 { 920 type_register_static(&colo_compare_info); 921 } 922 923 type_init(register_types); 924