xref: /openbmc/qemu/net/colo-compare.c (revision 8f0a3716)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
17 #include "trace.h"
18 #include "qemu-common.h"
19 #include "qapi/qmp/qerror.h"
20 #include "qapi/error.h"
21 #include "net/net.h"
22 #include "net/eth.h"
23 #include "qom/object_interfaces.h"
24 #include "qemu/iov.h"
25 #include "qom/object.h"
26 #include "net/queue.h"
27 #include "chardev/char-fe.h"
28 #include "qemu/sockets.h"
29 #include "qapi-visit.h"
30 #include "net/colo.h"
31 #include "sysemu/iothread.h"
32 
/* QOM type name of this object and the checked cast to CompareState. */
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* Value returned by compare_chr_can_read() for both input chardevs. */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* Queue length threshold tested by colo_insert_packet() before queuing. */
#define MAX_QUEUE_SIZE 1024

/*
 * Bit flags written to 'mark' by colo_mark_tcp_pkt(): which of the two
 * compared packets colo_compare_tcp() may release.
 */
#define COLO_COMPARE_FREE_PRIMARY     0x01
#define COLO_COMPARE_FREE_SECONDARY   0x02

/* TODO: Should be configurable */
/*
 * Used both as the period of the old-packet check timer and as the age
 * threshold (in ms) passed to colo_old_packet_check_one().
 */
#define REGULAR_PACKET_CHECK_MS 3000
45 
46 /*
47  *  + CompareState ++
48  *  |               |
49  *  +---------------+   +---------------+         +---------------+
50  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
51  *  +---------------+   +---------------+         +---------------+
52  *  |               |     |           |             |          |
53  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
54  *                    |primary |  |secondary    |primary | |secondary
55  *                    |packet  |  |packet  +    |packet  | |packet  +
56  *                    +--------+  +--------+    +--------+ +--------+
57  *                        |           |             |          |
58  *                    +---v----+  +---v----+    +---v----+ +---v----+
59  *                    |primary |  |secondary    |primary | |secondary
60  *                    |packet  |  |packet  +    |packet  | |packet  +
61  *                    +--------+  +--------+    +--------+ +--------+
62  *                        |           |             |          |
63  *                    +---v----+  +---v----+    +---v----+ +---v----+
64  *                    |primary |  |secondary    |primary | |secondary
65  *                    |packet  |  |packet  +    |packet  | |packet  +
66  *                    +--------+  +--------+    +--------+ +--------+
67  */
/* State of one colo-compare object instance. */
typedef struct CompareState {
    Object parent;

    /*
     * Chardev names stored by the string property setters and resolved
     * with qemu_chr_find() in colo_compare_complete().
     */
    char *pri_indev;
    char *sec_indev;
    char *outdev;
    /* Chardev front ends initialised from the three names above */
    CharBackend chr_pri_in;
    CharBackend chr_sec_in;
    CharBackend chr_out;
    /*
     * Read states filled by compare_pri_chr_in()/compare_sec_chr_in();
     * their finalize callbacks are compare_{pri,sec}_rs_finalize().
     */
    SocketReadState pri_rs;
    SocketReadState sec_rs;
    /* When true, compare_chr_send() also writes the vnet header length */
    bool vnet_hdr;

    /*
     * Queue of the connections that have packets being processed
     * Element type: Connection
     */
    GQueue conn_list;
    /* Hash table passed to connection_get() to look up a Connection by key */
    GHashTable *connection_track_table;

    /* IOThread providing the contexts used for the handlers and the timer */
    IOThread *iothread;
    /* GMainContext of the iothread on which the input handlers are installed */
    GMainContext *worker_context;
    /* Periodic timer that runs check_old_packet_regular() */
    QEMUTimer *packet_check_timer;
} CompareState;
93 
/* Class structure of the colo-compare type; it adds nothing to ObjectClass. */
typedef struct CompareClass {
    ObjectClass parent_class;
} CompareClass;
97 
/* 'mode' argument of packet_enqueue(): which input the packet came from. */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
102 
/*
 * Forward declaration: the function is defined further down but is already
 * needed by colo_release_primary_pkt().
 */
static int compare_chr_send(CompareState *s,
                            const uint8_t *buf,
                            uint32_t size,
                            uint32_t vnet_hdr_len);
107 
108 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
109 {
110     struct tcphdr *atcp, *btcp;
111 
112     atcp = (struct tcphdr *)(a->transport_header);
113     btcp = (struct tcphdr *)(b->transport_header);
114     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
115 }
116 
117 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
118 {
119     Packet *pkt = data;
120     struct tcphdr *tcphd;
121 
122     tcphd = (struct tcphdr *)pkt->transport_header;
123 
124     pkt->tcp_seq = ntohl(tcphd->th_seq);
125     pkt->tcp_ack = ntohl(tcphd->th_ack);
126     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
127     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
128                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
129     pkt->payload_size = pkt->size - pkt->header_size;
130     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
131     pkt->flags = tcphd->th_flags;
132 }
133 
134 /*
135  * Return 1 on success, if return 0 means the
136  * packet will be dropped
137  */
138 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
139 {
140     if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
141         if (pkt->ip->ip_p == IPPROTO_TCP) {
142             fill_pkt_tcp_info(pkt, max_ack);
143             g_queue_insert_sorted(queue,
144                                   pkt,
145                                   (GCompareDataFunc)seq_sorter,
146                                   NULL);
147         } else {
148             g_queue_push_tail(queue, pkt);
149         }
150         return 1;
151     }
152     return 0;
153 }
154 
155 /*
156  * Return 0 on success, if return -1 means the pkt
157  * is unsupported(arp and ipv6) and will be sent later
158  */
159 static int packet_enqueue(CompareState *s, int mode, Connection **con)
160 {
161     ConnectionKey key;
162     Packet *pkt = NULL;
163     Connection *conn;
164 
165     if (mode == PRIMARY_IN) {
166         pkt = packet_new(s->pri_rs.buf,
167                          s->pri_rs.packet_len,
168                          s->pri_rs.vnet_hdr_len);
169     } else {
170         pkt = packet_new(s->sec_rs.buf,
171                          s->sec_rs.packet_len,
172                          s->sec_rs.vnet_hdr_len);
173     }
174 
175     if (parse_packet_early(pkt)) {
176         packet_destroy(pkt, NULL);
177         pkt = NULL;
178         return -1;
179     }
180     fill_connection_key(pkt, &key);
181 
182     conn = connection_get(s->connection_track_table,
183                           &key,
184                           &s->conn_list);
185 
186     if (!conn->processing) {
187         g_queue_push_tail(&s->conn_list, conn);
188         conn->processing = true;
189     }
190 
191     if (mode == PRIMARY_IN) {
192         if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
193             error_report("colo compare primary queue size too big,"
194                          "drop packet");
195         }
196     } else {
197         if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
198             error_report("colo compare secondary queue size too big,"
199                          "drop packet");
200         }
201     }
202     *con = conn;
203 
204     return 0;
205 }
206 
/*
 * Wrap-around aware "seq1 is later than seq2" test for 32-bit sequence
 * numbers: true when the signed 32-bit difference seq1 - seq2 is positive.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
211 
212 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
213 {
214     int ret;
215     ret = compare_chr_send(s,
216                            pkt->data,
217                            pkt->size,
218                            pkt->vnet_hdr_len);
219     if (ret < 0) {
220         error_report("colo send primary packet failed");
221     }
222     trace_colo_compare_main("packet same and release packet");
223     packet_destroy(pkt, NULL);
224 }
225 
226 /*
227  * The IP packets sent by primary and secondary
228  * will be compared in here
229  * TODO support ip fragment, Out-Of-Order
230  * return:    0  means packet same
231  *            > 0 || < 0 means packet different
232  */
233 static int colo_compare_packet_payload(Packet *ppkt,
234                                        Packet *spkt,
235                                        uint16_t poffset,
236                                        uint16_t soffset,
237                                        uint16_t len)
238 
239 {
240     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
241         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
242 
243         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
244         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
245         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
246         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
247 
248         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
249                                    pri_ip_dst, spkt->size,
250                                    sec_ip_src, sec_ip_dst);
251     }
252 
253     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
254 }
255 
256 /*
257  * return true means that the payload is consist and
258  * need to make the next comparison, false means do
259  * the checkpoint
260 */
261 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
262                               int8_t *mark, uint32_t max_ack)
263 {
264     *mark = 0;
265 
266     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
267         if (colo_compare_packet_payload(ppkt, spkt,
268                                         ppkt->header_size, spkt->header_size,
269                                         ppkt->payload_size)) {
270             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
271             return true;
272         }
273     }
274     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
275         if (colo_compare_packet_payload(ppkt, spkt,
276                                         ppkt->header_size, spkt->header_size,
277                                         ppkt->payload_size)) {
278             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
279             return true;
280         }
281     }
282 
283     /* one part of secondary packet payload still need to be compared */
284     if (!after(ppkt->seq_end, spkt->seq_end)) {
285         if (colo_compare_packet_payload(ppkt, spkt,
286                                         ppkt->header_size + ppkt->offset,
287                                         spkt->header_size + spkt->offset,
288                                         ppkt->payload_size - ppkt->offset)) {
289             if (!after(ppkt->tcp_ack, max_ack)) {
290                 *mark = COLO_COMPARE_FREE_PRIMARY;
291                 spkt->offset += ppkt->payload_size - ppkt->offset;
292                 return true;
293             } else {
294                 /* secondary guest hasn't ack the data, don't send
295                  * out this packet
296                  */
297                 return false;
298             }
299         }
300     } else {
301         /* primary packet is longer than secondary packet, compare
302          * the same part and mark the primary packet offset
303          */
304         if (colo_compare_packet_payload(ppkt, spkt,
305                                         ppkt->header_size + ppkt->offset,
306                                         spkt->header_size + spkt->offset,
307                                         spkt->payload_size - spkt->offset)) {
308             *mark = COLO_COMPARE_FREE_SECONDARY;
309             ppkt->offset += spkt->payload_size - spkt->offset;
310             return true;
311         }
312     }
313 
314     return false;
315 }
316 
317 static void colo_compare_tcp(CompareState *s, Connection *conn)
318 {
319     Packet *ppkt = NULL, *spkt = NULL;
320     int8_t mark;
321 
322     /*
323      * If ppkt and spkt have the same payload, but ppkt's ACK
324      * is greater than spkt's ACK, in this case we can not
325      * send the ppkt because it will cause the secondary guest
326      * to miss sending some data in the next. Therefore, we
327      * record the maximum ACK in the current queue at both
328      * primary side and secondary side. Only when the ack is
329      * less than the smaller of the two maximum ack, then we
330      * can ensure that the packet's payload is acknowledged by
331      * primary and secondary.
332     */
333     uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
334 
335 pri:
336     if (g_queue_is_empty(&conn->primary_list)) {
337         return;
338     }
339     ppkt = g_queue_pop_head(&conn->primary_list);
340 sec:
341     if (g_queue_is_empty(&conn->secondary_list)) {
342         g_queue_push_head(&conn->primary_list, ppkt);
343         return;
344     }
345     spkt = g_queue_pop_head(&conn->secondary_list);
346 
347     if (ppkt->tcp_seq == ppkt->seq_end) {
348         colo_release_primary_pkt(s, ppkt);
349         ppkt = NULL;
350     }
351 
352     if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
353         trace_colo_compare_main("pri: this packet has compared");
354         colo_release_primary_pkt(s, ppkt);
355         ppkt = NULL;
356     }
357 
358     if (spkt->tcp_seq == spkt->seq_end) {
359         packet_destroy(spkt, NULL);
360         if (!ppkt) {
361             goto pri;
362         } else {
363             goto sec;
364         }
365     } else {
366         if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
367             trace_colo_compare_main("sec: this packet has compared");
368             packet_destroy(spkt, NULL);
369             if (!ppkt) {
370                 goto pri;
371             } else {
372                 goto sec;
373             }
374         }
375         if (!ppkt) {
376             g_queue_push_head(&conn->secondary_list, spkt);
377             goto pri;
378         }
379     }
380 
381     if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
382         trace_colo_compare_tcp_info("pri",
383                                     ppkt->tcp_seq, ppkt->tcp_ack,
384                                     ppkt->header_size, ppkt->payload_size,
385                                     ppkt->offset, ppkt->flags);
386 
387         trace_colo_compare_tcp_info("sec",
388                                     spkt->tcp_seq, spkt->tcp_ack,
389                                     spkt->header_size, spkt->payload_size,
390                                     spkt->offset, spkt->flags);
391 
392         if (mark == COLO_COMPARE_FREE_PRIMARY) {
393             conn->compare_seq = ppkt->seq_end;
394             colo_release_primary_pkt(s, ppkt);
395             g_queue_push_head(&conn->secondary_list, spkt);
396             goto pri;
397         }
398         if (mark == COLO_COMPARE_FREE_SECONDARY) {
399             conn->compare_seq = spkt->seq_end;
400             packet_destroy(spkt, NULL);
401             goto sec;
402         }
403         if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
404             conn->compare_seq = ppkt->seq_end;
405             colo_release_primary_pkt(s, ppkt);
406             packet_destroy(spkt, NULL);
407             goto pri;
408         }
409     } else {
410         g_queue_push_head(&conn->primary_list, ppkt);
411         g_queue_push_head(&conn->secondary_list, spkt);
412 
413         qemu_hexdump((char *)ppkt->data, stderr,
414                      "colo-compare ppkt", ppkt->size);
415         qemu_hexdump((char *)spkt->data, stderr,
416                      "colo-compare spkt", spkt->size);
417 
418         /*
419          * colo_compare_inconsistent_notify();
420          * TODO: notice to checkpoint();
421          */
422     }
423 }
424 
425 
426 /*
427  * Called from the compare thread on the primary
428  * for compare udp packet
429  */
430 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
431 {
432     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
433     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
434 
435     trace_colo_compare_main("compare udp");
436 
437     /*
438      * Because of ppkt and spkt are both in the same connection,
439      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
440      * same with spkt. In addition, IP header's Identification is a random
441      * field, we can handle it in IP fragmentation function later.
442      * COLO just concern the response net packet payload from primary guest
443      * and secondary guest are same or not, So we ignored all IP header include
444      * other field like TOS,TTL,IP Checksum. we only need to compare
445      * the ip payload here.
446      */
447     if (ppkt->size != spkt->size) {
448         trace_colo_compare_main("UDP: payload size of packets are different");
449         return -1;
450     }
451     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
452                                     ppkt->size - offset)) {
453         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
454         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
455         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
456             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
457                          ppkt->size);
458             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
459                          spkt->size);
460         }
461         return -1;
462     } else {
463         return 0;
464     }
465 }
466 
467 /*
468  * Called from the compare thread on the primary
469  * for compare icmp packet
470  */
471 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
472 {
473     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
474     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
475 
476     trace_colo_compare_main("compare icmp");
477 
478     /*
479      * Because of ppkt and spkt are both in the same connection,
480      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
481      * same with spkt. In addition, IP header's Identification is a random
482      * field, we can handle it in IP fragmentation function later.
483      * COLO just concern the response net packet payload from primary guest
484      * and secondary guest are same or not, So we ignored all IP header include
485      * other field like TOS,TTL,IP Checksum. we only need to compare
486      * the ip payload here.
487      */
488     if (ppkt->size != spkt->size) {
489         trace_colo_compare_main("ICMP: payload size of packets are different");
490         return -1;
491     }
492     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
493                                     ppkt->size - offset)) {
494         trace_colo_compare_icmp_miscompare("primary pkt size",
495                                            ppkt->size);
496         trace_colo_compare_icmp_miscompare("Secondary pkt size",
497                                            spkt->size);
498         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
499             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
500                          ppkt->size);
501             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
502                          spkt->size);
503         }
504         return -1;
505     } else {
506         return 0;
507     }
508 }
509 
510 /*
511  * Called from the compare thread on the primary
512  * for compare other packet
513  */
514 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
515 {
516     uint16_t offset = ppkt->vnet_hdr_len;
517 
518     trace_colo_compare_main("compare other");
519     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
520         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
521 
522         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
523         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
524         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
525         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
526 
527         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
528                                    pri_ip_dst, spkt->size,
529                                    sec_ip_src, sec_ip_dst);
530     }
531 
532     if (ppkt->size != spkt->size) {
533         trace_colo_compare_main("Other: payload size of packets are different");
534         return -1;
535     }
536     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
537                                        ppkt->size - offset);
538 }
539 
540 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
541 {
542     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
543 
544     if ((now - pkt->creation_ms) > (*check_time)) {
545         trace_colo_old_packet_check_found(pkt->creation_ms);
546         return 0;
547     } else {
548         return 1;
549     }
550 }
551 
552 static int colo_old_packet_check_one_conn(Connection *conn,
553                                           void *user_data)
554 {
555     GList *result = NULL;
556     int64_t check_time = REGULAR_PACKET_CHECK_MS;
557 
558     result = g_queue_find_custom(&conn->primary_list,
559                                  &check_time,
560                                  (GCompareFunc)colo_old_packet_check_one);
561 
562     if (result) {
563         /* Do checkpoint will flush old packet */
564         /*
565          * TODO: Notify colo frame to do checkpoint.
566          * colo_compare_inconsistent_notify();
567          */
568         return 0;
569     }
570 
571     return 1;
572 }
573 
574 /*
575  * Look for old packets that the secondary hasn't matched,
576  * if we have some then we have to checkpoint to wake
577  * the secondary up.
578  */
579 static void colo_old_packet_check(void *opaque)
580 {
581     CompareState *s = opaque;
582 
583     /*
584      * If we find one old packet, stop finding job and notify
585      * COLO frame do checkpoint.
586      */
587     g_queue_find_custom(&s->conn_list, NULL,
588                         (GCompareFunc)colo_old_packet_check_one_conn);
589 }
590 
591 static void colo_compare_packet(CompareState *s, Connection *conn,
592                                 int (*HandlePacket)(Packet *spkt,
593                                 Packet *ppkt))
594 {
595     Packet *pkt = NULL;
596     GList *result = NULL;
597 
598     while (!g_queue_is_empty(&conn->primary_list) &&
599            !g_queue_is_empty(&conn->secondary_list)) {
600         pkt = g_queue_pop_head(&conn->primary_list);
601         result = g_queue_find_custom(&conn->secondary_list,
602                  pkt, (GCompareFunc)HandlePacket);
603 
604         if (result) {
605             colo_release_primary_pkt(s, pkt);
606             g_queue_remove(&conn->secondary_list, result->data);
607         } else {
608             /*
609              * If one packet arrive late, the secondary_list or
610              * primary_list will be empty, so we can't compare it
611              * until next comparison.
612              */
613             trace_colo_compare_main("packet different");
614             g_queue_push_head(&conn->primary_list, pkt);
615             /* TODO: colo_notify_checkpoint();*/
616             break;
617         }
618     }
619 }
620 
621 /*
622  * Called from the compare thread on the primary
623  * for compare packet with secondary list of the
624  * specified connection when a new packet was
625  * queued to it.
626  */
627 static void colo_compare_connection(void *opaque, void *user_data)
628 {
629     CompareState *s = user_data;
630     Connection *conn = opaque;
631 
632     switch (conn->ip_proto) {
633     case IPPROTO_TCP:
634         colo_compare_tcp(s, conn);
635         break;
636     case IPPROTO_UDP:
637         colo_compare_packet(s, conn, colo_packet_compare_udp);
638         break;
639     case IPPROTO_ICMP:
640         colo_compare_packet(s, conn, colo_packet_compare_icmp);
641         break;
642     default:
643         colo_compare_packet(s, conn, colo_packet_compare_other);
644         break;
645     }
646 }
647 
648 static int compare_chr_send(CompareState *s,
649                             const uint8_t *buf,
650                             uint32_t size,
651                             uint32_t vnet_hdr_len)
652 {
653     int ret = 0;
654     uint32_t len = htonl(size);
655 
656     if (!size) {
657         return 0;
658     }
659 
660     ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
661     if (ret != sizeof(len)) {
662         goto err;
663     }
664 
665     if (s->vnet_hdr) {
666         /*
667          * We send vnet header len make other module(like filter-redirector)
668          * know how to parse net packet correctly.
669          */
670         len = htonl(vnet_hdr_len);
671         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
672         if (ret != sizeof(len)) {
673             goto err;
674         }
675     }
676 
677     ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
678     if (ret != size) {
679         goto err;
680     }
681 
682     return 0;
683 
684 err:
685     return ret < 0 ? ret : -EIO;
686 }
687 
688 static int compare_chr_can_read(void *opaque)
689 {
690     return COMPARE_READ_LEN_MAX;
691 }
692 
693 /*
694  * Called from the main thread on the primary for packets
695  * arriving over the socket from the primary.
696  */
697 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
698 {
699     CompareState *s = COLO_COMPARE(opaque);
700     int ret;
701 
702     ret = net_fill_rstate(&s->pri_rs, buf, size);
703     if (ret == -1) {
704         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
705                                  NULL, NULL, true);
706         error_report("colo-compare primary_in error");
707     }
708 }
709 
710 /*
711  * Called from the main thread on the primary for packets
712  * arriving over the socket from the secondary.
713  */
714 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
715 {
716     CompareState *s = COLO_COMPARE(opaque);
717     int ret;
718 
719     ret = net_fill_rstate(&s->sec_rs, buf, size);
720     if (ret == -1) {
721         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
722                                  NULL, NULL, true);
723         error_report("colo-compare secondary_in error");
724     }
725 }
726 
727 /*
728  * Check old packet regularly so it can watch for any packets
729  * that the secondary hasn't produced equivalents of.
730  */
731 static void check_old_packet_regular(void *opaque)
732 {
733     CompareState *s = opaque;
734 
735     /* if have old packet we will notify checkpoint */
736     colo_old_packet_check(s);
737     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
738                 REGULAR_PACKET_CHECK_MS);
739 }
740 
741 static void colo_compare_timer_init(CompareState *s)
742 {
743     AioContext *ctx = iothread_get_aio_context(s->iothread);
744 
745     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
746                                 SCALE_MS, check_old_packet_regular,
747                                 s);
748     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
749                     REGULAR_PACKET_CHECK_MS);
750 }
751 
752 static void colo_compare_timer_del(CompareState *s)
753 {
754     if (s->packet_check_timer) {
755         timer_del(s->packet_check_timer);
756         timer_free(s->packet_check_timer);
757         s->packet_check_timer = NULL;
758     }
759  }
760 
761 static void colo_compare_iothread(CompareState *s)
762 {
763     object_ref(OBJECT(s->iothread));
764     s->worker_context = iothread_get_g_main_context(s->iothread);
765 
766     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
767                              compare_pri_chr_in, NULL, NULL,
768                              s, s->worker_context, true);
769     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
770                              compare_sec_chr_in, NULL, NULL,
771                              s, s->worker_context, true);
772 
773     colo_compare_timer_init(s);
774 }
775 
776 static char *compare_get_pri_indev(Object *obj, Error **errp)
777 {
778     CompareState *s = COLO_COMPARE(obj);
779 
780     return g_strdup(s->pri_indev);
781 }
782 
783 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
784 {
785     CompareState *s = COLO_COMPARE(obj);
786 
787     g_free(s->pri_indev);
788     s->pri_indev = g_strdup(value);
789 }
790 
791 static char *compare_get_sec_indev(Object *obj, Error **errp)
792 {
793     CompareState *s = COLO_COMPARE(obj);
794 
795     return g_strdup(s->sec_indev);
796 }
797 
798 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
799 {
800     CompareState *s = COLO_COMPARE(obj);
801 
802     g_free(s->sec_indev);
803     s->sec_indev = g_strdup(value);
804 }
805 
806 static char *compare_get_outdev(Object *obj, Error **errp)
807 {
808     CompareState *s = COLO_COMPARE(obj);
809 
810     return g_strdup(s->outdev);
811 }
812 
813 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
814 {
815     CompareState *s = COLO_COMPARE(obj);
816 
817     g_free(s->outdev);
818     s->outdev = g_strdup(value);
819 }
820 
821 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
822 {
823     CompareState *s = COLO_COMPARE(obj);
824 
825     return s->vnet_hdr;
826 }
827 
828 static void compare_set_vnet_hdr(Object *obj,
829                                  bool value,
830                                  Error **errp)
831 {
832     CompareState *s = COLO_COMPARE(obj);
833 
834     s->vnet_hdr = value;
835 }
836 
837 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
838 {
839     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
840     Connection *conn = NULL;
841 
842     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
843         trace_colo_compare_main("primary: unsupported packet in");
844         compare_chr_send(s,
845                          pri_rs->buf,
846                          pri_rs->packet_len,
847                          pri_rs->vnet_hdr_len);
848     } else {
849         /* compare packet in the specified connection */
850         colo_compare_connection(conn, s);
851     }
852 }
853 
854 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
855 {
856     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
857     Connection *conn = NULL;
858 
859     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
860         trace_colo_compare_main("secondary: unsupported packet in");
861     } else {
862         /* compare packet in the specified connection */
863         colo_compare_connection(conn, s);
864     }
865 }
866 
867 
868 /*
869  * Return 0 is success.
870  * Return 1 is failed.
871  */
872 static int find_and_check_chardev(Chardev **chr,
873                                   char *chr_name,
874                                   Error **errp)
875 {
876     *chr = qemu_chr_find(chr_name);
877     if (*chr == NULL) {
878         error_setg(errp, "Device '%s' not found",
879                    chr_name);
880         return 1;
881     }
882 
883     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
884         error_setg(errp, "chardev \"%s\" is not reconnectable",
885                    chr_name);
886         return 1;
887     }
888 
889     return 0;
890 }
891 
892 /*
893  * Called from the main thread on the primary
894  * to setup colo-compare.
895  */
896 static void colo_compare_complete(UserCreatable *uc, Error **errp)
897 {
898     CompareState *s = COLO_COMPARE(uc);
899     Chardev *chr;
900 
901     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
902         error_setg(errp, "colo compare needs 'primary_in' ,"
903                    "'secondary_in','outdev','iothread' property set");
904         return;
905     } else if (!strcmp(s->pri_indev, s->outdev) ||
906                !strcmp(s->sec_indev, s->outdev) ||
907                !strcmp(s->pri_indev, s->sec_indev)) {
908         error_setg(errp, "'indev' and 'outdev' could not be same "
909                    "for compare module");
910         return;
911     }
912 
913     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
914         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
915         return;
916     }
917 
918     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
919         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
920         return;
921     }
922 
923     if (find_and_check_chardev(&chr, s->outdev, errp) ||
924         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
925         return;
926     }
927 
928     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
929     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
930 
931     g_queue_init(&s->conn_list);
932 
933     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
934                                                       connection_key_equal,
935                                                       g_free,
936                                                       connection_destroy);
937 
938     colo_compare_iothread(s);
939     return;
940 }
941 
942 static void colo_flush_packets(void *opaque, void *user_data)
943 {
944     CompareState *s = user_data;
945     Connection *conn = opaque;
946     Packet *pkt = NULL;
947 
948     while (!g_queue_is_empty(&conn->primary_list)) {
949         pkt = g_queue_pop_head(&conn->primary_list);
950         compare_chr_send(s,
951                          pkt->data,
952                          pkt->size,
953                          pkt->vnet_hdr_len);
954         packet_destroy(pkt, NULL);
955     }
956     while (!g_queue_is_empty(&conn->secondary_list)) {
957         pkt = g_queue_pop_head(&conn->secondary_list);
958         packet_destroy(pkt, NULL);
959     }
960 }
961 
962 static void colo_compare_class_init(ObjectClass *oc, void *data)
963 {
964     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
965 
966     ucc->complete = colo_compare_complete;
967 }
968 
969 static void colo_compare_init(Object *obj)
970 {
971     CompareState *s = COLO_COMPARE(obj);
972 
973     object_property_add_str(obj, "primary_in",
974                             compare_get_pri_indev, compare_set_pri_indev,
975                             NULL);
976     object_property_add_str(obj, "secondary_in",
977                             compare_get_sec_indev, compare_set_sec_indev,
978                             NULL);
979     object_property_add_str(obj, "outdev",
980                             compare_get_outdev, compare_set_outdev,
981                             NULL);
982     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
983                             (Object **)&s->iothread,
984                             object_property_allow_set_link,
985                             OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL);
986 
987     s->vnet_hdr = false;
988     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
989                              compare_set_vnet_hdr, NULL);
990 }
991 
992 static void colo_compare_finalize(Object *obj)
993 {
994     CompareState *s = COLO_COMPARE(obj);
995 
996     qemu_chr_fe_deinit(&s->chr_pri_in, false);
997     qemu_chr_fe_deinit(&s->chr_sec_in, false);
998     qemu_chr_fe_deinit(&s->chr_out, false);
999     if (s->iothread) {
1000         colo_compare_timer_del(s);
1001     }
1002     /* Release all unhandled packets after compare thead exited */
1003     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1004 
1005     g_queue_clear(&s->conn_list);
1006 
1007     if (s->connection_track_table) {
1008         g_hash_table_destroy(s->connection_track_table);
1009     }
1010 
1011     if (s->iothread) {
1012         object_unref(OBJECT(s->iothread));
1013     }
1014     g_free(s->pri_indev);
1015     g_free(s->sec_indev);
1016     g_free(s->outdev);
1017 }
1018 
1019 static const TypeInfo colo_compare_info = {
1020     .name = TYPE_COLO_COMPARE,
1021     .parent = TYPE_OBJECT,
1022     .instance_size = sizeof(CompareState),
1023     .instance_init = colo_compare_init,
1024     .instance_finalize = colo_compare_finalize,
1025     .class_size = sizeof(CompareClass),
1026     .class_init = colo_compare_class_init,
1027     .interfaces = (InterfaceInfo[]) {
1028         { TYPE_USER_CREATABLE },
1029         { }
1030     }
1031 };
1032 
1033 static void register_types(void)
1034 {
1035     type_register_static(&colo_compare_info);
1036 }
1037 
1038 type_init(register_types);
1039