xref: /openbmc/qemu/net/colo-compare.c (revision f4ef8c9cc10b3bee829b9775879d4ff9f77c2442)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34 
35 #include "block/aio-wait.h"
36 #include "qemu/coroutine.h"
37 
/* QOM type name and the checked cast COLO_COMPARE() for CompareState. */
#define TYPE_COLO_COMPARE "colo-compare"
typedef struct CompareState CompareState;
DECLARE_INSTANCE_CHECKER(CompareState, COLO_COMPARE,
                         TYPE_COLO_COMPARE)

/*
 * List of CompareState instances (linked through CompareState.next);
 * walked by colo_notify_compares_event().
 */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Notifiers fired by colo_compare_inconsistency_notify() when the
 * instance has no notify_dev configured.
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

/* Value reported by compare_chr_can_read(). */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* NOTE(review): presumably the default for max_queue_size - confirm. */
#define MAX_QUEUE_SIZE 1024

/* Bits stored in the "mark" output of colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/*
 * NOTE(review): presumably the defaults, in ms, for expired_scan_cycle
 * and compare_timeout; their use is not visible here - confirm.
 */
#define REGULAR_PACKET_CHECK_MS 3000
#define DEFAULT_TIME_OUT_MS 3000

/* Uncomment to hexdump mismatching packets to stderr. */
/* #define DEBUG_COLO_PACKETS */

/*
 * Held by colo_notify_compares_event() while it checks
 * colo_compare_active and walks net_compares.
 */
static QemuMutex colo_compare_mutex;
static bool colo_compare_active;
/*
 * event_mtx guards event_unhandled_count; colo_notify_compares_event()
 * waits on event_complete_cond until the count drops to zero.
 */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
/* Queue length limit checked by colo_insert_packet(). */
static uint32_t max_queue_size;
66 
67 /*
68  *  + CompareState ++
69  *  |               |
70  *  +---------------+   +---------------+         +---------------+
71  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
72  *  +---------------+   +---------------+         +---------------+
73  *  |               |     |           |             |          |
74  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
75  *                    |primary |  |secondary    |primary | |secondary
76  *                    |packet  |  |packet  +    |packet  | |packet  +
77  *                    +--------+  +--------+    +--------+ +--------+
78  *                        |           |             |          |
79  *                    +---v----+  +---v----+    +---v----+ +---v----+
80  *                    |primary |  |secondary    |primary | |secondary
81  *                    |packet  |  |packet  +    |packet  | |packet  +
82  *                    +--------+  +--------+    +--------+ +--------+
83  *                        |           |             |          |
84  *                    +---v----+  +---v----+    +---v----+ +---v----+
85  *                    |primary |  |secondary    |primary | |secondary
86  *                    |packet  |  |packet  +    |packet  | |packet  +
87  *                    +--------+  +--------+    +--------+ +--------+
88  */
89 
/*
 * State of one outgoing channel: a queue of pending SendEntry items and
 * the coroutine (_compare_chr_send) that drains it.
 */
typedef struct SendCo {
    /* Coroutine currently draining send_list; reset to NULL when it ends */
    Coroutine *co;
    /* Owning compare instance (read for its vnet_hdr setting) */
    struct CompareState *s;
    /* Character backend the entries are written to */
    CharBackend *chr;
    /* Pending SendEntry items: pushed at the head, popped from the tail */
    GQueue send_list;
    /* When true, no vnet header length is written before the payload */
    bool notify_remote_frame;
    /* false from coroutine start until _compare_chr_send() finishes */
    bool done;
    /* Result of the last coroutine run: 0 or a negative error code */
    int ret;
} SendCo;
99 
/* One queued message; freed by _compare_chr_send() once handled. */
typedef struct SendEntry {
    /* Number of bytes in buf; also sent first as a big-endian length */
    uint32_t size;
    /* Sent after the length when the instance has vnet_hdr enabled */
    uint32_t vnet_hdr_len;
    /* Payload; released with g_free() after the send attempt */
    uint8_t *buf;
} SendEntry;
105 
/* Per-instance state of a colo-compare object. */
struct CompareState {
    Object parent;

    /* NOTE(review): presumably chardev ids set via properties - confirm */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /*
     * When non-NULL, inconsistencies are reported with
     * notify_remote_frame() instead of the notifier list.
     */
    char *notify_dev;
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    CharBackend chr_notify_dev;
    /* Reassembly state fed by the three chardev input handlers */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    SocketReadState notify_rs;
    /* Send queues selected by compare_chr_send() */
    SendCo out_sendco;
    SendCo notify_sendco;
    /* When true, each outgoing packet is preceded by its vnet_hdr_len */
    bool vnet_hdr;
    /* Age threshold handed to colo_old_packet_check_one() */
    uint32_t compare_timeout;
    /* Interval added to the virtual clock (ms) to re-arm the check timer */
    uint32_t expired_scan_cycle;

    /*
     * Connections that have packets queued for comparison.
     * Element type: Connection
     */
    GQueue conn_list;
    /* Record the connection without repetition */
    GHashTable *connection_track_table;

    /* IOThread whose AioContext hosts packet_check_timer */
    IOThread *iothread;
    GMainContext *worker_context;
    /* Periodic timer running check_old_packet_regular() */
    QEMUTimer *packet_check_timer;

    /* Bottom half scheduled by colo_notify_compares_event() */
    QEMUBH *event_bh;
    /* Event stored by colo_notify_compares_event() before scheduling */
    enum colo_event event;

    /* Linkage in the global net_compares list */
    QTAILQ_ENTRY(CompareState) next;
};
143 
/* Class structure; adds nothing to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Which input a packet came from; used as "mode" in packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};

/* Names for the modes above, used in trace output. */
static const char *colo_mode[] = {
    [PRIMARY_IN] = "primary",
    [SECONDARY_IN] = "secondary",
};
157 
/*
 * Forward declaration: notify_remote_frame() and
 * colo_release_primary_pkt() call this before its definition.
 */
static int compare_chr_send(CompareState *s,
                            uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame,
                            bool zero_copy);
164 
/*
 * Return true when the @packet_len bytes at @buf are exactly the
 * characters of the NUL-terminated string @str (terminator excluded).
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t expected_len = strlen(str);

    return packet_len == expected_len && memcmp(str, buf, expected_len) == 0;
}
175 
176 static void notify_remote_frame(CompareState *s)
177 {
178     char msg[] = "DO_CHECKPOINT";
179     int ret = 0;
180 
181     ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
182     if (ret < 0) {
183         error_report("Notify Xen COLO-frame failed");
184     }
185 }
186 
187 static void colo_compare_inconsistency_notify(CompareState *s)
188 {
189     if (s->notify_dev) {
190         notify_remote_frame(s);
191     } else {
192         notifier_list_notify(&colo_compare_notifiers,
193                              migrate_get_current());
194     }
195 }
196 
197 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
198 {
199     struct tcp_hdr *atcp, *btcp;
200 
201     atcp = (struct tcp_hdr *)(a->transport_header);
202     btcp = (struct tcp_hdr *)(b->transport_header);
203     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
204 }
205 
206 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
207 {
208     Packet *pkt = data;
209     struct tcp_hdr *tcphd;
210 
211     tcphd = (struct tcp_hdr *)pkt->transport_header;
212 
213     pkt->tcp_seq = ntohl(tcphd->th_seq);
214     pkt->tcp_ack = ntohl(tcphd->th_ack);
215     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
216     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
217                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
218     pkt->payload_size = pkt->size - pkt->header_size;
219     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
220     pkt->flags = tcphd->th_flags;
221 }
222 
223 /*
224  * Return 1 on success, if return 0 means the
225  * packet will be dropped
226  */
227 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
228 {
229     if (g_queue_get_length(queue) <= max_queue_size) {
230         if (pkt->ip->ip_p == IPPROTO_TCP) {
231             fill_pkt_tcp_info(pkt, max_ack);
232             g_queue_insert_sorted(queue,
233                                   pkt,
234                                   (GCompareDataFunc)seq_sorter,
235                                   NULL);
236         } else {
237             g_queue_push_tail(queue, pkt);
238         }
239         return 1;
240     }
241     return 0;
242 }
243 
244 /*
245  * Return 0 on success, if return -1 means the pkt
246  * is unsupported(arp and ipv6) and will be sent later
247  */
248 static int packet_enqueue(CompareState *s, int mode, Connection **con)
249 {
250     ConnectionKey key;
251     Packet *pkt = NULL;
252     Connection *conn;
253     int ret;
254 
255     if (mode == PRIMARY_IN) {
256         pkt = packet_new(s->pri_rs.buf,
257                          s->pri_rs.packet_len,
258                          s->pri_rs.vnet_hdr_len);
259     } else {
260         pkt = packet_new(s->sec_rs.buf,
261                          s->sec_rs.packet_len,
262                          s->sec_rs.vnet_hdr_len);
263     }
264 
265     if (parse_packet_early(pkt)) {
266         packet_destroy(pkt, NULL);
267         pkt = NULL;
268         return -1;
269     }
270     fill_connection_key(pkt, &key);
271 
272     conn = connection_get(s->connection_track_table,
273                           &key,
274                           &s->conn_list);
275 
276     if (!conn->processing) {
277         g_queue_push_tail(&s->conn_list, conn);
278         conn->processing = true;
279     }
280 
281     if (mode == PRIMARY_IN) {
282         ret = colo_insert_packet(&conn->primary_list, pkt, &conn->pack);
283     } else {
284         ret = colo_insert_packet(&conn->secondary_list, pkt, &conn->sack);
285     }
286 
287     if (!ret) {
288         trace_colo_compare_drop_packet(colo_mode[mode],
289             "queue size too big, drop packet");
290         packet_destroy(pkt, NULL);
291         pkt = NULL;
292     }
293 
294     *con = conn;
295 
296     return 0;
297 }
298 
/*
 * Return true when @seq1 is later than @seq2, comparing the 32-bit
 * difference as a signed value so that wrap-around is handled.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
303 
304 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
305 {
306     int ret;
307     ret = compare_chr_send(s,
308                            pkt->data,
309                            pkt->size,
310                            pkt->vnet_hdr_len,
311                            false,
312                            true);
313     if (ret < 0) {
314         error_report("colo send primary packet failed");
315     }
316     trace_colo_compare_main("packet same and release packet");
317     packet_destroy_partial(pkt, NULL);
318 }
319 
320 /*
321  * The IP packets sent by primary and secondary
322  * will be compared in here
323  * TODO support ip fragment, Out-Of-Order
324  * return:    0  means packet same
325  *            > 0 || < 0 means packet different
326  */
327 static int colo_compare_packet_payload(Packet *ppkt,
328                                        Packet *spkt,
329                                        uint16_t poffset,
330                                        uint16_t soffset,
331                                        uint16_t len)
332 
333 {
334     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
335         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
336 
337         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
338         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
339         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
340         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
341 
342         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
343                                    pri_ip_dst, spkt->size,
344                                    sec_ip_src, sec_ip_dst);
345     }
346 
347     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
348 }
349 
/*
 * Compare the not-yet-compared payload of a primary and a secondary TCP
 * packet and tell the caller, through @mark, which of them is finished:
 *   COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY
 *       both cover the same sequence range and have equal payload;
 *   COLO_COMPARE_FREE_PRIMARY
 *       the primary payload matched and the primary's ack is not after
 *       @max_ack; spkt->offset is advanced past the compared bytes;
 *   COLO_COMPARE_FREE_SECONDARY
 *       the secondary payload matched a part of a longer primary packet;
 *       ppkt->offset is advanced past the compared bytes.
 *
 * Return true when the compared payload is consistent and the caller
 * should continue with the next comparison; false means a checkpoint
 * is needed (@mark is 0 in that case).
 */
static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
                              int8_t *mark, uint32_t max_ack)
{
    *mark = 0;

    /* same sequence range: compare the whole payload in one go */
    if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size, spkt->header_size,
                                        ppkt->payload_size)) {
            *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
            return true;
        }
    }

    /* one part of secondary packet payload still need to be compared */
    if (!after(ppkt->seq_end, spkt->seq_end)) {
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        ppkt->payload_size - ppkt->offset)) {
            if (!after(ppkt->tcp_ack, max_ack)) {
                *mark = COLO_COMPARE_FREE_PRIMARY;
                /* remember how much of the secondary packet is done */
                spkt->offset += ppkt->payload_size - ppkt->offset;
                return true;
            } else {
                /* secondary guest hasn't ack the data, don't send
                 * out this packet
                 */
                return false;
            }
        }
    } else {
        /* primary packet is longer than secondary packet, compare
         * the same part and mark the primary packet offset
         */
        if (!colo_compare_packet_payload(ppkt, spkt,
                                        ppkt->header_size + ppkt->offset,
                                        spkt->header_size + spkt->offset,
                                        spkt->payload_size - spkt->offset)) {
            *mark = COLO_COMPARE_FREE_SECONDARY;
            ppkt->offset += spkt->payload_size - spkt->offset;
            return true;
        }
    }

    /* payload differs */
    return false;
}
402 
/*
 * Compare the queued primary and secondary TCP packets of @conn.
 *
 * The function works as a small state machine driven by two labels:
 * "pri" fetches the next primary packet, "sec" fetches the next
 * secondary packet while keeping the current primary one. Matching
 * primary packets are sent out with colo_release_primary_pkt(), matching
 * secondary packets are destroyed. On a mismatch both packets are put
 * back at the head of their queues and an inconsistency is reported.
 */
static void colo_compare_tcp(CompareState *s, Connection *conn)
{
    Packet *ppkt = NULL, *spkt = NULL;
    int8_t mark;

    /*
     * If ppkt and spkt have the same payload, but ppkt's ACK
     * is greater than spkt's ACK, in this case we can not
     * send the ppkt because it will cause the secondary guest
     * to miss sending some data in the next. Therefore, we
     * record the maximum ACK in the current queue at both
     * primary side and secondary side. Only when the ack is
     * less than the smaller of the two maximum ack, then we
     * can ensure that the packet's payload is acknowledged by
     * primary and secondary.
     */
    uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;

pri:
    if (g_queue_is_empty(&conn->primary_list)) {
        return;
    }
    ppkt = g_queue_pop_head(&conn->primary_list);
sec:
    if (g_queue_is_empty(&conn->secondary_list)) {
        /* nothing to compare against yet: keep the primary packet queued */
        g_queue_push_head(&conn->primary_list, ppkt);
        return;
    }
    spkt = g_queue_pop_head(&conn->secondary_list);

    /* tcp_seq == seq_end means the packet carries no payload */
    if (ppkt->tcp_seq == ppkt->seq_end) {
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    /* primary payload ends at or before what was already compared */
    if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
        trace_colo_compare_main("pri: this packet has compared");
        colo_release_primary_pkt(s, ppkt);
        ppkt = NULL;
    }

    if (spkt->tcp_seq == spkt->seq_end) {
        /* secondary packet without payload: nothing to compare */
        packet_destroy(spkt, NULL);
        if (!ppkt) {
            goto pri;
        } else {
            goto sec;
        }
    } else {
        if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
            trace_colo_compare_main("sec: this packet has compared");
            packet_destroy(spkt, NULL);
            if (!ppkt) {
                goto pri;
            } else {
                goto sec;
            }
        }
        if (!ppkt) {
            /* primary was released above: requeue spkt, fetch a new ppkt */
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
    }

    if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
        trace_colo_compare_tcp_info("pri",
                                    ppkt->tcp_seq, ppkt->tcp_ack,
                                    ppkt->header_size, ppkt->payload_size,
                                    ppkt->offset, ppkt->flags);

        trace_colo_compare_tcp_info("sec",
                                    spkt->tcp_seq, spkt->tcp_ack,
                                    spkt->header_size, spkt->payload_size,
                                    spkt->offset, spkt->flags);

        if (mark == COLO_COMPARE_FREE_PRIMARY) {
            /* primary done; the rest of spkt is compared with the next ppkt */
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            g_queue_push_head(&conn->secondary_list, spkt);
            goto pri;
        }
        if (mark == COLO_COMPARE_FREE_SECONDARY) {
            /* secondary done; the rest of ppkt is compared with the next spkt */
            conn->compare_seq = spkt->seq_end;
            packet_destroy(spkt, NULL);
            goto sec;
        }
        if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
            conn->compare_seq = ppkt->seq_end;
            colo_release_primary_pkt(s, ppkt);
            packet_destroy(spkt, NULL);
            goto pri;
        }
    } else {
        /* mismatch: keep both packets queued and report it */
        g_queue_push_head(&conn->primary_list, ppkt);
        g_queue_push_head(&conn->secondary_list, spkt);

#ifdef DEBUG_COLO_PACKETS
        qemu_hexdump((char *)ppkt->data, stderr,
                     "colo-compare ppkt", ppkt->size);
        qemu_hexdump((char *)spkt->data, stderr,
                     "colo-compare spkt", spkt->size);
#endif

        colo_compare_inconsistency_notify(s);
    }
}
509 
510 
511 /*
512  * Called from the compare thread on the primary
513  * for compare udp packet
514  */
515 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
516 {
517     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
518     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
519 
520     trace_colo_compare_main("compare udp");
521 
522     /*
523      * Because of ppkt and spkt are both in the same connection,
524      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
525      * same with spkt. In addition, IP header's Identification is a random
526      * field, we can handle it in IP fragmentation function later.
527      * COLO just concern the response net packet payload from primary guest
528      * and secondary guest are same or not, So we ignored all IP header include
529      * other field like TOS,TTL,IP Checksum. we only need to compare
530      * the ip payload here.
531      */
532     if (ppkt->size != spkt->size) {
533         trace_colo_compare_main("UDP: payload size of packets are different");
534         return -1;
535     }
536     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
537                                     ppkt->size - offset)) {
538         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
539         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
540 #ifdef DEBUG_COLO_PACKETS
541         qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
542                      ppkt->size);
543         qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
544                      spkt->size);
545 #endif
546         return -1;
547     } else {
548         return 0;
549     }
550 }
551 
552 /*
553  * Called from the compare thread on the primary
554  * for compare icmp packet
555  */
556 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
557 {
558     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
559     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
560 
561     trace_colo_compare_main("compare icmp");
562 
563     /*
564      * Because of ppkt and spkt are both in the same connection,
565      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
566      * same with spkt. In addition, IP header's Identification is a random
567      * field, we can handle it in IP fragmentation function later.
568      * COLO just concern the response net packet payload from primary guest
569      * and secondary guest are same or not, So we ignored all IP header include
570      * other field like TOS,TTL,IP Checksum. we only need to compare
571      * the ip payload here.
572      */
573     if (ppkt->size != spkt->size) {
574         trace_colo_compare_main("ICMP: payload size of packets are different");
575         return -1;
576     }
577     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
578                                     ppkt->size - offset)) {
579         trace_colo_compare_icmp_miscompare("primary pkt size",
580                                            ppkt->size);
581         trace_colo_compare_icmp_miscompare("Secondary pkt size",
582                                            spkt->size);
583 #ifdef DEBUG_COLO_PACKETS
584         qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
585                      ppkt->size);
586         qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
587                      spkt->size);
588 #endif
589         return -1;
590     } else {
591         return 0;
592     }
593 }
594 
595 /*
596  * Called from the compare thread on the primary
597  * for compare other packet
598  */
599 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
600 {
601     uint16_t offset = ppkt->vnet_hdr_len;
602 
603     trace_colo_compare_main("compare other");
604     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_IP_INFO)) {
605         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
606 
607         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
608         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
609         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
610         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
611 
612         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
613                                    pri_ip_dst, spkt->size,
614                                    sec_ip_src, sec_ip_dst);
615     }
616 
617     if (ppkt->size != spkt->size) {
618         trace_colo_compare_main("Other: payload size of packets are different");
619         return -1;
620     }
621     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
622                                        ppkt->size - offset);
623 }
624 
625 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
626 {
627     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
628 
629     if ((now - pkt->creation_ms) > (*check_time)) {
630         trace_colo_old_packet_check_found(pkt->creation_ms);
631         return 0;
632     } else {
633         return 1;
634     }
635 }
636 
/*
 * Add @notify to the list that colo_compare_inconsistency_notify()
 * fires when an instance has no notify_dev.
 */
void colo_compare_register_notifier(Notifier *notify)
{
    notifier_list_add(&colo_compare_notifiers, notify);
}
641 
/* Remove a notifier previously added with colo_compare_register_notifier(). */
void colo_compare_unregister_notifier(Notifier *notify)
{
    notifier_remove(notify);
}
646 
647 static int colo_old_packet_check_one_conn(Connection *conn,
648                                           CompareState *s)
649 {
650     GList *result = NULL;
651 
652     result = g_queue_find_custom(&conn->primary_list,
653                                  &s->compare_timeout,
654                                  (GCompareFunc)colo_old_packet_check_one);
655 
656     if (result) {
657         /* Do checkpoint will flush old packet */
658         colo_compare_inconsistency_notify(s);
659         return 0;
660     }
661 
662     return 1;
663 }
664 
665 /*
666  * Look for old packets that the secondary hasn't matched,
667  * if we have some then we have to checkpoint to wake
668  * the secondary up.
669  */
670 static void colo_old_packet_check(void *opaque)
671 {
672     CompareState *s = opaque;
673 
674     /*
675      * If we find one old packet, stop finding job and notify
676      * COLO frame do checkpoint.
677      */
678     g_queue_find_custom(&s->conn_list, s,
679                         (GCompareFunc)colo_old_packet_check_one_conn);
680 }
681 
682 static void colo_compare_packet(CompareState *s, Connection *conn,
683                                 int (*HandlePacket)(Packet *spkt,
684                                 Packet *ppkt))
685 {
686     Packet *pkt = NULL;
687     GList *result = NULL;
688 
689     while (!g_queue_is_empty(&conn->primary_list) &&
690            !g_queue_is_empty(&conn->secondary_list)) {
691         pkt = g_queue_pop_head(&conn->primary_list);
692         result = g_queue_find_custom(&conn->secondary_list,
693                  pkt, (GCompareFunc)HandlePacket);
694 
695         if (result) {
696             colo_release_primary_pkt(s, pkt);
697             g_queue_remove(&conn->secondary_list, result->data);
698         } else {
699             /*
700              * If one packet arrive late, the secondary_list or
701              * primary_list will be empty, so we can't compare it
702              * until next comparison. If the packets in the list are
703              * timeout, it will trigger a checkpoint request.
704              */
705             trace_colo_compare_main("packet different");
706             g_queue_push_head(&conn->primary_list, pkt);
707 
708             colo_compare_inconsistency_notify(s);
709             break;
710         }
711     }
712 }
713 
714 /*
715  * Called from the compare thread on the primary
716  * for compare packet with secondary list of the
717  * specified connection when a new packet was
718  * queued to it.
719  */
720 static void colo_compare_connection(void *opaque, void *user_data)
721 {
722     CompareState *s = user_data;
723     Connection *conn = opaque;
724 
725     switch (conn->ip_proto) {
726     case IPPROTO_TCP:
727         colo_compare_tcp(s, conn);
728         break;
729     case IPPROTO_UDP:
730         colo_compare_packet(s, conn, colo_packet_compare_udp);
731         break;
732     case IPPROTO_ICMP:
733         colo_compare_packet(s, conn, colo_packet_compare_icmp);
734         break;
735     default:
736         colo_compare_packet(s, conn, colo_packet_compare_other);
737         break;
738     }
739 }
740 
741 static void coroutine_fn _compare_chr_send(void *opaque)
742 {
743     SendCo *sendco = opaque;
744     CompareState *s = sendco->s;
745     int ret = 0;
746 
747     while (!g_queue_is_empty(&sendco->send_list)) {
748         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
749         uint32_t len = htonl(entry->size);
750 
751         ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
752 
753         if (ret != sizeof(len)) {
754             g_free(entry->buf);
755             g_slice_free(SendEntry, entry);
756             goto err;
757         }
758 
759         if (!sendco->notify_remote_frame && s->vnet_hdr) {
760             /*
761              * We send vnet header len make other module(like filter-redirector)
762              * know how to parse net packet correctly.
763              */
764             len = htonl(entry->vnet_hdr_len);
765 
766             ret = qemu_chr_fe_write_all(sendco->chr,
767                                         (uint8_t *)&len,
768                                         sizeof(len));
769 
770             if (ret != sizeof(len)) {
771                 g_free(entry->buf);
772                 g_slice_free(SendEntry, entry);
773                 goto err;
774             }
775         }
776 
777         ret = qemu_chr_fe_write_all(sendco->chr,
778                                     (uint8_t *)entry->buf,
779                                     entry->size);
780 
781         if (ret != entry->size) {
782             g_free(entry->buf);
783             g_slice_free(SendEntry, entry);
784             goto err;
785         }
786 
787         g_free(entry->buf);
788         g_slice_free(SendEntry, entry);
789     }
790 
791     sendco->ret = 0;
792     goto out;
793 
794 err:
795     while (!g_queue_is_empty(&sendco->send_list)) {
796         SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
797         g_free(entry->buf);
798         g_slice_free(SendEntry, entry);
799     }
800     sendco->ret = ret < 0 ? ret : -EIO;
801 out:
802     sendco->co = NULL;
803     sendco->done = true;
804     aio_wait_kick();
805 }
806 
807 static int compare_chr_send(CompareState *s,
808                             uint8_t *buf,
809                             uint32_t size,
810                             uint32_t vnet_hdr_len,
811                             bool notify_remote_frame,
812                             bool zero_copy)
813 {
814     SendCo *sendco;
815     SendEntry *entry;
816 
817     if (notify_remote_frame) {
818         sendco = &s->notify_sendco;
819     } else {
820         sendco = &s->out_sendco;
821     }
822 
823     if (!size) {
824         return 0;
825     }
826 
827     entry = g_slice_new(SendEntry);
828     entry->size = size;
829     entry->vnet_hdr_len = vnet_hdr_len;
830     if (zero_copy) {
831         entry->buf = buf;
832     } else {
833         entry->buf = g_malloc(size);
834         memcpy(entry->buf, buf, size);
835     }
836     g_queue_push_head(&sendco->send_list, entry);
837 
838     if (sendco->done) {
839         sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
840         sendco->done = false;
841         qemu_coroutine_enter(sendco->co);
842         if (sendco->done) {
843             /* report early errors */
844             return sendco->ret;
845         }
846     }
847 
848     /* assume success */
849     return 0;
850 }
851 
/*
 * Report how many bytes can be accepted per read: always
 * COMPARE_READ_LEN_MAX. @opaque is unused.
 */
static int compare_chr_can_read(void *opaque)
{
    return COMPARE_READ_LEN_MAX;
}
856 
857 /*
858  * Called from the main thread on the primary for packets
859  * arriving over the socket from the primary.
860  */
861 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
862 {
863     CompareState *s = COLO_COMPARE(opaque);
864     int ret;
865 
866     ret = net_fill_rstate(&s->pri_rs, buf, size);
867     if (ret == -1) {
868         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
869                                  NULL, NULL, true);
870         error_report("colo-compare primary_in error");
871     }
872 }
873 
874 /*
875  * Called from the main thread on the primary for packets
876  * arriving over the socket from the secondary.
877  */
878 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
879 {
880     CompareState *s = COLO_COMPARE(opaque);
881     int ret;
882 
883     ret = net_fill_rstate(&s->sec_rs, buf, size);
884     if (ret == -1) {
885         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
886                                  NULL, NULL, true);
887         error_report("colo-compare secondary_in error");
888     }
889 }
890 
891 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
892 {
893     CompareState *s = COLO_COMPARE(opaque);
894     int ret;
895 
896     ret = net_fill_rstate(&s->notify_rs, buf, size);
897     if (ret == -1) {
898         qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
899                                  NULL, NULL, true);
900         error_report("colo-compare notify_dev error");
901     }
902 }
903 
904 /*
905  * Check old packet regularly so it can watch for any packets
906  * that the secondary hasn't produced equivalents of.
907  */
908 static void check_old_packet_regular(void *opaque)
909 {
910     CompareState *s = opaque;
911 
912     /* if have old packet we will notify checkpoint */
913     colo_old_packet_check(s);
914     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
915               s->expired_scan_cycle);
916 }
917 
918 /* Public API, Used for COLO frame to notify compare event */
919 void colo_notify_compares_event(void *opaque, int event, Error **errp)
920 {
921     CompareState *s;
922     qemu_mutex_lock(&colo_compare_mutex);
923 
924     if (!colo_compare_active) {
925         qemu_mutex_unlock(&colo_compare_mutex);
926         return;
927     }
928 
929     qemu_mutex_lock(&event_mtx);
930     QTAILQ_FOREACH(s, &net_compares, next) {
931         s->event = event;
932         qemu_bh_schedule(s->event_bh);
933         event_unhandled_count++;
934     }
935     /* Wait all compare threads to finish handling this event */
936     while (event_unhandled_count > 0) {
937         qemu_cond_wait(&event_complete_cond, &event_mtx);
938     }
939 
940     qemu_mutex_unlock(&event_mtx);
941     qemu_mutex_unlock(&colo_compare_mutex);
942 }
943 
944 static void colo_compare_timer_init(CompareState *s)
945 {
946     AioContext *ctx = iothread_get_aio_context(s->iothread);
947 
948     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
949                                 SCALE_MS, check_old_packet_regular,
950                                 s);
951     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
952               s->expired_scan_cycle);
953 }
954 
955 static void colo_compare_timer_del(CompareState *s)
956 {
957     if (s->packet_check_timer) {
958         timer_del(s->packet_check_timer);
959         timer_free(s->packet_check_timer);
960         s->packet_check_timer = NULL;
961     }
962  }
963 
964 static void colo_flush_packets(void *opaque, void *user_data);
965 
966 static void colo_compare_handle_event(void *opaque)
967 {
968     CompareState *s = opaque;
969 
970     switch (s->event) {
971     case COLO_EVENT_CHECKPOINT:
972         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
973         break;
974     case COLO_EVENT_FAILOVER:
975         break;
976     default:
977         break;
978     }
979 
980     qemu_mutex_lock(&event_mtx);
981     assert(event_unhandled_count > 0);
982     event_unhandled_count--;
983     qemu_cond_broadcast(&event_complete_cond);
984     qemu_mutex_unlock(&event_mtx);
985 }
986 
987 static void colo_compare_iothread(CompareState *s)
988 {
989     AioContext *ctx = iothread_get_aio_context(s->iothread);
990     object_ref(OBJECT(s->iothread));
991     s->worker_context = iothread_get_g_main_context(s->iothread);
992 
993     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
994                              compare_pri_chr_in, NULL, NULL,
995                              s, s->worker_context, true);
996     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
997                              compare_sec_chr_in, NULL, NULL,
998                              s, s->worker_context, true);
999     if (s->notify_dev) {
1000         qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
1001                                  compare_notify_chr, NULL, NULL,
1002                                  s, s->worker_context, true);
1003     }
1004 
1005     colo_compare_timer_init(s);
1006     s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
1007 }
1008 
1009 static char *compare_get_pri_indev(Object *obj, Error **errp)
1010 {
1011     CompareState *s = COLO_COMPARE(obj);
1012 
1013     return g_strdup(s->pri_indev);
1014 }
1015 
1016 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1017 {
1018     CompareState *s = COLO_COMPARE(obj);
1019 
1020     g_free(s->pri_indev);
1021     s->pri_indev = g_strdup(value);
1022 }
1023 
1024 static char *compare_get_sec_indev(Object *obj, Error **errp)
1025 {
1026     CompareState *s = COLO_COMPARE(obj);
1027 
1028     return g_strdup(s->sec_indev);
1029 }
1030 
1031 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1032 {
1033     CompareState *s = COLO_COMPARE(obj);
1034 
1035     g_free(s->sec_indev);
1036     s->sec_indev = g_strdup(value);
1037 }
1038 
1039 static char *compare_get_outdev(Object *obj, Error **errp)
1040 {
1041     CompareState *s = COLO_COMPARE(obj);
1042 
1043     return g_strdup(s->outdev);
1044 }
1045 
1046 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1047 {
1048     CompareState *s = COLO_COMPARE(obj);
1049 
1050     g_free(s->outdev);
1051     s->outdev = g_strdup(value);
1052 }
1053 
1054 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1055 {
1056     CompareState *s = COLO_COMPARE(obj);
1057 
1058     return s->vnet_hdr;
1059 }
1060 
1061 static void compare_set_vnet_hdr(Object *obj,
1062                                  bool value,
1063                                  Error **errp)
1064 {
1065     CompareState *s = COLO_COMPARE(obj);
1066 
1067     s->vnet_hdr = value;
1068 }
1069 
1070 static char *compare_get_notify_dev(Object *obj, Error **errp)
1071 {
1072     CompareState *s = COLO_COMPARE(obj);
1073 
1074     return g_strdup(s->notify_dev);
1075 }
1076 
1077 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1078 {
1079     CompareState *s = COLO_COMPARE(obj);
1080 
1081     g_free(s->notify_dev);
1082     s->notify_dev = g_strdup(value);
1083 }
1084 
1085 static void compare_get_timeout(Object *obj, Visitor *v,
1086                                 const char *name, void *opaque,
1087                                 Error **errp)
1088 {
1089     CompareState *s = COLO_COMPARE(obj);
1090     uint32_t value = s->compare_timeout;
1091 
1092     visit_type_uint32(v, name, &value, errp);
1093 }
1094 
1095 static void compare_set_timeout(Object *obj, Visitor *v,
1096                                 const char *name, void *opaque,
1097                                 Error **errp)
1098 {
1099     CompareState *s = COLO_COMPARE(obj);
1100     uint32_t value;
1101 
1102     if (!visit_type_uint32(v, name, &value, errp)) {
1103         return;
1104     }
1105     if (!value) {
1106         error_setg(errp, "Property '%s.%s' requires a positive value",
1107                    object_get_typename(obj), name);
1108         return;
1109     }
1110     s->compare_timeout = value;
1111 }
1112 
1113 static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1114                                            const char *name, void *opaque,
1115                                            Error **errp)
1116 {
1117     CompareState *s = COLO_COMPARE(obj);
1118     uint32_t value = s->expired_scan_cycle;
1119 
1120     visit_type_uint32(v, name, &value, errp);
1121 }
1122 
1123 static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1124                                            const char *name, void *opaque,
1125                                            Error **errp)
1126 {
1127     CompareState *s = COLO_COMPARE(obj);
1128     uint32_t value;
1129 
1130     if (!visit_type_uint32(v, name, &value, errp)) {
1131         return;
1132     }
1133     if (!value) {
1134         error_setg(errp, "Property '%s.%s' requires a positive value",
1135                    object_get_typename(obj), name);
1136         return;
1137     }
1138     s->expired_scan_cycle = value;
1139 }
1140 
1141 static void get_max_queue_size(Object *obj, Visitor *v,
1142                                const char *name, void *opaque,
1143                                Error **errp)
1144 {
1145     uint32_t value = max_queue_size;
1146 
1147     visit_type_uint32(v, name, &value, errp);
1148 }
1149 
1150 static void set_max_queue_size(Object *obj, Visitor *v,
1151                                const char *name, void *opaque,
1152                                Error **errp)
1153 {
1154     Error *local_err = NULL;
1155     uint32_t value;
1156 
1157     visit_type_uint32(v, name, &value, &local_err);
1158     if (local_err) {
1159         goto out;
1160     }
1161     if (!value) {
1162         error_setg(&local_err, "Property '%s.%s' requires a positive value",
1163                    object_get_typename(obj), name);
1164         goto out;
1165     }
1166     max_queue_size = value;
1167 
1168 out:
1169     error_propagate(errp, local_err);
1170 }
1171 
1172 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1173 {
1174     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
1175     Connection *conn = NULL;
1176 
1177     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
1178         trace_colo_compare_main("primary: unsupported packet in");
1179         compare_chr_send(s,
1180                          pri_rs->buf,
1181                          pri_rs->packet_len,
1182                          pri_rs->vnet_hdr_len,
1183                          false,
1184                          false);
1185     } else {
1186         /* compare packet in the specified connection */
1187         colo_compare_connection(conn, s);
1188     }
1189 }
1190 
1191 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1192 {
1193     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
1194     Connection *conn = NULL;
1195 
1196     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1197         trace_colo_compare_main("secondary: unsupported packet in");
1198     } else {
1199         /* compare packet in the specified connection */
1200         colo_compare_connection(conn, s);
1201     }
1202 }
1203 
1204 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1205 {
1206     CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1207 
1208     const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1209     int ret;
1210 
1211     if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1212                            notify_rs->buf,
1213                            notify_rs->packet_len)) {
1214         ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1215         if (ret < 0) {
1216             error_report("Notify Xen COLO-frame INIT failed");
1217         }
1218     } else if (packet_matches_str("COLO_CHECKPOINT",
1219                                   notify_rs->buf,
1220                                   notify_rs->packet_len)) {
1221         /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1222         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1223     } else {
1224         error_report("COLO compare got unsupported instruction");
1225     }
1226 }
1227 
1228 /*
1229  * Return 0 is success.
1230  * Return 1 is failed.
1231  */
1232 static int find_and_check_chardev(Chardev **chr,
1233                                   char *chr_name,
1234                                   Error **errp)
1235 {
1236     *chr = qemu_chr_find(chr_name);
1237     if (*chr == NULL) {
1238         error_setg(errp, "Device '%s' not found",
1239                    chr_name);
1240         return 1;
1241     }
1242 
1243     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1244         error_setg(errp, "chardev \"%s\" is not reconnectable",
1245                    chr_name);
1246         return 1;
1247     }
1248 
1249     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1250         error_setg(errp, "chardev \"%s\" cannot switch context",
1251                    chr_name);
1252         return 1;
1253     }
1254 
1255     return 0;
1256 }
1257 
1258 /*
1259  * Called from the main thread on the primary
1260  * to setup colo-compare.
1261  */
1262 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1263 {
1264     CompareState *s = COLO_COMPARE(uc);
1265     Chardev *chr;
1266 
1267     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1268         error_setg(errp, "colo compare needs 'primary_in' ,"
1269                    "'secondary_in','outdev','iothread' property set");
1270         return;
1271     } else if (!strcmp(s->pri_indev, s->outdev) ||
1272                !strcmp(s->sec_indev, s->outdev) ||
1273                !strcmp(s->pri_indev, s->sec_indev)) {
1274         error_setg(errp, "'indev' and 'outdev' could not be same "
1275                    "for compare module");
1276         return;
1277     }
1278 
1279     if (!s->compare_timeout) {
1280         /* Set default value to 3000 MS */
1281         s->compare_timeout = DEFAULT_TIME_OUT_MS;
1282     }
1283 
1284     if (!s->expired_scan_cycle) {
1285         /* Set default value to 3000 MS */
1286         s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1287     }
1288 
1289     if (!max_queue_size) {
1290         /* Set default queue size to 1024 */
1291         max_queue_size = MAX_QUEUE_SIZE;
1292     }
1293 
1294     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1295         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1296         return;
1297     }
1298 
1299     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1300         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1301         return;
1302     }
1303 
1304     if (find_and_check_chardev(&chr, s->outdev, errp) ||
1305         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1306         return;
1307     }
1308 
1309     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1310     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1311 
1312     /* Try to enable remote notify chardev, currently just for Xen COLO */
1313     if (s->notify_dev) {
1314         if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1315             !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1316             return;
1317         }
1318 
1319         net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1320                            s->vnet_hdr);
1321     }
1322 
1323     s->out_sendco.s = s;
1324     s->out_sendco.chr = &s->chr_out;
1325     s->out_sendco.notify_remote_frame = false;
1326     s->out_sendco.done = true;
1327     g_queue_init(&s->out_sendco.send_list);
1328 
1329     if (s->notify_dev) {
1330         s->notify_sendco.s = s;
1331         s->notify_sendco.chr = &s->chr_notify_dev;
1332         s->notify_sendco.notify_remote_frame = true;
1333         s->notify_sendco.done = true;
1334         g_queue_init(&s->notify_sendco.send_list);
1335     }
1336 
1337     g_queue_init(&s->conn_list);
1338 
1339     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1340                                                       connection_key_equal,
1341                                                       g_free,
1342                                                       connection_destroy);
1343 
1344     colo_compare_iothread(s);
1345 
1346     qemu_mutex_lock(&colo_compare_mutex);
1347     if (!colo_compare_active) {
1348         qemu_mutex_init(&event_mtx);
1349         qemu_cond_init(&event_complete_cond);
1350         colo_compare_active = true;
1351     }
1352     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1353     qemu_mutex_unlock(&colo_compare_mutex);
1354 
1355     return;
1356 }
1357 
1358 static void colo_flush_packets(void *opaque, void *user_data)
1359 {
1360     CompareState *s = user_data;
1361     Connection *conn = opaque;
1362     Packet *pkt = NULL;
1363 
1364     while (!g_queue_is_empty(&conn->primary_list)) {
1365         pkt = g_queue_pop_head(&conn->primary_list);
1366         compare_chr_send(s,
1367                          pkt->data,
1368                          pkt->size,
1369                          pkt->vnet_hdr_len,
1370                          false,
1371                          true);
1372         packet_destroy_partial(pkt, NULL);
1373     }
1374     while (!g_queue_is_empty(&conn->secondary_list)) {
1375         pkt = g_queue_pop_head(&conn->secondary_list);
1376         packet_destroy(pkt, NULL);
1377     }
1378 }
1379 
1380 static void colo_compare_class_init(ObjectClass *oc, void *data)
1381 {
1382     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1383 
1384     ucc->complete = colo_compare_complete;
1385 }
1386 
1387 static void colo_compare_init(Object *obj)
1388 {
1389     CompareState *s = COLO_COMPARE(obj);
1390 
1391     object_property_add_str(obj, "primary_in",
1392                             compare_get_pri_indev, compare_set_pri_indev);
1393     object_property_add_str(obj, "secondary_in",
1394                             compare_get_sec_indev, compare_set_sec_indev);
1395     object_property_add_str(obj, "outdev",
1396                             compare_get_outdev, compare_set_outdev);
1397     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1398                             (Object **)&s->iothread,
1399                             object_property_allow_set_link,
1400                             OBJ_PROP_LINK_STRONG);
1401     /* This parameter just for Xen COLO */
1402     object_property_add_str(obj, "notify_dev",
1403                             compare_get_notify_dev, compare_set_notify_dev);
1404 
1405     object_property_add(obj, "compare_timeout", "uint32",
1406                         compare_get_timeout,
1407                         compare_set_timeout, NULL, NULL);
1408 
1409     object_property_add(obj, "expired_scan_cycle", "uint32",
1410                         compare_get_expired_scan_cycle,
1411                         compare_set_expired_scan_cycle, NULL, NULL);
1412 
1413     object_property_add(obj, "max_queue_size", "uint32",
1414                         get_max_queue_size,
1415                         set_max_queue_size, NULL, NULL);
1416 
1417     s->vnet_hdr = false;
1418     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1419                              compare_set_vnet_hdr);
1420 }
1421 
1422 static void colo_compare_finalize(Object *obj)
1423 {
1424     CompareState *s = COLO_COMPARE(obj);
1425     CompareState *tmp = NULL;
1426 
1427     qemu_mutex_lock(&colo_compare_mutex);
1428     QTAILQ_FOREACH(tmp, &net_compares, next) {
1429         if (tmp == s) {
1430             QTAILQ_REMOVE(&net_compares, s, next);
1431             break;
1432         }
1433     }
1434     if (QTAILQ_EMPTY(&net_compares)) {
1435         colo_compare_active = false;
1436         qemu_mutex_destroy(&event_mtx);
1437         qemu_cond_destroy(&event_complete_cond);
1438     }
1439     qemu_mutex_unlock(&colo_compare_mutex);
1440 
1441     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1442     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1443     qemu_chr_fe_deinit(&s->chr_out, false);
1444     if (s->notify_dev) {
1445         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1446     }
1447 
1448     colo_compare_timer_del(s);
1449 
1450     qemu_bh_delete(s->event_bh);
1451 
1452     AioContext *ctx = iothread_get_aio_context(s->iothread);
1453     aio_context_acquire(ctx);
1454     AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1455     if (s->notify_dev) {
1456         AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1457     }
1458     aio_context_release(ctx);
1459 
1460     /* Release all unhandled packets after compare thead exited */
1461     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1462     AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
1463 
1464     g_queue_clear(&s->conn_list);
1465     g_queue_clear(&s->out_sendco.send_list);
1466     if (s->notify_dev) {
1467         g_queue_clear(&s->notify_sendco.send_list);
1468     }
1469 
1470     if (s->connection_track_table) {
1471         g_hash_table_destroy(s->connection_track_table);
1472     }
1473 
1474     object_unref(OBJECT(s->iothread));
1475 
1476     g_free(s->pri_indev);
1477     g_free(s->sec_indev);
1478     g_free(s->outdev);
1479     g_free(s->notify_dev);
1480 }
1481 
1482 static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1483 {
1484     colo_compare_active = false;
1485     qemu_mutex_init(&colo_compare_mutex);
1486 }
1487 
1488 static const TypeInfo colo_compare_info = {
1489     .name = TYPE_COLO_COMPARE,
1490     .parent = TYPE_OBJECT,
1491     .instance_size = sizeof(CompareState),
1492     .instance_init = colo_compare_init,
1493     .instance_finalize = colo_compare_finalize,
1494     .class_size = sizeof(CompareClass),
1495     .class_init = colo_compare_class_init,
1496     .interfaces = (InterfaceInfo[]) {
1497         { TYPE_USER_CREATABLE },
1498         { }
1499     }
1500 };
1501 
1502 static void register_types(void)
1503 {
1504     type_register_static(&colo_compare_info);
1505 }
1506 
1507 type_init(register_types);
1508