xref: /openbmc/qemu/net/colo-compare.c (revision 5cc8767d)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu-common.h"
17 #include "qemu/error-report.h"
18 #include "trace.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 #include "util.h"
34 
/* QOM type name and checked downcast macro for colo-compare objects. */
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* CompareState instances that colo_notify_compares_event() dispatches to. */
static QTAILQ_HEAD(, CompareState) net_compares =
       QTAILQ_HEAD_INITIALIZER(net_compares);

/*
 * Listeners told about an inconsistency when no notify_dev is configured
 * (see colo_compare_inconsistency_notify()).
 */
static NotifierList colo_compare_notifiers =
    NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);

/* Byte count offered by the chardev can_read callback. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* Queue length limit checked by colo_insert_packet(). */
#define MAX_QUEUE_SIZE 1024

/* Bits of the "mark" result produced by colo_mark_tcp_pkt(). */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/* TODO: Should be configurable */
/* Old-packet timer period, also used as the "too old" age threshold. */
#define REGULAR_PACKET_CHECK_MS 3000

/*
 * Handshake between colo_notify_compares_event() and the per-instance event
 * bottom halves: the notifier counts scheduled events under event_mtx and
 * waits on event_complete_cond until each handler has decremented the count.
 */
static QemuMutex event_mtx;
static QemuCond event_complete_cond;
static int event_unhandled_count;
57 
58 /*
59  *  + CompareState ++
60  *  |               |
61  *  +---------------+   +---------------+         +---------------+
62  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
63  *  +---------------+   +---------------+         +---------------+
64  *  |               |     |           |             |          |
65  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
66  *                    |primary |  |secondary    |primary | |secondary
67  *                    |packet  |  |packet  +    |packet  | |packet  +
68  *                    +--------+  +--------+    +--------+ +--------+
69  *                        |           |             |          |
70  *                    +---v----+  +---v----+    +---v----+ +---v----+
71  *                    |primary |  |secondary    |primary | |secondary
72  *                    |packet  |  |packet  +    |packet  | |packet  +
73  *                    +--------+  +--------+    +--------+ +--------+
74  *                        |           |             |          |
75  *                    +---v----+  +---v----+    +---v----+ +---v----+
76  *                    |primary |  |secondary    |primary | |secondary
77  *                    |packet  |  |packet  +    |packet  | |packet  +
78  *                    +--------+  +--------+    +--------+ +--------+
79  */
80 typedef struct CompareState {
81     Object parent;
82 
83     char *pri_indev;
84     char *sec_indev;
85     char *outdev;
86     char *notify_dev;
87     CharBackend chr_pri_in;
88     CharBackend chr_sec_in;
89     CharBackend chr_out;
90     CharBackend chr_notify_dev;
91     SocketReadState pri_rs;
92     SocketReadState sec_rs;
93     SocketReadState notify_rs;
94     bool vnet_hdr;
95 
96     /*
97      * Record the connection that through the NIC
98      * Element type: Connection
99      */
100     GQueue conn_list;
101     /* Record the connection without repetition */
102     GHashTable *connection_track_table;
103 
104     IOThread *iothread;
105     GMainContext *worker_context;
106     QEMUTimer *packet_check_timer;
107 
108     QEMUBH *event_bh;
109     enum colo_event event;
110 
111     QTAILQ_ENTRY(CompareState) next;
112 } CompareState;
113 
/* QOM class structure; adds nothing to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;

/* Which input a packet came from; the @mode argument of packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};


/*
 * Forward declaration. Writes one length-prefixed frame to chr_out, or to
 * chr_notify_dev when @notify_remote_frame is true. Returns 0 on success
 * and a negative value on failure.
 */
static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len,
                            bool notify_remote_frame);
129 
130 static void notify_remote_frame(CompareState *s)
131 {
132     char msg[] = "DO_CHECKPOINT";
133     int ret = 0;
134 
135     ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
136     if (ret < 0) {
137         error_report("Notify Xen COLO-frame failed");
138     }
139 }
140 
141 static void colo_compare_inconsistency_notify(CompareState *s)
142 {
143     if (s->notify_dev) {
144         notify_remote_frame(s);
145     } else {
146         notifier_list_notify(&colo_compare_notifiers,
147                              migrate_get_current());
148     }
149 }
150 
151 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
152 {
153     struct tcp_hdr *atcp, *btcp;
154 
155     atcp = (struct tcp_hdr *)(a->transport_header);
156     btcp = (struct tcp_hdr *)(b->transport_header);
157     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
158 }
159 
160 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
161 {
162     Packet *pkt = data;
163     struct tcp_hdr *tcphd;
164 
165     tcphd = (struct tcp_hdr *)pkt->transport_header;
166 
167     pkt->tcp_seq = ntohl(tcphd->th_seq);
168     pkt->tcp_ack = ntohl(tcphd->th_ack);
169     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
170     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
171                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
172     pkt->payload_size = pkt->size - pkt->header_size;
173     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
174     pkt->flags = tcphd->th_flags;
175 }
176 
177 /*
178  * Return 1 on success, if return 0 means the
179  * packet will be dropped
180  */
181 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
182 {
183     if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
184         if (pkt->ip->ip_p == IPPROTO_TCP) {
185             fill_pkt_tcp_info(pkt, max_ack);
186             g_queue_insert_sorted(queue,
187                                   pkt,
188                                   (GCompareDataFunc)seq_sorter,
189                                   NULL);
190         } else {
191             g_queue_push_tail(queue, pkt);
192         }
193         return 1;
194     }
195     return 0;
196 }
197 
198 /*
199  * Return 0 on success, if return -1 means the pkt
200  * is unsupported(arp and ipv6) and will be sent later
201  */
202 static int packet_enqueue(CompareState *s, int mode, Connection **con)
203 {
204     ConnectionKey key;
205     Packet *pkt = NULL;
206     Connection *conn;
207 
208     if (mode == PRIMARY_IN) {
209         pkt = packet_new(s->pri_rs.buf,
210                          s->pri_rs.packet_len,
211                          s->pri_rs.vnet_hdr_len);
212     } else {
213         pkt = packet_new(s->sec_rs.buf,
214                          s->sec_rs.packet_len,
215                          s->sec_rs.vnet_hdr_len);
216     }
217 
218     if (parse_packet_early(pkt)) {
219         packet_destroy(pkt, NULL);
220         pkt = NULL;
221         return -1;
222     }
223     fill_connection_key(pkt, &key);
224 
225     conn = connection_get(s->connection_track_table,
226                           &key,
227                           &s->conn_list);
228 
229     if (!conn->processing) {
230         g_queue_push_tail(&s->conn_list, conn);
231         conn->processing = true;
232     }
233 
234     if (mode == PRIMARY_IN) {
235         if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
236             error_report("colo compare primary queue size too big,"
237                          "drop packet");
238         }
239     } else {
240         if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
241             error_report("colo compare secondary queue size too big,"
242                          "drop packet");
243         }
244     }
245     *con = conn;
246 
247     return 0;
248 }
249 
/*
 * TCP sequence-space comparison: true when @seq1 is strictly later than
 * @seq2. The 32-bit difference is read as signed, so the result stays
 * correct across wrap-around while the two values are less than 2^31 apart.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
254 
255 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
256 {
257     int ret;
258     ret = compare_chr_send(s,
259                            pkt->data,
260                            pkt->size,
261                            pkt->vnet_hdr_len,
262                            false);
263     if (ret < 0) {
264         error_report("colo send primary packet failed");
265     }
266     trace_colo_compare_main("packet same and release packet");
267     packet_destroy(pkt, NULL);
268 }
269 
270 /*
271  * The IP packets sent by primary and secondary
272  * will be compared in here
273  * TODO support ip fragment, Out-Of-Order
274  * return:    0  means packet same
275  *            > 0 || < 0 means packet different
276  */
277 static int colo_compare_packet_payload(Packet *ppkt,
278                                        Packet *spkt,
279                                        uint16_t poffset,
280                                        uint16_t soffset,
281                                        uint16_t len)
282 
283 {
284     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
285         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
286 
287         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
288         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
289         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
290         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
291 
292         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
293                                    pri_ip_dst, spkt->size,
294                                    sec_ip_src, sec_ip_dst);
295     }
296 
297     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
298 }
299 
300 /*
301  * return true means that the payload is consist and
302  * need to make the next comparison, false means do
303  * the checkpoint
304 */
305 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
306                               int8_t *mark, uint32_t max_ack)
307 {
308     *mark = 0;
309 
310     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
311         if (colo_compare_packet_payload(ppkt, spkt,
312                                         ppkt->header_size, spkt->header_size,
313                                         ppkt->payload_size)) {
314             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
315             return true;
316         }
317     }
318 
319     /* one part of secondary packet payload still need to be compared */
320     if (!after(ppkt->seq_end, spkt->seq_end)) {
321         if (colo_compare_packet_payload(ppkt, spkt,
322                                         ppkt->header_size + ppkt->offset,
323                                         spkt->header_size + spkt->offset,
324                                         ppkt->payload_size - ppkt->offset)) {
325             if (!after(ppkt->tcp_ack, max_ack)) {
326                 *mark = COLO_COMPARE_FREE_PRIMARY;
327                 spkt->offset += ppkt->payload_size - ppkt->offset;
328                 return true;
329             } else {
330                 /* secondary guest hasn't ack the data, don't send
331                  * out this packet
332                  */
333                 return false;
334             }
335         }
336     } else {
337         /* primary packet is longer than secondary packet, compare
338          * the same part and mark the primary packet offset
339          */
340         if (colo_compare_packet_payload(ppkt, spkt,
341                                         ppkt->header_size + ppkt->offset,
342                                         spkt->header_size + spkt->offset,
343                                         spkt->payload_size - spkt->offset)) {
344             *mark = COLO_COMPARE_FREE_SECONDARY;
345             ppkt->offset += spkt->payload_size - spkt->offset;
346             return true;
347         }
348     }
349 
350     return false;
351 }
352 
353 static void colo_compare_tcp(CompareState *s, Connection *conn)
354 {
355     Packet *ppkt = NULL, *spkt = NULL;
356     int8_t mark;
357 
358     /*
359      * If ppkt and spkt have the same payload, but ppkt's ACK
360      * is greater than spkt's ACK, in this case we can not
361      * send the ppkt because it will cause the secondary guest
362      * to miss sending some data in the next. Therefore, we
363      * record the maximum ACK in the current queue at both
364      * primary side and secondary side. Only when the ack is
365      * less than the smaller of the two maximum ack, then we
366      * can ensure that the packet's payload is acknowledged by
367      * primary and secondary.
368     */
369     uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
370 
371 pri:
372     if (g_queue_is_empty(&conn->primary_list)) {
373         return;
374     }
375     ppkt = g_queue_pop_head(&conn->primary_list);
376 sec:
377     if (g_queue_is_empty(&conn->secondary_list)) {
378         g_queue_push_head(&conn->primary_list, ppkt);
379         return;
380     }
381     spkt = g_queue_pop_head(&conn->secondary_list);
382 
383     if (ppkt->tcp_seq == ppkt->seq_end) {
384         colo_release_primary_pkt(s, ppkt);
385         ppkt = NULL;
386     }
387 
388     if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
389         trace_colo_compare_main("pri: this packet has compared");
390         colo_release_primary_pkt(s, ppkt);
391         ppkt = NULL;
392     }
393 
394     if (spkt->tcp_seq == spkt->seq_end) {
395         packet_destroy(spkt, NULL);
396         if (!ppkt) {
397             goto pri;
398         } else {
399             goto sec;
400         }
401     } else {
402         if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
403             trace_colo_compare_main("sec: this packet has compared");
404             packet_destroy(spkt, NULL);
405             if (!ppkt) {
406                 goto pri;
407             } else {
408                 goto sec;
409             }
410         }
411         if (!ppkt) {
412             g_queue_push_head(&conn->secondary_list, spkt);
413             goto pri;
414         }
415     }
416 
417     if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
418         trace_colo_compare_tcp_info("pri",
419                                     ppkt->tcp_seq, ppkt->tcp_ack,
420                                     ppkt->header_size, ppkt->payload_size,
421                                     ppkt->offset, ppkt->flags);
422 
423         trace_colo_compare_tcp_info("sec",
424                                     spkt->tcp_seq, spkt->tcp_ack,
425                                     spkt->header_size, spkt->payload_size,
426                                     spkt->offset, spkt->flags);
427 
428         if (mark == COLO_COMPARE_FREE_PRIMARY) {
429             conn->compare_seq = ppkt->seq_end;
430             colo_release_primary_pkt(s, ppkt);
431             g_queue_push_head(&conn->secondary_list, spkt);
432             goto pri;
433         }
434         if (mark == COLO_COMPARE_FREE_SECONDARY) {
435             conn->compare_seq = spkt->seq_end;
436             packet_destroy(spkt, NULL);
437             goto sec;
438         }
439         if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
440             conn->compare_seq = ppkt->seq_end;
441             colo_release_primary_pkt(s, ppkt);
442             packet_destroy(spkt, NULL);
443             goto pri;
444         }
445     } else {
446         g_queue_push_head(&conn->primary_list, ppkt);
447         g_queue_push_head(&conn->secondary_list, spkt);
448 
449         qemu_hexdump((char *)ppkt->data, stderr,
450                      "colo-compare ppkt", ppkt->size);
451         qemu_hexdump((char *)spkt->data, stderr,
452                      "colo-compare spkt", spkt->size);
453 
454         colo_compare_inconsistency_notify(s);
455     }
456 }
457 
458 
459 /*
460  * Called from the compare thread on the primary
461  * for compare udp packet
462  */
463 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
464 {
465     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
466     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
467 
468     trace_colo_compare_main("compare udp");
469 
470     /*
471      * Because of ppkt and spkt are both in the same connection,
472      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
473      * same with spkt. In addition, IP header's Identification is a random
474      * field, we can handle it in IP fragmentation function later.
475      * COLO just concern the response net packet payload from primary guest
476      * and secondary guest are same or not, So we ignored all IP header include
477      * other field like TOS,TTL,IP Checksum. we only need to compare
478      * the ip payload here.
479      */
480     if (ppkt->size != spkt->size) {
481         trace_colo_compare_main("UDP: payload size of packets are different");
482         return -1;
483     }
484     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
485                                     ppkt->size - offset)) {
486         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
487         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
488         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
489             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
490                          ppkt->size);
491             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
492                          spkt->size);
493         }
494         return -1;
495     } else {
496         return 0;
497     }
498 }
499 
500 /*
501  * Called from the compare thread on the primary
502  * for compare icmp packet
503  */
504 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
505 {
506     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
507     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
508 
509     trace_colo_compare_main("compare icmp");
510 
511     /*
512      * Because of ppkt and spkt are both in the same connection,
513      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
514      * same with spkt. In addition, IP header's Identification is a random
515      * field, we can handle it in IP fragmentation function later.
516      * COLO just concern the response net packet payload from primary guest
517      * and secondary guest are same or not, So we ignored all IP header include
518      * other field like TOS,TTL,IP Checksum. we only need to compare
519      * the ip payload here.
520      */
521     if (ppkt->size != spkt->size) {
522         trace_colo_compare_main("ICMP: payload size of packets are different");
523         return -1;
524     }
525     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
526                                     ppkt->size - offset)) {
527         trace_colo_compare_icmp_miscompare("primary pkt size",
528                                            ppkt->size);
529         trace_colo_compare_icmp_miscompare("Secondary pkt size",
530                                            spkt->size);
531         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
532             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
533                          ppkt->size);
534             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
535                          spkt->size);
536         }
537         return -1;
538     } else {
539         return 0;
540     }
541 }
542 
543 /*
544  * Called from the compare thread on the primary
545  * for compare other packet
546  */
547 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
548 {
549     uint16_t offset = ppkt->vnet_hdr_len;
550 
551     trace_colo_compare_main("compare other");
552     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
553         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
554 
555         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
556         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
557         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
558         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
559 
560         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
561                                    pri_ip_dst, spkt->size,
562                                    sec_ip_src, sec_ip_dst);
563     }
564 
565     if (ppkt->size != spkt->size) {
566         trace_colo_compare_main("Other: payload size of packets are different");
567         return -1;
568     }
569     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
570                                        ppkt->size - offset);
571 }
572 
573 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
574 {
575     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
576 
577     if ((now - pkt->creation_ms) > (*check_time)) {
578         trace_colo_old_packet_check_found(pkt->creation_ms);
579         return 0;
580     } else {
581         return 1;
582     }
583 }
584 
585 void colo_compare_register_notifier(Notifier *notify)
586 {
587     notifier_list_add(&colo_compare_notifiers, notify);
588 }
589 
590 void colo_compare_unregister_notifier(Notifier *notify)
591 {
592     notifier_remove(notify);
593 }
594 
595 static int colo_old_packet_check_one_conn(Connection *conn,
596                                           CompareState *s)
597 {
598     GList *result = NULL;
599     int64_t check_time = REGULAR_PACKET_CHECK_MS;
600 
601     result = g_queue_find_custom(&conn->primary_list,
602                                  &check_time,
603                                  (GCompareFunc)colo_old_packet_check_one);
604 
605     if (result) {
606         /* Do checkpoint will flush old packet */
607         colo_compare_inconsistency_notify(s);
608         return 0;
609     }
610 
611     return 1;
612 }
613 
614 /*
615  * Look for old packets that the secondary hasn't matched,
616  * if we have some then we have to checkpoint to wake
617  * the secondary up.
618  */
619 static void colo_old_packet_check(void *opaque)
620 {
621     CompareState *s = opaque;
622 
623     /*
624      * If we find one old packet, stop finding job and notify
625      * COLO frame do checkpoint.
626      */
627     g_queue_find_custom(&s->conn_list, s,
628                         (GCompareFunc)colo_old_packet_check_one_conn);
629 }
630 
631 static void colo_compare_packet(CompareState *s, Connection *conn,
632                                 int (*HandlePacket)(Packet *spkt,
633                                 Packet *ppkt))
634 {
635     Packet *pkt = NULL;
636     GList *result = NULL;
637 
638     while (!g_queue_is_empty(&conn->primary_list) &&
639            !g_queue_is_empty(&conn->secondary_list)) {
640         pkt = g_queue_pop_head(&conn->primary_list);
641         result = g_queue_find_custom(&conn->secondary_list,
642                  pkt, (GCompareFunc)HandlePacket);
643 
644         if (result) {
645             colo_release_primary_pkt(s, pkt);
646             g_queue_remove(&conn->secondary_list, result->data);
647         } else {
648             /*
649              * If one packet arrive late, the secondary_list or
650              * primary_list will be empty, so we can't compare it
651              * until next comparison. If the packets in the list are
652              * timeout, it will trigger a checkpoint request.
653              */
654             trace_colo_compare_main("packet different");
655             g_queue_push_head(&conn->primary_list, pkt);
656 
657             colo_compare_inconsistency_notify(s);
658             break;
659         }
660     }
661 }
662 
663 /*
664  * Called from the compare thread on the primary
665  * for compare packet with secondary list of the
666  * specified connection when a new packet was
667  * queued to it.
668  */
669 static void colo_compare_connection(void *opaque, void *user_data)
670 {
671     CompareState *s = user_data;
672     Connection *conn = opaque;
673 
674     switch (conn->ip_proto) {
675     case IPPROTO_TCP:
676         colo_compare_tcp(s, conn);
677         break;
678     case IPPROTO_UDP:
679         colo_compare_packet(s, conn, colo_packet_compare_udp);
680         break;
681     case IPPROTO_ICMP:
682         colo_compare_packet(s, conn, colo_packet_compare_icmp);
683         break;
684     default:
685         colo_compare_packet(s, conn, colo_packet_compare_other);
686         break;
687     }
688 }
689 
690 static int compare_chr_send(CompareState *s,
691                             const uint8_t *buf,
692                             uint32_t size,
693                             uint32_t vnet_hdr_len,
694                             bool notify_remote_frame)
695 {
696     int ret = 0;
697     uint32_t len = htonl(size);
698 
699     if (!size) {
700         return 0;
701     }
702 
703     if (notify_remote_frame) {
704         ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
705                                     (uint8_t *)&len,
706                                     sizeof(len));
707     } else {
708         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
709     }
710 
711     if (ret != sizeof(len)) {
712         goto err;
713     }
714 
715     if (s->vnet_hdr) {
716         /*
717          * We send vnet header len make other module(like filter-redirector)
718          * know how to parse net packet correctly.
719          */
720         len = htonl(vnet_hdr_len);
721 
722         if (!notify_remote_frame) {
723             ret = qemu_chr_fe_write_all(&s->chr_out,
724                                         (uint8_t *)&len,
725                                         sizeof(len));
726         }
727 
728         if (ret != sizeof(len)) {
729             goto err;
730         }
731     }
732 
733     if (notify_remote_frame) {
734         ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
735                                     (uint8_t *)buf,
736                                     size);
737     } else {
738         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
739     }
740 
741     if (ret != size) {
742         goto err;
743     }
744 
745     return 0;
746 
747 err:
748     return ret < 0 ? ret : -EIO;
749 }
750 
751 static int compare_chr_can_read(void *opaque)
752 {
753     return COMPARE_READ_LEN_MAX;
754 }
755 
756 /*
757  * Called from the main thread on the primary for packets
758  * arriving over the socket from the primary.
759  */
760 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
761 {
762     CompareState *s = COLO_COMPARE(opaque);
763     int ret;
764 
765     ret = net_fill_rstate(&s->pri_rs, buf, size);
766     if (ret == -1) {
767         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
768                                  NULL, NULL, true);
769         error_report("colo-compare primary_in error");
770     }
771 }
772 
773 /*
774  * Called from the main thread on the primary for packets
775  * arriving over the socket from the secondary.
776  */
777 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
778 {
779     CompareState *s = COLO_COMPARE(opaque);
780     int ret;
781 
782     ret = net_fill_rstate(&s->sec_rs, buf, size);
783     if (ret == -1) {
784         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
785                                  NULL, NULL, true);
786         error_report("colo-compare secondary_in error");
787     }
788 }
789 
790 static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
791 {
792     CompareState *s = COLO_COMPARE(opaque);
793     int ret;
794 
795     ret = net_fill_rstate(&s->notify_rs, buf, size);
796     if (ret == -1) {
797         qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
798                                  NULL, NULL, true);
799         error_report("colo-compare notify_dev error");
800     }
801 }
802 
803 /*
804  * Check old packet regularly so it can watch for any packets
805  * that the secondary hasn't produced equivalents of.
806  */
807 static void check_old_packet_regular(void *opaque)
808 {
809     CompareState *s = opaque;
810 
811     /* if have old packet we will notify checkpoint */
812     colo_old_packet_check(s);
813     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
814                 REGULAR_PACKET_CHECK_MS);
815 }
816 
817 /* Public API, Used for COLO frame to notify compare event */
818 void colo_notify_compares_event(void *opaque, int event, Error **errp)
819 {
820     CompareState *s;
821 
822     qemu_mutex_lock(&event_mtx);
823     QTAILQ_FOREACH(s, &net_compares, next) {
824         s->event = event;
825         qemu_bh_schedule(s->event_bh);
826         event_unhandled_count++;
827     }
828     /* Wait all compare threads to finish handling this event */
829     while (event_unhandled_count > 0) {
830         qemu_cond_wait(&event_complete_cond, &event_mtx);
831     }
832 
833     qemu_mutex_unlock(&event_mtx);
834 }
835 
836 static void colo_compare_timer_init(CompareState *s)
837 {
838     AioContext *ctx = iothread_get_aio_context(s->iothread);
839 
840     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
841                                 SCALE_MS, check_old_packet_regular,
842                                 s);
843     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
844                     REGULAR_PACKET_CHECK_MS);
845 }
846 
847 static void colo_compare_timer_del(CompareState *s)
848 {
849     if (s->packet_check_timer) {
850         timer_del(s->packet_check_timer);
851         timer_free(s->packet_check_timer);
852         s->packet_check_timer = NULL;
853     }
854  }
855 
856 static void colo_flush_packets(void *opaque, void *user_data);
857 
858 static void colo_compare_handle_event(void *opaque)
859 {
860     CompareState *s = opaque;
861 
862     switch (s->event) {
863     case COLO_EVENT_CHECKPOINT:
864         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
865         break;
866     case COLO_EVENT_FAILOVER:
867         break;
868     default:
869         break;
870     }
871 
872     qemu_mutex_lock(&event_mtx);
873     assert(event_unhandled_count > 0);
874     event_unhandled_count--;
875     qemu_cond_broadcast(&event_complete_cond);
876     qemu_mutex_unlock(&event_mtx);
877 }
878 
879 static void colo_compare_iothread(CompareState *s)
880 {
881     object_ref(OBJECT(s->iothread));
882     s->worker_context = iothread_get_g_main_context(s->iothread);
883 
884     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
885                              compare_pri_chr_in, NULL, NULL,
886                              s, s->worker_context, true);
887     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
888                              compare_sec_chr_in, NULL, NULL,
889                              s, s->worker_context, true);
890     if (s->notify_dev) {
891         qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
892                                  compare_notify_chr, NULL, NULL,
893                                  s, s->worker_context, true);
894     }
895 
896     colo_compare_timer_init(s);
897     s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
898 }
899 
900 static char *compare_get_pri_indev(Object *obj, Error **errp)
901 {
902     CompareState *s = COLO_COMPARE(obj);
903 
904     return g_strdup(s->pri_indev);
905 }
906 
907 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
908 {
909     CompareState *s = COLO_COMPARE(obj);
910 
911     g_free(s->pri_indev);
912     s->pri_indev = g_strdup(value);
913 }
914 
915 static char *compare_get_sec_indev(Object *obj, Error **errp)
916 {
917     CompareState *s = COLO_COMPARE(obj);
918 
919     return g_strdup(s->sec_indev);
920 }
921 
922 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
923 {
924     CompareState *s = COLO_COMPARE(obj);
925 
926     g_free(s->sec_indev);
927     s->sec_indev = g_strdup(value);
928 }
929 
930 static char *compare_get_outdev(Object *obj, Error **errp)
931 {
932     CompareState *s = COLO_COMPARE(obj);
933 
934     return g_strdup(s->outdev);
935 }
936 
937 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
938 {
939     CompareState *s = COLO_COMPARE(obj);
940 
941     g_free(s->outdev);
942     s->outdev = g_strdup(value);
943 }
944 
945 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
946 {
947     CompareState *s = COLO_COMPARE(obj);
948 
949     return s->vnet_hdr;
950 }
951 
952 static void compare_set_vnet_hdr(Object *obj,
953                                  bool value,
954                                  Error **errp)
955 {
956     CompareState *s = COLO_COMPARE(obj);
957 
958     s->vnet_hdr = value;
959 }
960 
961 static char *compare_get_notify_dev(Object *obj, Error **errp)
962 {
963     CompareState *s = COLO_COMPARE(obj);
964 
965     return g_strdup(s->notify_dev);
966 }
967 
968 static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
969 {
970     CompareState *s = COLO_COMPARE(obj);
971 
972     g_free(s->notify_dev);
973     s->notify_dev = g_strdup(value);
974 }
975 
976 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
977 {
978     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
979     Connection *conn = NULL;
980 
981     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
982         trace_colo_compare_main("primary: unsupported packet in");
983         compare_chr_send(s,
984                          pri_rs->buf,
985                          pri_rs->packet_len,
986                          pri_rs->vnet_hdr_len,
987                          false);
988     } else {
989         /* compare packet in the specified connection */
990         colo_compare_connection(conn, s);
991     }
992 }
993 
994 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
995 {
996     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
997     Connection *conn = NULL;
998 
999     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
1000         trace_colo_compare_main("secondary: unsupported packet in");
1001     } else {
1002         /* compare packet in the specified connection */
1003         colo_compare_connection(conn, s);
1004     }
1005 }
1006 
1007 static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1008 {
1009     CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1010 
1011     /* Get Xen colo-frame's notify and handle the message */
1012     char *data = g_memdup(notify_rs->buf, notify_rs->packet_len);
1013     char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1014     int ret;
1015 
1016     if (!strcmp(data, "COLO_USERSPACE_PROXY_INIT")) {
1017         ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true);
1018         if (ret < 0) {
1019             error_report("Notify Xen COLO-frame INIT failed");
1020         }
1021     }
1022 
1023     if (!strcmp(data, "COLO_CHECKPOINT")) {
1024         /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1025         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1026     }
1027 }
1028 
1029 /*
1030  * Return 0 is success.
1031  * Return 1 is failed.
1032  */
1033 static int find_and_check_chardev(Chardev **chr,
1034                                   char *chr_name,
1035                                   Error **errp)
1036 {
1037     *chr = qemu_chr_find(chr_name);
1038     if (*chr == NULL) {
1039         error_setg(errp, "Device '%s' not found",
1040                    chr_name);
1041         return 1;
1042     }
1043 
1044     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1045         error_setg(errp, "chardev \"%s\" is not reconnectable",
1046                    chr_name);
1047         return 1;
1048     }
1049 
1050     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1051         error_setg(errp, "chardev \"%s\" cannot switch context",
1052                    chr_name);
1053         return 1;
1054     }
1055 
1056     return 0;
1057 }
1058 
1059 /*
1060  * Called from the main thread on the primary
1061  * to setup colo-compare.
1062  */
1063 static void colo_compare_complete(UserCreatable *uc, Error **errp)
1064 {
1065     CompareState *s = COLO_COMPARE(uc);
1066     Chardev *chr;
1067 
1068     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
1069         error_setg(errp, "colo compare needs 'primary_in' ,"
1070                    "'secondary_in','outdev','iothread' property set");
1071         return;
1072     } else if (!strcmp(s->pri_indev, s->outdev) ||
1073                !strcmp(s->sec_indev, s->outdev) ||
1074                !strcmp(s->pri_indev, s->sec_indev)) {
1075         error_setg(errp, "'indev' and 'outdev' could not be same "
1076                    "for compare module");
1077         return;
1078     }
1079 
1080     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1081         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
1082         return;
1083     }
1084 
1085     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1086         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
1087         return;
1088     }
1089 
1090     if (find_and_check_chardev(&chr, s->outdev, errp) ||
1091         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
1092         return;
1093     }
1094 
1095     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1096     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1097 
1098     /* Try to enable remote notify chardev, currently just for Xen COLO */
1099     if (s->notify_dev) {
1100         if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1101             !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1102             return;
1103         }
1104 
1105         net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1106                            s->vnet_hdr);
1107     }
1108 
1109     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1110 
1111     g_queue_init(&s->conn_list);
1112 
1113     qemu_mutex_init(&event_mtx);
1114     qemu_cond_init(&event_complete_cond);
1115 
1116     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1117                                                       connection_key_equal,
1118                                                       g_free,
1119                                                       connection_destroy);
1120 
1121     colo_compare_iothread(s);
1122     return;
1123 }
1124 
1125 static void colo_flush_packets(void *opaque, void *user_data)
1126 {
1127     CompareState *s = user_data;
1128     Connection *conn = opaque;
1129     Packet *pkt = NULL;
1130 
1131     while (!g_queue_is_empty(&conn->primary_list)) {
1132         pkt = g_queue_pop_head(&conn->primary_list);
1133         compare_chr_send(s,
1134                          pkt->data,
1135                          pkt->size,
1136                          pkt->vnet_hdr_len,
1137                          false);
1138         packet_destroy(pkt, NULL);
1139     }
1140     while (!g_queue_is_empty(&conn->secondary_list)) {
1141         pkt = g_queue_pop_head(&conn->secondary_list);
1142         packet_destroy(pkt, NULL);
1143     }
1144 }
1145 
1146 static void colo_compare_class_init(ObjectClass *oc, void *data)
1147 {
1148     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1149 
1150     ucc->complete = colo_compare_complete;
1151 }
1152 
1153 static void colo_compare_init(Object *obj)
1154 {
1155     CompareState *s = COLO_COMPARE(obj);
1156 
1157     object_property_add_str(obj, "primary_in",
1158                             compare_get_pri_indev, compare_set_pri_indev,
1159                             NULL);
1160     object_property_add_str(obj, "secondary_in",
1161                             compare_get_sec_indev, compare_set_sec_indev,
1162                             NULL);
1163     object_property_add_str(obj, "outdev",
1164                             compare_get_outdev, compare_set_outdev,
1165                             NULL);
1166     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1167                             (Object **)&s->iothread,
1168                             object_property_allow_set_link,
1169                             OBJ_PROP_LINK_STRONG, NULL);
1170     /* This parameter just for Xen COLO */
1171     object_property_add_str(obj, "notify_dev",
1172                             compare_get_notify_dev, compare_set_notify_dev,
1173                             NULL);
1174 
1175     s->vnet_hdr = false;
1176     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1177                              compare_set_vnet_hdr, NULL);
1178 }
1179 
1180 static void colo_compare_finalize(Object *obj)
1181 {
1182     CompareState *s = COLO_COMPARE(obj);
1183     CompareState *tmp = NULL;
1184 
1185     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1186     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1187     qemu_chr_fe_deinit(&s->chr_out, false);
1188     if (s->notify_dev) {
1189         qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1190     }
1191 
1192     if (s->iothread) {
1193         colo_compare_timer_del(s);
1194     }
1195 
1196     qemu_bh_delete(s->event_bh);
1197 
1198     QTAILQ_FOREACH(tmp, &net_compares, next) {
1199         if (tmp == s) {
1200             QTAILQ_REMOVE(&net_compares, s, next);
1201             break;
1202         }
1203     }
1204 
1205     /* Release all unhandled packets after compare thead exited */
1206     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1207 
1208     g_queue_clear(&s->conn_list);
1209 
1210     if (s->connection_track_table) {
1211         g_hash_table_destroy(s->connection_track_table);
1212     }
1213 
1214     if (s->iothread) {
1215         object_unref(OBJECT(s->iothread));
1216     }
1217 
1218     qemu_mutex_destroy(&event_mtx);
1219     qemu_cond_destroy(&event_complete_cond);
1220 
1221     g_free(s->pri_indev);
1222     g_free(s->sec_indev);
1223     g_free(s->outdev);
1224     g_free(s->notify_dev);
1225 }
1226 
1227 static const TypeInfo colo_compare_info = {
1228     .name = TYPE_COLO_COMPARE,
1229     .parent = TYPE_OBJECT,
1230     .instance_size = sizeof(CompareState),
1231     .instance_init = colo_compare_init,
1232     .instance_finalize = colo_compare_finalize,
1233     .class_size = sizeof(CompareClass),
1234     .class_init = colo_compare_class_init,
1235     .interfaces = (InterfaceInfo[]) {
1236         { TYPE_USER_CREATABLE },
1237         { }
1238     }
1239 };
1240 
1241 static void register_types(void)
1242 {
1243     type_register_static(&colo_compare_info);
1244 }
1245 
1246 type_init(register_types);
1247