xref: /openbmc/qemu/net/colo-compare.c (revision b15e402f)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34 
35 #include "block/aio-wait.h"
36 #include "qemu/coroutine.h"
37 
38 #define TYPE_COLO_COMPARE "colo-compare"
39 #define COLO_COMPARE(obj) \
40     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
41 
42 static QTAILQ_HEAD(, CompareState) net_compares =
43        QTAILQ_HEAD_INITIALIZER(net_compares);
44 
45 static NotifierList colo_compare_notifiers =
46     NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
47 
48 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
49 #define MAX_QUEUE_SIZE 1024
50 
51 #define COLO_COMPARE_FREE_PRIMARY     0x01
52 #define COLO_COMPARE_FREE_SECONDARY   0x02
53 
54 #define REGULAR_PACKET_CHECK_MS 3000
55 #define DEFAULT_TIME_OUT_MS 3000
56 
57 /* #define DEBUG_COLO_PACKETS */
58 
59 static QemuMutex colo_compare_mutex;
60 static bool colo_compare_active;
61 static QemuMutex event_mtx;
62 static QemuCond event_complete_cond;
63 static int event_unhandled_count;
64 static uint32_t max_queue_size;
65 
66 /*
67  *  + CompareState ++
68  *  |               |
69  *  +---------------+   +---------------+         +---------------+
70  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
71  *  +---------------+   +---------------+         +---------------+
72  *  |               |     |           |             |          |
73  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
74  *                    |primary |  |secondary    |primary | |secondary
75  *                    |packet  |  |packet  +    |packet  | |packet  +
76  *                    +--------+  +--------+    +--------+ +--------+
77  *                        |           |             |          |
78  *                    +---v----+  +---v----+    +---v----+ +---v----+
79  *                    |primary |  |secondary    |primary | |secondary
80  *                    |packet  |  |packet  +    |packet  | |packet  +
81  *                    +--------+  +--------+    +--------+ +--------+
82  *                        |           |             |          |
83  *                    +---v----+  +---v----+    +---v----+ +---v----+
84  *                    |primary |  |secondary    |primary | |secondary
85  *                    |packet  |  |packet  +    |packet  | |packet  +
86  *                    +--------+  +--------+    +--------+ +--------+
87  */
88 
89 typedef struct SendCo {
90     Coroutine *co;
91     struct CompareState *s;
92     CharBackend *chr;
93     GQueue send_list;
94     bool notify_remote_frame;
95     bool done;
96     int ret;
97 } SendCo;
98 
/* One queued message; buf is released with g_free() once it was handled. */
struct SendEntry {
    /* Number of bytes in buf */
    uint32_t size;
    /* vnet header length announced before the payload when enabled */
    uint32_t vnet_hdr_len;
    /* Payload bytes, owned by the entry */
    uint8_t *buf;
};
typedef struct SendEntry SendEntry;
104 
105 typedef struct CompareState {
106     Object parent;
107 
108     char *pri_indev;
109     char *sec_indev;
110     char *outdev;
111     char *notify_dev;
112     CharBackend chr_pri_in;
113     CharBackend chr_sec_in;
114     CharBackend chr_out;
115     CharBackend chr_notify_dev;
116     SocketReadState pri_rs;
117     SocketReadState sec_rs;
118     SocketReadState notify_rs;
119     SendCo out_sendco;
120     SendCo notify_sendco;
121     bool vnet_hdr;
122     uint32_t compare_timeout;
123     uint32_t expired_scan_cycle;
124 
125     /*
126      * Record the connection that through the NIC
127      * Element type: Connection
128      */
129     GQueue conn_list;
130     /* Record the connection without repetition */
131     GHashTable *connection_track_table;
132 
133     IOThread *iothread;
134     GMainContext *worker_context;
135     QEMUTimer *packet_check_timer;
136 
137     QEMUBH *event_bh;
138     enum colo_event event;
139 
140     QTAILQ_ENTRY(CompareState) next;
141 } CompareState;
142 
143 typedef struct CompareClass {
144     ObjectClass parent_class;
145 } CompareClass;
146 
/* Packet source selector; the values also index colo_mode[]. */
enum {
    PRIMARY_IN   = 0,
    SECONDARY_IN = 1,
};
151 
152 static const char *colo_mode[] = {
153     [PRIMARY_IN] = "primary",
154     [SECONDARY_IN] = "secondary",
155 };
156 
157 static int compare_chr_send(CompareState *s,
158                             uint8_t *buf,
159                             uint32_t size,
160                             uint32_t vnet_hdr_len,
161                             bool notify_remote_frame,
162                             bool zero_copy);
163 
/*
 * Tell whether a received packet is exactly the given string.
 *
 * @str:        NUL-terminated string to look for
 * @buf:        packet bytes (no terminator expected)
 * @packet_len: number of bytes in @buf
 *
 * Returns true only when the lengths are equal and all bytes match.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t expected_len = strlen(str);

    if (packet_len != expected_len) {
        return false;
    }

    return memcmp(str, buf, expected_len) == 0;
}
174 
175 static void notify_remote_frame(CompareState *s)
176 {
177     char msg[] = "DO_CHECKPOINT";
178     int ret = 0;
179 
180     ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
181     if (ret < 0) {
182         error_report("Notify Xen COLO-frame failed");
183     }
184 }
185 
186 static void colo_compare_inconsistency_notify(CompareState *s)
187 {
188     if (s->notify_dev) {
189         notify_remote_frame(s);
190     } else {
191         notifier_list_notify(&colo_compare_notifiers,
192                              migrate_get_current());
193     }
194 }
195 
196 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
197 {
198     struct tcp_hdr *atcp, *btcp;
199 
200     atcp = (struct tcp_hdr *)(a->transport_header);
201     btcp = (struct tcp_hdr *)(b->transport_header);
202     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
203 }
204 
205 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
206 {
207     Packet *pkt = data;
208     struct tcp_hdr *tcphd;
209 
210     tcphd = (struct tcp_hdr *)pkt->transport_header;
211 
212     pkt->tcp_seq = ntohl(tcphd->th_seq);
213     pkt->tcp_ack = ntohl(tcphd->th_ack);
214     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
215     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
216                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
217     pkt->payload_size = pkt->size - pkt->header_size;
218     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
219     pkt->flags = tcphd->th_flags;
220 }
221 
222 /*
223  * Return 1 on success, if return 0 means the
224  * packet will be dropped
225  */
226 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
227 {
228     if (g_queue_get_length(queue) <= max_queue_size) {
229         if (pkt->ip->ip_p == IPPROTO_TCP) {
230             fill_pkt_tcp_info(pkt, max_ack);
231             g_queue_insert_sorted(queue,
232                                   pkt,
233                                   (GCompareDataFunc)seq_sorter,
234                                   NULL);
235         } else {
236             g_queue_push_tail(queue, pkt);
237         }
238         return 1;
239     }
240     return 0;
241 }
242 
243 /*
244  * Return 0 on success, if return -1 means the pkt
245  * is unsupported(arp and ipv6) and will be sent later
246  */
247 static int packet_enqueue(CompareState *s, int mode, Connection **con)
248 {
249     ConnectionKey key;
250     Packet *pkt = NULL;
251     Connection *conn;
252     int ret;
253 
254     if (mode == PRIMARY_IN) {
255         pkt = packet_new(s->pri_rs.buf,
256                          s->pri_rs.packet_len,
257                          s->pri_rs.vnet_hdr_len);
258     } else {
259         pkt = packet_new(s->sec_rs.buf,
260                          s->sec_rs.packet_len,
261                          s->sec_rs.vnet_hdr_len);
262     }
263 
264     if (parse_packet_early(pkt)) {
265         packet_destroy(pkt, NULL);
266         pkt = NULL;
267         return -1;
268     }
269     fill_connection_key(pkt, &key);
270 
271     conn = connection_get(s->connection_track_table,
272                           &key,
273                           &s->conn_list);
274 
275     if (!conn->processing) {
276         g_queue_push_tail(&s->conn_list, conn);
277         conn->processing = true;
278     }
279 
280     if (mode == PRIMARY_IN) {
281         ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
282     } else {
283         ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
284     }
285 
286     if (!ret) {
287         trace_colo_compare_drop_packet(colo_mode[mode],
288             "queue size too big, drop packet");
289         packet_destroy(pkt, NULL);
290         pkt = NULL;
291     }
292 
293     *con = conn;
294 
295     return 0;
296 }
297 
/*
 * Wrap-around aware sequence number comparison: true when seq1 lies
 * strictly after seq2, judged by the sign of their 32-bit difference.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
302 
303 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
304 {
305     int ret;
306     ret = compare_chr_send(s,
307                            pkt->data,
308                            pkt->size,
309                            pkt->vnet_hdr_len,
310                            false,
311                            true);
312     if (ret < 0) {
313         error_report("colo send primary packet failed");
314     }
315     trace_colo_compare_main("packet same and release packet");
316     packet_destroy_partial(pkt, NULL);
317 }
318 
319 /*
320  * The IP packets sent by primary and secondary
321  * will be compared in here
322  * TODO support ip fragment, Out-Of-Order
323  * return:    0  means packet same
324  *            > 0 || < 0 means packet different
325  */
326 static int colo_compare_packet_payload(Packet *ppkt,
327                                        Packet *spkt,
328                                        uint16_t poffset,
329                                        uint16_t soffset,
330                                        uint16_t len)
331 
332 {
333     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
334         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
335 
336         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
337         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
338         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
339         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
340 
341         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
342                                    pri_ip_dst, spkt->size,
343                                    sec_ip_src, sec_ip_dst);
344     }
345 
346     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
347 }
348 
349 /*
350  * return true means that the payload is consist and
351  * need to make the next comparison, false means do
352  * the checkpoint
353 */
354 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
355                               int8_t *mark, uint32_t max_ack)
356 {
357     *mark = 0;
358 
359     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
360         if (!colo_compare_packet_payload(ppkt, spkt,
361                                         ppkt->header_size, spkt->header_size,
362                                         ppkt->payload_size)) {
363             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
364             return true;
365         }
366     }
367 
368     /* one part of secondary packet payload still need to be compared */
369     if (!after(ppkt->seq_end, spkt->seq_end)) {
370         if (!colo_compare_packet_payload(ppkt, spkt,
371                                         ppkt->header_size + ppkt->offset,
372                                         spkt->header_size + spkt->offset,
373                                         ppkt->payload_size - ppkt->offset)) {
374             if (!after(ppkt->tcp_ack, max_ack)) {
375                 *mark = COLO_COMPARE_FREE_PRIMARY;
376                 spkt->offset += ppkt->payload_size - ppkt->offset;
377                 return true;
378             } else {
379                 /* secondary guest hasn't ack the data, don't send
380                  * out this packet
381                  */
382                 return false;
383             }
384         }
385     } else {
386         /* primary packet is longer than secondary packet, compare
387          * the same part and mark the primary packet offset
388          */
389         if (!colo_compare_packet_payload(ppkt, spkt,
390                                         ppkt->header_size + ppkt->offset,
391                                         spkt->header_size + spkt->offset,
392                                         spkt->payload_size - spkt->offset)) {
393             *mark = COLO_COMPARE_FREE_SECONDARY;
394             ppkt->offset += spkt->payload_size - spkt->offset;
395             return true;
396         }
397     }
398 
399     return false;
400 }
401 
402 static void colo_compare_tcp(CompareState *s, Connection *conn)
403 {
404     Packet *ppkt = NULL, *spkt = NULL;
405     int8_t mark;
406 
407     /*
408      * If ppkt and spkt have the same payload, but ppkt's ACK
409      * is greater than spkt's ACK, in this case we can not
410      * send the ppkt because it will cause the secondary guest
411      * to miss sending some data in the next. Therefore, we
412      * record the maximum ACK in the current queue at both
413      * primary side and secondary side. Only when the ack is
414      * less than the smaller of the two maximum ack, then we
415      * can ensure that the packet's payload is acknowledged by
416      * primary and secondary.
417     */
418     uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
419 
420 pri:
421     if (g_queue_is_empty(&conn->primary_list)) {
422         return;
423     }
424     ppkt = g_queue_pop_head(&conn->primary_list);
425 sec:
426     if (g_queue_is_empty(&conn->secondary_list)) {
427         g_queue_push_head(&conn->primary_list, ppkt);
428         return;
429     }
430     spkt = g_queue_pop_head(&conn->secondary_list);
431 
432     if (ppkt->tcp_seq == ppkt->seq_end) {
433         colo_release_primary_pkt(s, ppkt);
434         ppkt = NULL;
435     }
436 
437     if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
438         trace_colo_compare_main("pri: this packet has compared");
439         colo_release_primary_pkt(s, ppkt);
440         ppkt = NULL;
441     }
442 
443     if (spkt->tcp_seq == spkt->seq_end) {
444         packet_destroy(spkt, NULL);
445         if (!ppkt) {
446             goto pri;
447         } else {
448             goto sec;
449         }
450     } else {
451         if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
452             trace_colo_compare_main("sec: this packet has compared");
453             packet_destroy(spkt, NULL);
454             if (!ppkt) {
455                 goto pri;
456             } else {
457                 goto sec;
458             }
459         }
460         if (!ppkt) {
461             g_queue_push_head(&conn->secondary_list, spkt);
462             goto pri;
463         }
464     }
465 
466     if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
467         trace_colo_compare_tcp_info("pri",
468                                     ppkt->tcp_seq, ppkt->tcp_ack,
469                                     ppkt->header_size, ppkt->payload_size,
470                                     ppkt->offset, ppkt->flags);
471 
472         trace_colo_compare_tcp_info("sec",
473                                     spkt->tcp_seq, spkt->tcp_ack,
474                                     spkt->header_size, spkt->payload_size,
475                                     spkt->offset, spkt->flags);
476 
477         if (mark == COLO_COMPARE_FREE_PRIMARY) {
478             conn->compare_seq = ppkt->seq_end;
479             colo_release_primary_pkt(s, ppkt);
480             g_queue_push_head(&conn->secondary_list, spkt);
481             goto pri;
482         }
483         if (mark == COLO_COMPARE_FREE_SECONDARY) {
484             conn->compare_seq = spkt->seq_end;
485             packet_destroy(spkt, NULL);
486             goto sec;
487         }
488         if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
489             conn->compare_seq = ppkt->seq_end;
490             colo_release_primary_pkt(s, ppkt);
491             packet_destroy(spkt, NULL);
492             goto pri;
493         }
494     } else {
495         g_queue_push_head(&conn->primary_list, ppkt);
496         g_queue_push_head(&conn->secondary_list, spkt);
497 
498 #ifdef DEBUG_COLO_PACKETS
499         qemu_hexdump((char *)ppkt->data, stderr,
500                      "colo-compare ppkt", ppkt->size);
501         qemu_hexdump((char *)spkt->data, stderr,
502                      "colo-compare spkt", spkt->size);
503 #endif
504 
505         colo_compare_inconsistency_notify(s);
506     }
507 }
508 
509 
510 /*
511  * Called from the compare thread on the primary
512  * for compare udp packet
513  */
514 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
515 {
516     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
517     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
518 
519     trace_colo_compare_main("compare udp");
520 
521     /*
522      * Because of ppkt and spkt are both in the same connection,
523      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
524      * same with spkt. In addition, IP header's Identification is a random
525      * field, we can handle it in IP fragmentation function later.
526      * COLO just concern the response net packet payload from primary guest
527      * and secondary guest are same or not, So we ignored all IP header include
528      * other field like TOS,TTL,IP Checksum. we only need to compare
529      * the ip payload here.
530      */
531     if (ppkt->size != spkt->size) {
532         trace_colo_compare_main("UDP: payload size of packets are different");
533         return -1;
534     }
535     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
536                                     ppkt->size - offset)) {
537         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
538         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
539 #ifdef DEBUG_COLO_PACKETS
540         qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
541                      ppkt->size);
542         qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
543                      spkt->size);
544 #endif
545         return -1;
546     } else {
547         return 0;
548     }
549 }
550 
551 /*
552  * Called from the compare thread on the primary
553  * for compare icmp packet
554  */
555 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
556 {
557     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
558     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
559 
560     trace_colo_compare_main("compare icmp");
561 
562     /*
563      * Because of ppkt and spkt are both in the same connection,
564      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
565      * same with spkt. In addition, IP header's Identification is a random
566      * field, we can handle it in IP fragmentation function later.
567      * COLO just concern the response net packet payload from primary guest
568      * and secondary guest are same or not, So we ignored all IP header include
569      * other field like TOS,TTL,IP Checksum. we only need to compare
570      * the ip payload here.
571      */
572     if (ppkt->size != spkt->size) {
573         trace_colo_compare_main("ICMP: payload size of packets are different");
574         return -1;
575     }
576     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
577                                     ppkt->size - offset)) {
578         trace_colo_compare_icmp_miscompare("primary pkt size",
579                                            ppkt->size);
580         trace_colo_compare_icmp_miscompare("Secondary pkt size",
581                                            spkt->size);
582 #ifdef DEBUG_COLO_PACKETS
583         qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
584                      ppkt->size);
585         qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
586                      spkt->size);
587 #endif
588         return -1;
589     } else {
590         return 0;
591     }
592 }
593 
594 /*
595  * Called from the compare thread on the primary
596  * for compare other packet
597  */
598 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
599 {
600     uint16_t offset = ppkt->vnet_hdr_len;
601 
602     trace_colo_compare_main("compare other");
603     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
604         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
605 
606         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
607         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
608         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
609         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
610 
611         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
612                                    pri_ip_dst, spkt->size,
613                                    sec_ip_src, sec_ip_dst);
614     }
615 
616     if (ppkt->size != spkt->size) {
617         trace_colo_compare_main("Other: payload size of packets are different");
618         return -1;
619     }
620     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
621                                        ppkt->size - offset);
622 }
623 
624 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
625 {
626     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
627 
628     if ((now - pkt->creation_ms) > (*check_time)) {
629         trace_colo_old_packet_check_found(pkt->creation_ms);
630         return 0;
631     } else {
632         return 1;
633     }
634 }
635 
636 void colo_compare_register_notifier(Notifier *notify)
637 {
638     notifier_list_add(&colo_compare_notifiers, notify);
639 }
640 
641 void colo_compare_unregister_notifier(Notifier *notify)
642 {
643     notifier_remove(notify);
644 }
645 
646 static int colo_old_packet_check_one_conn(Connection *conn,
647                                           CompareState *s)
648 {
649     GList *result = NULL;
650 
651     result = g_queue_find_custom(&conn->primary_list,
652                                  &s->compare_timeout,
653                                  (GCompareFunc)colo_old_packet_check_one);
654 
655     if (result) {
656         /* Do checkpoint will flush old packet */
657         colo_compare_inconsistency_notify(s);
658         return 0;
659     }
660 
661     return 1;
662 }
663 
664 /*
665  * Look for old packets that the secondary hasn't matched,
666  * if we have some then we have to checkpoint to wake
667  * the secondary up.
668  */
669 static void colo_old_packet_check(void *opaque)
670 {
671     CompareState *s = opaque;
672 
673     /*
674      * If we find one old packet, stop finding job and notify
675      * COLO frame do checkpoint.
676      */
677     g_queue_find_custom(&s->conn_list, s,
678                         (GCompareFunc)colo_old_packet_check_one_conn);
679 }
680 
681 static void colo_compare_packet(CompareState *s, Connection *conn,
682                                 int (*HandlePacket)(Packet *spkt,
683                                 Packet *ppkt))
684 {
685     Packet *pkt = NULL;
686     GList *result = NULL;
687 
688     while (!g_queue_is_empty(&conn->primary_list) &&
689            !g_queue_is_empty(&conn->secondary_list)) {
690         pkt = g_queue_pop_head(&conn->primary_list);
691         result = g_queue_find_custom(&conn->secondary_list,
692                  pkt, (GCompareFunc)HandlePacket);
693 
694         if (result) {
695             colo_release_primary_pkt(s, pkt);
696             g_queue_remove(&conn->secondary_list, result->data);
697         } else {
698             /*
699              * If one packet arrive late, the secondary_list or
700              * primary_list will be empty, so we can't compare it
701              * until next comparison. If the packets in the list are
702              * timeout, it will trigger a checkpoint request.
703              */
704             trace_colo_compare_main("packet different");
705             g_queue_push_head(&conn->primary_list, pkt);
706 
707             colo_compare_inconsistency_notify(s);
708             break;
709         }
710     }
711 }
712 
713 /*
714  * Called from the compare thread on the primary
715  * for compare packet with secondary list of the
716  * specified connection when a new packet was
717  * queued to it.
718  */
719 static void colo_compare_connection(void *opaque, void *user_data)
720 {
721     CompareState *s = user_data;
722     Connection *conn = opaque;
723 
724     switch (conn->ip_proto) {
725     case IPPROTO_TCP:
726         colo_compare_tcp(s, conn);
727         break;
728     case IPPROTO_UDP:
729         colo_compare_packet(s, conn, colo_packet_compare_udp);
730         break;
731     case IPPROTO_ICMP:
732         colo_compare_packet(s, conn, colo_packet_compare_icmp);
733         break;
734     default:
735         colo_compare_packet(s, conn, colo_packet_compare_other);
736         break;
737     }
738 }
739 
740 static void coroutine_fn _compare_chr_send(void *opaque)
741 {
742     SendCo *sendco = opaque;
743     CompareState *s = sendco->s;
744     int ret = 0;
745 
746     while (!g_queue_is_empty(&sendco->send_list)) {
747         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
748         uint32_t len = htonl(entry->size);
749 
750         ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
751 
752         if (ret != sizeof(len)) {
753             g_free(entry->buf);
754             g_slice_free(SendEntry, entry);
755             goto err;
756         }
757 
758         if (!sendco->notify_remote_frame && s->vnet_hdr) {
759             /*
760              * We send vnet header len make other module(like filter-redirector)
761              * know how to parse net packet correctly.
762              */
763             len = htonl(entry->vnet_hdr_len);
764 
765             ret = qemu_chr_fe_write_all(sendco->chr,
766                                         (uint8_t *)&len,
767                                         sizeof(len));
768 
769             if (ret != sizeof(len)) {
770                 g_free(entry->buf);
771                 g_slice_free(SendEntry, entry);
772                 goto err;
773             }
774         }
775 
776         ret = qemu_chr_fe_write_all(sendco->chr,
777                                     (uint8_t *)entry->buf,
778                                     entry->size);
779 
780         if (ret != entry->size) {
781             g_free(entry->buf);
782             g_slice_free(SendEntry, entry);
783             goto err;
784         }
785 
786         g_free(entry->buf);
787         g_slice_free(SendEntry, entry);
788     }
789 
790     sendco->ret = 0;
791     goto out;
792 
793 err:
794     while (!g_queue_is_empty(&sendco->send_list)) {
795         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
796         g_free(entry->buf);
797         g_slice_free(SendEntry, entry);
798     }
799     sendco->ret = ret < 0 ? ret : -EIO;
800 out:
801     sendco->co = NULL;
802     sendco->done = true;
803     aio_wait_kick();
804 }
805 
806 static int compare_chr_send(CompareState *s,
807                             uint8_t *buf,
808                             uint32_t size,
809                             uint32_t vnet_hdr_len,
810                             bool notify_remote_frame,
811                             bool zero_copy)
812 {
813     SendCo *sendco;
814     SendEntry *entry;
815 
816     if (notify_remote_frame) {
817         sendco = &s->notify_sendco;
818     } else {
819         sendco = &s->out_sendco;
820     }
821 
822     if (!size) {
823         return 0;
824     }
825 
826     entry = g_slice_new(SendEntry);
827     entry->size = size;
828     entry->vnet_hdr_len = vnet_hdr_len;
829     if (zero_copy) {
830         entry->buf = buf;
831     } else {
832         entry->buf = g_malloc(size);
833         memcpy(entry->buf, buf, size);
834     }
835     g_queue_push_head(&sendco->send_list, entry);
836 
837     if (sendco->done) {
838         sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
839         sendco->done = false;
840         qemu_coroutine_enter(sendco->co);
841         if (sendco->done) {
842             /* report early errors */
843             return sendco->ret;
844         }
845     }
846 
847     /* assume success */
848     return 0;
849 }
850 
851 static int compare_chr_can_read(void *opaque)
852 {
853     return COMPARE_READ_LEN_MAX;
854 }
855 
856 /*
857  * Called from the main thread on the primary for packets
858  * arriving over the socket from the primary.
859  */
860 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
861 {
862     CompareState *s = COLO_COMPARE(opaque);
863     int ret;
864 
865     ret = net_fill_rstate(&s->pri_rs, buf, size);
866     if (ret == -1) {
867         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
868                                  NULL, NULL, true);
869         error_report("colo-compare primary_in error");
870     }
871 }
872 
873 /*
874  * Called from the main thread on the primary for packets
875  * arriving over the socket from the secondary.
876  */
877 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
878 {
879     CompareState *s = COLO_COMPARE(opaque);
880     int ret;
881 
882     ret = net_fill_rstate(&s->sec_rs, buf, size);
883     if (ret == -1) {
884         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
885                                  NULL, NULL, true);
886         error_report("colo-compare secondary_in error");
887     }
888 }
889 
890 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
891 {
892     CompareState *s = COLO_COMPARE(opaque);
893     int ret;
894 
895     ret = net_fill_rstate(&s->notify_rs, buf, size);
896     if (ret == -1) {
897         qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
898                                  NULL, NULL, true);
899         error_report("colo-compare notify_dev error");
900     }
901 }
902 
903 /*
904  * Check old packet regularly so it can watch for any packets
905  * that the secondary hasn't produced equivalents of.
906  */
907 static void check_old_packet_regular(void *opaque)
908 {
909     CompareState *s = opaque;
910 
911     /* if have old packet we will notify checkpoint */
912     colo_old_packet_check(s);
913     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
914               s->expired_scan_cycle);
915 }
916 
917 /* Public API, Used for COLO frame to notify compare event */
918 void colo_notify_compares_event(void *opaque, int event, Error **errp)
919 {
920     CompareState *s;
921     qemu_mutex_lock(&colo_compare_mutex);
922 
923     if (!colo_compare_active) {
924         qemu_mutex_unlock(&colo_compare_mutex);
925         return;
926     }
927 
928     qemu_mutex_lock(&event_mtx);
929     QTAILQ_FOREACH(s, &net_compares, next) {
930         s->event = event;
931         qemu_bh_schedule(s->event_bh);
932         event_unhandled_count++;
933     }
934     /* Wait all compare threads to finish handling this event */
935     while (event_unhandled_count > 0) {
936         qemu_cond_wait(&event_complete_cond, &event_mtx);
937     }
938 
939     qemu_mutex_unlock(&event_mtx);
940     qemu_mutex_unlock(&colo_compare_mutex);
941 }
942 
943 static void colo_compare_timer_init(CompareState *s)
944 {
945     AioContext *ctx = iothread_get_aio_context(s->iothread);
946 
947     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
948                                 SCALE_MS, check_old_packet_regular,
949                                 s);
950     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
951               s->expired_scan_cycle);
952 }
953 
954 static void colo_compare_timer_del(CompareState *s)
955 {
956     if (s->packet_check_timer) {
957         timer_del(s->packet_check_timer);
958         timer_free(s->packet_check_timer);
959         s->packet_check_timer = NULL;
960     }
961  }
962 
963 static void colo_flush_packets(void *opaque, void *user_data);
964 
965 static void colo_compare_handle_event(void *opaque)
966 {
967     CompareState *s = opaque;
968 
969     switch (s->event) {
970     case COLO_EVENT_CHECKPOINT:
971         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
972         break;
973     case COLO_EVENT_FAILOVER:
974         break;
975     default:
976         break;
977     }
978 
979     qemu_mutex_lock(&event_mtx);
980     assert(event_unhandled_count > 0);
981     event_unhandled_count--;
982     qemu_cond_broadcast(&event_complete_cond);
983     qemu_mutex_unlock(&event_mtx);
984 }
985 
986 static void colo_compare_iothread(CompareState *s)
987 {
988     AioContext *ctx = iothread_get_aio_context(s->iothread);
989     object_ref(OBJECT(s->iothread));
990     s->worker_context = iothread_get_g_main_context(s->iothread);
991 
992     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
993                              compare_pri_chr_in, NULL, NULL,
994                              s, s->worker_context, true);
995     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
996                              compare_sec_chr_in, NULL, NULL,
997                              s, s->worker_context, true);
998     if (s->notify_dev) {
999         qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
1000                                  compare_notify_chr, NULL, NULL,
1001                                  s, s->worker_context, true);
1002     }
1003 
1004     colo_compare_timer_init(s);
1005     s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
1006 }
1007 
1008 static char *compare_get_pri_indev(Object *obj, Error **errp)
1009 {
1010     CompareState *s = COLO_COMPARE(obj);
1011 
1012     return g_strdup(s->pri_indev);
1013 }
1014 
1015 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1016 {
1017     CompareState *s = COLO_COMPARE(obj);
1018 
1019     g_free(s->pri_indev);
1020     s->pri_indev = g_strdup(value);
1021 }
1022 
1023 static char *compare_get_sec_indev(Object *obj, Error **errp)
1024 {
1025     CompareState *s = COLO_COMPARE(obj);
1026 
1027     return g_strdup(s->sec_indev);
1028 }
1029 
1030 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1031 {
1032     CompareState *s = COLO_COMPARE(obj);
1033 
1034     g_free(s->sec_indev);
1035     s->sec_indev = g_strdup(value);
1036 }
1037 
1038 static char *compare_get_outdev(Object *obj, Error **errp)
1039 {
1040     CompareState *s = COLO_COMPARE(obj);
1041 
1042     return g_strdup(s->outdev);
1043 }
1044 
1045 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1046 {
1047     CompareState *s = COLO_COMPARE(obj);
1048 
1049     g_free(s->outdev);
1050     s->outdev = g_strdup(value);
1051 }
1052 
1053 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1054 {
1055     CompareState *s = COLO_COMPARE(obj);
1056 
1057     return s->vnet_hdr;
1058 }
1059 
1060 static void compare_set_vnet_hdr(Object *obj,
1061                                  bool value,
1062                                  Error **errp)
1063 {
1064     CompareState *s = COLO_COMPARE(obj);
1065 
1066     s->vnet_hdr = value;
1067 }
1068 
1069 static char *compare_get_notify_dev(Object *obj, Error **errp)
1070 {
1071     CompareState *s = COLO_COMPARE(obj);
1072 
1073     return g_strdup(s->notify_dev);
1074 }
1075 
1076 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1077 {
1078     CompareState *s = COLO_COMPARE(obj);
1079 
1080     g_free(s->notify_dev);
1081     s->notify_dev = g_strdup(value);
1082 }
1083 
1084 static void compare_get_timeout(Object *obj, Visitor *v,
1085                                 const char *name, void *opaque,
1086                                 Error **errp)
1087 {
1088     CompareState *s = COLO_COMPARE(obj);
1089     uint32_t value = s->compare_timeout;
1090 
1091     visit_type_uint32(v, name, &value, errp);
1092 }
1093 
1094 static void compare_set_timeout(Object *obj, Visitor *v,
1095                                 const char *name, void *opaque,
1096                                 Error **errp)
1097 {
1098     CompareState *s = COLO_COMPARE(obj);
1099     uint32_t value;
1100 
1101     if (!visit_type_uint32(v, name, &value, errp)) {
1102         return;
1103     }
1104     if (!value) {
1105         error_setg(errp, "Property '%s.%s' requires a positive value",
1106                    object_get_typename(obj), name);
1107         return;
1108     }
1109     s->compare_timeout = value;
1110 }
1111 
1112 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1113                                            const char *name, void *opaque,
1114                                            Error **errp)
1115 {
1116     CompareState *s = COLO_COMPARE(obj);
1117     uint32_t value = s->expired_scan_cycle;
1118 
1119     visit_type_uint32(v, name, &value, errp);
1120 }
1121 
1122 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1123                                            const char *name, void *opaque,
1124                                            Error **errp)
1125 {
1126     CompareState *s = COLO_COMPARE(obj);
1127     uint32_t value;
1128 
1129     if (!visit_type_uint32(v, name, &value, errp)) {
1130         return;
1131     }
1132     if (!value) {
1133         error_setg(errp, "Property '%s.%s' requires a positive value",
1134                    object_get_typename(obj), name);
1135         return;
1136     }
1137     s->expired_scan_cycle = value;
1138 }
1139 
1140 static void get_max_queue_size(Object *obj, Visitor *v,
1141                                const char *name, void *opaque,
1142                                Error **errp)
1143 {
1144     uint32_t value = max_queue_size;
1145 
1146     visit_type_uint32(v, name, &value, errp);
1147 }
1148 
1149 static void set_max_queue_size(Object *obj, Visitor *v,
1150                                const char *name, void *opaque,
1151                                Error **errp)
1152 {
1153     Error *local_err = NULL;
1154     uint32_t value;
1155 
1156     visit_type_uint32(v, name, &value, &local_err);
1157     if (local_err) {
1158         goto out;
1159     }
1160     if (!value) {
1161         error_setg(&local_err, "Property '%s.%s' requires a positive value",
1162                    object_get_typename(obj), name);
1163         goto out;
1164     }
1165     max_queue_size = value;
1166 
1167 out:
1168     error_propagate(errp, local_err);
1169 }
1170 
1171 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1172 {
1173     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1174     Connection *conn = NULL;
1175 
1176     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1177         trace_colo_compare_main("primary: unsupported packet in");
1178         compare_chr_send(s,
1179                          pri_rs->buf,
1180                          pri_rs->packet_len,
1181                          pri_rs->vnet_hdr_len,
1182                          false,
1183                          false);
1184     } else {
1185         /* compare packet in the specified connection */
1186         colo_compare_connection(conn, s);
1187     }
1188 }
1189 
1190 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1191 {
1192     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1193     Connection *conn = NULL;
1194 
1195     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1196         trace_colo_compare_main("secondary: unsupported packet in");
1197     } else {
1198         /* compare packet in the specified connection */
1199         colo_compare_connection(conn, s);
1200     }
1201 }
1202 
1203 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1204 {
1205     CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1206 
1207     const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1208     int ret;
1209 
1210     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1211                            notify_rs->buf,
1212                            notify_rs->packet_len)) {
1213         ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1214         if (ret < 0) {
1215             error_report("Notify Xen COLO-frame INIT failed");
1216         }
1217     } else if (packet_matches_str("COLO_CHECKPOINT",
1218                                   notify_rs->buf,
1219                                   notify_rs->packet_len)) {
1220         /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1221         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1222     } else {
1223         error_report("COLO compare got unsupported instruction");
1224     }
1225 }
1226 
1227 /*
1228  * Return 0 is success.
1229  * Return 1 is failed.
1230  */
1231 static int find_and_check_chardev(Chardev **chr,
1232                                   char *chr_name,
1233                                   Error **errp)
1234 {
1235     *chr = qemu_chr_find(chr_name);
1236     if (*chr == NULL) {
1237         error_setg(errp, "Device '%s' not found",
1238                    chr_name);
1239         return 1;
1240     }
1241 
1242     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1243         error_setg(errp, "chardev \"%s\" is not reconnectable",
1244                    chr_name);
1245         return 1;
1246     }
1247 
1248     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1249         error_setg(errp, "chardev \"%s\" cannot switch context",
1250                    chr_name);
1251         return 1;
1252     }
1253 
1254     return 0;
1255 }
1256 
1257 /*
1258  * Called from the main thread on the primary
1259  * to setup colo-compare.
1260  */
1261 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1262 {
1263     CompareState *s = COLO_COMPARE(uc);
1264     Chardev *chr;
1265 
1266     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1267         error_setg(errp, "colo compare needs 'primary_in' ,"
1268                    "'secondary_in','outdev','iothread' property set");
1269         return;
1270     } else if (!strcmp(s->pri_indev, s->outdev) ||
1271                !strcmp(s->sec_indev, s->outdev) ||
1272                !strcmp(s->pri_indev, s->sec_indev)) {
1273         error_setg(errp, "'indev' and 'outdev' could not be same "
1274                    "for compare module");
1275         return;
1276     }
1277 
1278     if (!s->compare_timeout) {
1279         /* Set default value to 3000 MS */
1280         s->compare_timeout = DEFAULT_TIME_OUT_MS;
1281     }
1282 
1283     if (!s->expired_scan_cycle) {
1284         /* Set default value to 3000 MS */
1285         s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1286     }
1287 
1288     if (!max_queue_size) {
1289         /* Set default queue size to 1024 */
1290         max_queue_size = MAX_QUEUE_SIZE;
1291     }
1292 
1293     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1294         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1295         return;
1296     }
1297 
1298     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1299         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1300         return;
1301     }
1302 
1303     if (find_and_check_chardev(&chr, s->outdev, errp) ||
1304         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1305         return;
1306     }
1307 
1308     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1309     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1310 
1311     /* Try to enable remote notify chardev, currently just for Xen COLO */
1312     if (s->notify_dev) {
1313         if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1314             !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1315             return;
1316         }
1317 
1318         net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1319                            s->vnet_hdr);
1320     }
1321 
1322     s->out_sendco.s = s;
1323     s->out_sendco.chr = &s->chr_out;
1324     s->out_sendco.notify_remote_frame = false;
1325     s->out_sendco.done = true;
1326     g_queue_init(&s->out_sendco.send_list);
1327 
1328     if (s->notify_dev) {
1329         s->notify_sendco.s = s;
1330         s->notify_sendco.chr = &s->chr_notify_dev;
1331         s->notify_sendco.notify_remote_frame = true;
1332         s->notify_sendco.done = true;
1333         g_queue_init(&s->notify_sendco.send_list);
1334     }
1335 
1336     g_queue_init(&s->conn_list);
1337 
1338     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1339                                                       connection_key_equal,
1340                                                       g_free,
1341                                                       connection_destroy);
1342 
1343     colo_compare_iothread(s);
1344 
1345     qemu_mutex_lock(&colo_compare_mutex);
1346     if (!colo_compare_active) {
1347         qemu_mutex_init(&event_mtx);
1348         qemu_cond_init(&event_complete_cond);
1349         colo_compare_active = true;
1350     }
1351     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1352     qemu_mutex_unlock(&colo_compare_mutex);
1353 
1354     return;
1355 }
1356 
1357 static void colo_flush_packets(void *opaque, void *user_data)
1358 {
1359     CompareState *s = user_data;
1360     Connection *conn = opaque;
1361     Packet *pkt = NULL;
1362 
1363     while (!g_queue_is_empty(&conn->primary_list)) {
1364         pkt = g_queue_pop_head(&conn->primary_list);
1365         compare_chr_send(s,
1366                          pkt->data,
1367                          pkt->size,
1368                          pkt->vnet_hdr_len,
1369                          false,
1370                          true);
1371         packet_destroy_partial(pkt, NULL);
1372     }
1373     while (!g_queue_is_empty(&conn->secondary_list)) {
1374         pkt = g_queue_pop_head(&conn->secondary_list);
1375         packet_destroy(pkt, NULL);
1376     }
1377 }
1378 
1379 static void colo_compare_class_init(ObjectClass *oc, void *data)
1380 {
1381     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1382 
1383     ucc->complete = colo_compare_complete;
1384 }
1385 
1386 static void colo_compare_init(Object *obj)
1387 {
1388     CompareState *s = COLO_COMPARE(obj);
1389 
1390     object_property_add_str(obj, "primary_in",
1391                             compare_get_pri_indev, compare_set_pri_indev);
1392     object_property_add_str(obj, "secondary_in",
1393                             compare_get_sec_indev, compare_set_sec_indev);
1394     object_property_add_str(obj, "outdev",
1395                             compare_get_outdev, compare_set_outdev);
1396     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1397                             (Object **)&s->iothread,
1398                             object_property_allow_set_link,
1399                             OBJ_PROP_LINK_STRONG);
1400     /* This parameter just for Xen COLO */
1401     object_property_add_str(obj, "notify_dev",
1402                             compare_get_notify_dev, compare_set_notify_dev);
1403 
1404     object_property_add(obj, "compare_timeout", "uint32",
1405                         compare_get_timeout,
1406                         compare_set_timeout, NULL, NULL);
1407 
1408     object_property_add(obj, "expired_scan_cycle", "uint32",
1409                         compare_get_expired_scan_cycle,
1410                         compare_set_expired_scan_cycle, NULL, NULL);
1411 
1412     object_property_add(obj, "max_queue_size", "uint32",
1413                         get_max_queue_size,
1414                         set_max_queue_size, NULL, NULL);
1415 
1416     s->vnet_hdr = false;
1417     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1418                              compare_set_vnet_hdr);
1419 }
1420 
1421 static void colo_compare_finalize(Object *obj)
1422 {
1423     CompareState *s = COLO_COMPARE(obj);
1424     CompareState *tmp = NULL;
1425 
1426     qemu_mutex_lock(&colo_compare_mutex);
1427     QTAILQ_FOREACH(tmp, &net_compares, next) {
1428         if (tmp == s) {
1429             QTAILQ_REMOVE(&net_compares, s, next);
1430             break;
1431         }
1432     }
1433     if (QTAILQ_EMPTY(&net_compares)) {
1434         colo_compare_active = false;
1435         qemu_mutex_destroy(&event_mtx);
1436         qemu_cond_destroy(&event_complete_cond);
1437     }
1438     qemu_mutex_unlock(&colo_compare_mutex);
1439 
1440     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1441     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1442     qemu_chr_fe_deinit(&s->chr_out, false);
1443     if (s->notify_dev) {
1444         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1445     }
1446 
1447     colo_compare_timer_del(s);
1448 
1449     qemu_bh_delete(s->event_bh);
1450 
1451     AioContext *ctx = iothread_get_aio_context(s->iothread);
1452     aio_context_acquire(ctx);
1453     AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1454     if (s->notify_dev) {
1455         AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1456     }
1457     aio_context_release(ctx);
1458 
1459     /* Release all unhandled packets after compare thead exited */
1460     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1461     AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
1462 
1463     g_queue_clear(&s->conn_list);
1464     g_queue_clear(&s->out_sendco.send_list);
1465     if (s->notify_dev) {
1466         g_queue_clear(&s->notify_sendco.send_list);
1467     }
1468 
1469     if (s->connection_track_table) {
1470         g_hash_table_destroy(s->connection_track_table);
1471     }
1472 
1473     object_unref(OBJECT(s->iothread));
1474 
1475     g_free(s->pri_indev);
1476     g_free(s->sec_indev);
1477     g_free(s->outdev);
1478     g_free(s->notify_dev);
1479 }
1480 
1481 static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1482 {
1483     colo_compare_active = false;
1484     qemu_mutex_init(&colo_compare_mutex);
1485 }
1486 
1487 static const TypeInfo colo_compare_info = {
1488     .name = TYPE_COLO_COMPARE,
1489     .parent = TYPE_OBJECT,
1490     .instance_size = sizeof(CompareState),
1491     .instance_init = colo_compare_init,
1492     .instance_finalize = colo_compare_finalize,
1493     .class_size = sizeof(CompareClass),
1494     .class_init = colo_compare_class_init,
1495     .interfaces = (InterfaceInfo[]) {
1496         { TYPE_USER_CREATABLE },
1497         { }
1498     }
1499 };
1500 
1501 static void register_types(void)
1502 {
1503     type_register_static(&colo_compare_info);
1504 }
1505 
1506 type_init(register_types);
1507