xref: /openbmc/qemu/net/colo-compare.c (revision 438c78da)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * Author: Zhang Chen <zhangchen.fnst@cn.fujitsu.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or
12  * later.  See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 #include "qemu/error-report.h"
17 #include "trace.h"
18 #include "qemu-common.h"
19 #include "qapi/error.h"
20 #include "net/net.h"
21 #include "net/eth.h"
22 #include "qom/object_interfaces.h"
23 #include "qemu/iov.h"
24 #include "qom/object.h"
25 #include "net/queue.h"
26 #include "chardev/char-fe.h"
27 #include "qemu/sockets.h"
28 #include "colo.h"
29 #include "sysemu/iothread.h"
30 #include "net/colo-compare.h"
31 #include "migration/colo.h"
32 #include "migration/migration.h"
33 
34 #define TYPE_COLO_COMPARE "colo-compare"
35 #define COLO_COMPARE(obj) \
36     OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
37 
38 static QTAILQ_HEAD(, CompareState) net_compares =
39        QTAILQ_HEAD_INITIALIZER(net_compares);
40 
41 static NotifierList colo_compare_notifiers =
42     NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
43 
44 #define COMPARE_READ_LEN_MAX NET_BUFSIZE
45 #define MAX_QUEUE_SIZE 1024
46 
47 #define COLO_COMPARE_FREE_PRIMARY     0x01
48 #define COLO_COMPARE_FREE_SECONDARY   0x02
49 
50 /* TODO: Should be configurable */
51 #define REGULAR_PACKET_CHECK_MS 3000
52 
53 static QemuMutex event_mtx;
54 static QemuCond event_complete_cond;
55 static int event_unhandled_count;
56 
57 /*
58  *  + CompareState ++
59  *  |               |
60  *  +---------------+   +---------------+         +---------------+
61  *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
62  *  +---------------+   +---------------+         +---------------+
63  *  |               |     |           |             |          |
64  *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
65  *                    |primary |  |secondary    |primary | |secondary
66  *                    |packet  |  |packet  +    |packet  | |packet  +
67  *                    +--------+  +--------+    +--------+ +--------+
68  *                        |           |             |          |
69  *                    +---v----+  +---v----+    +---v----+ +---v----+
70  *                    |primary |  |secondary    |primary | |secondary
71  *                    |packet  |  |packet  +    |packet  | |packet  +
72  *                    +--------+  +--------+    +--------+ +--------+
73  *                        |           |             |          |
74  *                    +---v----+  +---v----+    +---v----+ +---v----+
75  *                    |primary |  |secondary    |primary | |secondary
76  *                    |packet  |  |packet  +    |packet  | |packet  +
77  *                    +--------+  +--------+    +--------+ +--------+
78  */
79 typedef struct CompareState {
80     Object parent;
81 
82     char *pri_indev;
83     char *sec_indev;
84     char *outdev;
85     CharBackend chr_pri_in;
86     CharBackend chr_sec_in;
87     CharBackend chr_out;
88     SocketReadState pri_rs;
89     SocketReadState sec_rs;
90     bool vnet_hdr;
91 
92     /*
93      * Record the connection that through the NIC
94      * Element type: Connection
95      */
96     GQueue conn_list;
97     /* Record the connection without repetition */
98     GHashTable *connection_track_table;
99 
100     IOThread *iothread;
101     GMainContext *worker_context;
102     QEMUTimer *packet_check_timer;
103 
104     QEMUBH *event_bh;
105     enum colo_event event;
106 
107     QTAILQ_ENTRY(CompareState) next;
108 } CompareState;
109 
110 typedef struct CompareClass {
111     ObjectClass parent_class;
112 } CompareClass;
113 
/* Selects which input stream packet_enqueue() reads its packet from. */
enum {
    PRIMARY_IN = 0,   /* use s->pri_rs and the connection's primary_list */
    SECONDARY_IN = 1, /* use s->sec_rs and the connection's secondary_list */
};
118 
119 static void colo_compare_inconsistency_notify(void)
120 {
121     notifier_list_notify(&colo_compare_notifiers,
122                 migrate_get_current());
123 }
124 
125 static int compare_chr_send(CompareState *s,
126                             const uint8_t *buf,
127                             uint32_t size,
128                             uint32_t vnet_hdr_len);
129 
130 static gint seq_sorter(Packet *a, Packet *b, gpointer data)
131 {
132     struct tcphdr *atcp, *btcp;
133 
134     atcp = (struct tcphdr *)(a->transport_header);
135     btcp = (struct tcphdr *)(b->transport_header);
136     return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
137 }
138 
139 static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
140 {
141     Packet *pkt = data;
142     struct tcphdr *tcphd;
143 
144     tcphd = (struct tcphdr *)pkt->transport_header;
145 
146     pkt->tcp_seq = ntohl(tcphd->th_seq);
147     pkt->tcp_ack = ntohl(tcphd->th_ack);
148     *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
149     pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
150                        + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
151     pkt->payload_size = pkt->size - pkt->header_size;
152     pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
153     pkt->flags = tcphd->th_flags;
154 }
155 
156 /*
157  * Return 1 on success, if return 0 means the
158  * packet will be dropped
159  */
160 static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
161 {
162     if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
163         if (pkt->ip->ip_p == IPPROTO_TCP) {
164             fill_pkt_tcp_info(pkt, max_ack);
165             g_queue_insert_sorted(queue,
166                                   pkt,
167                                   (GCompareDataFunc)seq_sorter,
168                                   NULL);
169         } else {
170             g_queue_push_tail(queue, pkt);
171         }
172         return 1;
173     }
174     return 0;
175 }
176 
177 /*
178  * Return 0 on success, if return -1 means the pkt
179  * is unsupported(arp and ipv6) and will be sent later
180  */
181 static int packet_enqueue(CompareState *s, int mode, Connection **con)
182 {
183     ConnectionKey key;
184     Packet *pkt = NULL;
185     Connection *conn;
186 
187     if (mode == PRIMARY_IN) {
188         pkt = packet_new(s->pri_rs.buf,
189                          s->pri_rs.packet_len,
190                          s->pri_rs.vnet_hdr_len);
191     } else {
192         pkt = packet_new(s->sec_rs.buf,
193                          s->sec_rs.packet_len,
194                          s->sec_rs.vnet_hdr_len);
195     }
196 
197     if (parse_packet_early(pkt)) {
198         packet_destroy(pkt, NULL);
199         pkt = NULL;
200         return -1;
201     }
202     fill_connection_key(pkt, &key);
203 
204     conn = connection_get(s->connection_track_table,
205                           &key,
206                           &s->conn_list);
207 
208     if (!conn->processing) {
209         g_queue_push_tail(&s->conn_list, conn);
210         conn->processing = true;
211     }
212 
213     if (mode == PRIMARY_IN) {
214         if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
215             error_report("colo compare primary queue size too big,"
216                          "drop packet");
217         }
218     } else {
219         if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
220             error_report("colo compare secondary queue size too big,"
221                          "drop packet");
222         }
223     }
224     *con = conn;
225 
226     return 0;
227 }
228 
/*
 * Sequence-space comparison: true when @seq1 is strictly later than
 * @seq2, judged by the sign of their 32-bit wrapped difference.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t distance = (int32_t)(seq1 - seq2);

    return distance > 0;
}
233 
234 static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
235 {
236     int ret;
237     ret = compare_chr_send(s,
238                            pkt->data,
239                            pkt->size,
240                            pkt->vnet_hdr_len);
241     if (ret < 0) {
242         error_report("colo send primary packet failed");
243     }
244     trace_colo_compare_main("packet same and release packet");
245     packet_destroy(pkt, NULL);
246 }
247 
248 /*
249  * The IP packets sent by primary and secondary
250  * will be compared in here
251  * TODO support ip fragment, Out-Of-Order
252  * return:    0  means packet same
253  *            > 0 || < 0 means packet different
254  */
255 static int colo_compare_packet_payload(Packet *ppkt,
256                                        Packet *spkt,
257                                        uint16_t poffset,
258                                        uint16_t soffset,
259                                        uint16_t len)
260 
261 {
262     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
263         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
264 
265         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
266         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
267         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
268         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
269 
270         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
271                                    pri_ip_dst, spkt->size,
272                                    sec_ip_src, sec_ip_dst);
273     }
274 
275     return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
276 }
277 
278 /*
279  * return true means that the payload is consist and
280  * need to make the next comparison, false means do
281  * the checkpoint
282 */
283 static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
284                               int8_t *mark, uint32_t max_ack)
285 {
286     *mark = 0;
287 
288     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
289         if (colo_compare_packet_payload(ppkt, spkt,
290                                         ppkt->header_size, spkt->header_size,
291                                         ppkt->payload_size)) {
292             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
293             return true;
294         }
295     }
296     if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
297         if (colo_compare_packet_payload(ppkt, spkt,
298                                         ppkt->header_size, spkt->header_size,
299                                         ppkt->payload_size)) {
300             *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
301             return true;
302         }
303     }
304 
305     /* one part of secondary packet payload still need to be compared */
306     if (!after(ppkt->seq_end, spkt->seq_end)) {
307         if (colo_compare_packet_payload(ppkt, spkt,
308                                         ppkt->header_size + ppkt->offset,
309                                         spkt->header_size + spkt->offset,
310                                         ppkt->payload_size - ppkt->offset)) {
311             if (!after(ppkt->tcp_ack, max_ack)) {
312                 *mark = COLO_COMPARE_FREE_PRIMARY;
313                 spkt->offset += ppkt->payload_size - ppkt->offset;
314                 return true;
315             } else {
316                 /* secondary guest hasn't ack the data, don't send
317                  * out this packet
318                  */
319                 return false;
320             }
321         }
322     } else {
323         /* primary packet is longer than secondary packet, compare
324          * the same part and mark the primary packet offset
325          */
326         if (colo_compare_packet_payload(ppkt, spkt,
327                                         ppkt->header_size + ppkt->offset,
328                                         spkt->header_size + spkt->offset,
329                                         spkt->payload_size - spkt->offset)) {
330             *mark = COLO_COMPARE_FREE_SECONDARY;
331             ppkt->offset += spkt->payload_size - spkt->offset;
332             return true;
333         }
334     }
335 
336     return false;
337 }
338 
339 static void colo_compare_tcp(CompareState *s, Connection *conn)
340 {
341     Packet *ppkt = NULL, *spkt = NULL;
342     int8_t mark;
343 
344     /*
345      * If ppkt and spkt have the same payload, but ppkt's ACK
346      * is greater than spkt's ACK, in this case we can not
347      * send the ppkt because it will cause the secondary guest
348      * to miss sending some data in the next. Therefore, we
349      * record the maximum ACK in the current queue at both
350      * primary side and secondary side. Only when the ack is
351      * less than the smaller of the two maximum ack, then we
352      * can ensure that the packet's payload is acknowledged by
353      * primary and secondary.
354     */
355     uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
356 
357 pri:
358     if (g_queue_is_empty(&conn->primary_list)) {
359         return;
360     }
361     ppkt = g_queue_pop_head(&conn->primary_list);
362 sec:
363     if (g_queue_is_empty(&conn->secondary_list)) {
364         g_queue_push_head(&conn->primary_list, ppkt);
365         return;
366     }
367     spkt = g_queue_pop_head(&conn->secondary_list);
368 
369     if (ppkt->tcp_seq == ppkt->seq_end) {
370         colo_release_primary_pkt(s, ppkt);
371         ppkt = NULL;
372     }
373 
374     if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
375         trace_colo_compare_main("pri: this packet has compared");
376         colo_release_primary_pkt(s, ppkt);
377         ppkt = NULL;
378     }
379 
380     if (spkt->tcp_seq == spkt->seq_end) {
381         packet_destroy(spkt, NULL);
382         if (!ppkt) {
383             goto pri;
384         } else {
385             goto sec;
386         }
387     } else {
388         if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
389             trace_colo_compare_main("sec: this packet has compared");
390             packet_destroy(spkt, NULL);
391             if (!ppkt) {
392                 goto pri;
393             } else {
394                 goto sec;
395             }
396         }
397         if (!ppkt) {
398             g_queue_push_head(&conn->secondary_list, spkt);
399             goto pri;
400         }
401     }
402 
403     if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
404         trace_colo_compare_tcp_info("pri",
405                                     ppkt->tcp_seq, ppkt->tcp_ack,
406                                     ppkt->header_size, ppkt->payload_size,
407                                     ppkt->offset, ppkt->flags);
408 
409         trace_colo_compare_tcp_info("sec",
410                                     spkt->tcp_seq, spkt->tcp_ack,
411                                     spkt->header_size, spkt->payload_size,
412                                     spkt->offset, spkt->flags);
413 
414         if (mark == COLO_COMPARE_FREE_PRIMARY) {
415             conn->compare_seq = ppkt->seq_end;
416             colo_release_primary_pkt(s, ppkt);
417             g_queue_push_head(&conn->secondary_list, spkt);
418             goto pri;
419         }
420         if (mark == COLO_COMPARE_FREE_SECONDARY) {
421             conn->compare_seq = spkt->seq_end;
422             packet_destroy(spkt, NULL);
423             goto sec;
424         }
425         if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
426             conn->compare_seq = ppkt->seq_end;
427             colo_release_primary_pkt(s, ppkt);
428             packet_destroy(spkt, NULL);
429             goto pri;
430         }
431     } else {
432         g_queue_push_head(&conn->primary_list, ppkt);
433         g_queue_push_head(&conn->secondary_list, spkt);
434 
435         qemu_hexdump((char *)ppkt->data, stderr,
436                      "colo-compare ppkt", ppkt->size);
437         qemu_hexdump((char *)spkt->data, stderr,
438                      "colo-compare spkt", spkt->size);
439 
440         colo_compare_inconsistency_notify();
441     }
442 }
443 
444 
445 /*
446  * Called from the compare thread on the primary
447  * for compare udp packet
448  */
449 static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
450 {
451     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
452     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
453 
454     trace_colo_compare_main("compare udp");
455 
456     /*
457      * Because of ppkt and spkt are both in the same connection,
458      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
459      * same with spkt. In addition, IP header's Identification is a random
460      * field, we can handle it in IP fragmentation function later.
461      * COLO just concern the response net packet payload from primary guest
462      * and secondary guest are same or not, So we ignored all IP header include
463      * other field like TOS,TTL,IP Checksum. we only need to compare
464      * the ip payload here.
465      */
466     if (ppkt->size != spkt->size) {
467         trace_colo_compare_main("UDP: payload size of packets are different");
468         return -1;
469     }
470     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
471                                     ppkt->size - offset)) {
472         trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
473         trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
474         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
475             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
476                          ppkt->size);
477             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
478                          spkt->size);
479         }
480         return -1;
481     } else {
482         return 0;
483     }
484 }
485 
486 /*
487  * Called from the compare thread on the primary
488  * for compare icmp packet
489  */
490 static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
491 {
492     uint16_t network_header_length = ppkt->ip->ip_hl << 2;
493     uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
494 
495     trace_colo_compare_main("compare icmp");
496 
497     /*
498      * Because of ppkt and spkt are both in the same connection,
499      * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
500      * same with spkt. In addition, IP header's Identification is a random
501      * field, we can handle it in IP fragmentation function later.
502      * COLO just concern the response net packet payload from primary guest
503      * and secondary guest are same or not, So we ignored all IP header include
504      * other field like TOS,TTL,IP Checksum. we only need to compare
505      * the ip payload here.
506      */
507     if (ppkt->size != spkt->size) {
508         trace_colo_compare_main("ICMP: payload size of packets are different");
509         return -1;
510     }
511     if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
512                                     ppkt->size - offset)) {
513         trace_colo_compare_icmp_miscompare("primary pkt size",
514                                            ppkt->size);
515         trace_colo_compare_icmp_miscompare("Secondary pkt size",
516                                            spkt->size);
517         if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
518             qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
519                          ppkt->size);
520             qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
521                          spkt->size);
522         }
523         return -1;
524     } else {
525         return 0;
526     }
527 }
528 
529 /*
530  * Called from the compare thread on the primary
531  * for compare other packet
532  */
533 static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
534 {
535     uint16_t offset = ppkt->vnet_hdr_len;
536 
537     trace_colo_compare_main("compare other");
538     if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
539         char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
540 
541         strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
542         strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
543         strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
544         strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
545 
546         trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
547                                    pri_ip_dst, spkt->size,
548                                    sec_ip_src, sec_ip_dst);
549     }
550 
551     if (ppkt->size != spkt->size) {
552         trace_colo_compare_main("Other: payload size of packets are different");
553         return -1;
554     }
555     return colo_compare_packet_payload(ppkt, spkt, offset, offset,
556                                        ppkt->size - offset);
557 }
558 
559 static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
560 {
561     int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
562 
563     if ((now - pkt->creation_ms) > (*check_time)) {
564         trace_colo_old_packet_check_found(pkt->creation_ms);
565         return 0;
566     } else {
567         return 1;
568     }
569 }
570 
571 void colo_compare_register_notifier(Notifier *notify)
572 {
573     notifier_list_add(&colo_compare_notifiers, notify);
574 }
575 
576 void colo_compare_unregister_notifier(Notifier *notify)
577 {
578     notifier_remove(notify);
579 }
580 
581 static int colo_old_packet_check_one_conn(Connection *conn,
582                                            void *user_data)
583 {
584     GList *result = NULL;
585     int64_t check_time = REGULAR_PACKET_CHECK_MS;
586 
587     result = g_queue_find_custom(&conn->primary_list,
588                                  &check_time,
589                                  (GCompareFunc)colo_old_packet_check_one);
590 
591     if (result) {
592         /* Do checkpoint will flush old packet */
593         colo_compare_inconsistency_notify();
594         return 0;
595     }
596 
597     return 1;
598 }
599 
600 /*
601  * Look for old packets that the secondary hasn't matched,
602  * if we have some then we have to checkpoint to wake
603  * the secondary up.
604  */
605 static void colo_old_packet_check(void *opaque)
606 {
607     CompareState *s = opaque;
608 
609     /*
610      * If we find one old packet, stop finding job and notify
611      * COLO frame do checkpoint.
612      */
613     g_queue_find_custom(&s->conn_list, NULL,
614                         (GCompareFunc)colo_old_packet_check_one_conn);
615 }
616 
617 static void colo_compare_packet(CompareState *s, Connection *conn,
618                                 int (*HandlePacket)(Packet *spkt,
619                                 Packet *ppkt))
620 {
621     Packet *pkt = NULL;
622     GList *result = NULL;
623 
624     while (!g_queue_is_empty(&conn->primary_list) &&
625            !g_queue_is_empty(&conn->secondary_list)) {
626         pkt = g_queue_pop_head(&conn->primary_list);
627         result = g_queue_find_custom(&conn->secondary_list,
628                  pkt, (GCompareFunc)HandlePacket);
629 
630         if (result) {
631             colo_release_primary_pkt(s, pkt);
632             g_queue_remove(&conn->secondary_list, result->data);
633         } else {
634             /*
635              * If one packet arrive late, the secondary_list or
636              * primary_list will be empty, so we can't compare it
637              * until next comparison. If the packets in the list are
638              * timeout, it will trigger a checkpoint request.
639              */
640             trace_colo_compare_main("packet different");
641             g_queue_push_head(&conn->primary_list, pkt);
642             colo_compare_inconsistency_notify();
643             break;
644         }
645     }
646 }
647 
648 /*
649  * Called from the compare thread on the primary
650  * for compare packet with secondary list of the
651  * specified connection when a new packet was
652  * queued to it.
653  */
654 static void colo_compare_connection(void *opaque, void *user_data)
655 {
656     CompareState *s = user_data;
657     Connection *conn = opaque;
658 
659     switch (conn->ip_proto) {
660     case IPPROTO_TCP:
661         colo_compare_tcp(s, conn);
662         break;
663     case IPPROTO_UDP:
664         colo_compare_packet(s, conn, colo_packet_compare_udp);
665         break;
666     case IPPROTO_ICMP:
667         colo_compare_packet(s, conn, colo_packet_compare_icmp);
668         break;
669     default:
670         colo_compare_packet(s, conn, colo_packet_compare_other);
671         break;
672     }
673 }
674 
675 static int compare_chr_send(CompareState *s,
676                             const uint8_t *buf,
677                             uint32_t size,
678                             uint32_t vnet_hdr_len)
679 {
680     int ret = 0;
681     uint32_t len = htonl(size);
682 
683     if (!size) {
684         return 0;
685     }
686 
687     ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
688     if (ret != sizeof(len)) {
689         goto err;
690     }
691 
692     if (s->vnet_hdr) {
693         /*
694          * We send vnet header len make other module(like filter-redirector)
695          * know how to parse net packet correctly.
696          */
697         len = htonl(vnet_hdr_len);
698         ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
699         if (ret != sizeof(len)) {
700             goto err;
701         }
702     }
703 
704     ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
705     if (ret != size) {
706         goto err;
707     }
708 
709     return 0;
710 
711 err:
712     return ret < 0 ? ret : -EIO;
713 }
714 
715 static int compare_chr_can_read(void *opaque)
716 {
717     return COMPARE_READ_LEN_MAX;
718 }
719 
720 /*
721  * Called from the main thread on the primary for packets
722  * arriving over the socket from the primary.
723  */
724 static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
725 {
726     CompareState *s = COLO_COMPARE(opaque);
727     int ret;
728 
729     ret = net_fill_rstate(&s->pri_rs, buf, size);
730     if (ret == -1) {
731         qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
732                                  NULL, NULL, true);
733         error_report("colo-compare primary_in error");
734     }
735 }
736 
737 /*
738  * Called from the main thread on the primary for packets
739  * arriving over the socket from the secondary.
740  */
741 static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
742 {
743     CompareState *s = COLO_COMPARE(opaque);
744     int ret;
745 
746     ret = net_fill_rstate(&s->sec_rs, buf, size);
747     if (ret == -1) {
748         qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
749                                  NULL, NULL, true);
750         error_report("colo-compare secondary_in error");
751     }
752 }
753 
754 /*
755  * Check old packet regularly so it can watch for any packets
756  * that the secondary hasn't produced equivalents of.
757  */
758 static void check_old_packet_regular(void *opaque)
759 {
760     CompareState *s = opaque;
761 
762     /* if have old packet we will notify checkpoint */
763     colo_old_packet_check(s);
764     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
765                 REGULAR_PACKET_CHECK_MS);
766 }
767 
768 /* Public API, Used for COLO frame to notify compare event */
769 void colo_notify_compares_event(void *opaque, int event, Error **errp)
770 {
771     CompareState *s;
772 
773     qemu_mutex_lock(&event_mtx);
774     QTAILQ_FOREACH(s, &net_compares, next) {
775         s->event = event;
776         qemu_bh_schedule(s->event_bh);
777         event_unhandled_count++;
778     }
779     /* Wait all compare threads to finish handling this event */
780     while (event_unhandled_count > 0) {
781         qemu_cond_wait(&event_complete_cond, &event_mtx);
782     }
783 
784     qemu_mutex_unlock(&event_mtx);
785 }
786 
787 static void colo_compare_timer_init(CompareState *s)
788 {
789     AioContext *ctx = iothread_get_aio_context(s->iothread);
790 
791     s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
792                                 SCALE_MS, check_old_packet_regular,
793                                 s);
794     timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
795                     REGULAR_PACKET_CHECK_MS);
796 }
797 
798 static void colo_compare_timer_del(CompareState *s)
799 {
800     if (s->packet_check_timer) {
801         timer_del(s->packet_check_timer);
802         timer_free(s->packet_check_timer);
803         s->packet_check_timer = NULL;
804     }
805  }
806 
807 static void colo_flush_packets(void *opaque, void *user_data);
808 
809 static void colo_compare_handle_event(void *opaque)
810 {
811     CompareState *s = opaque;
812 
813     switch (s->event) {
814     case COLO_EVENT_CHECKPOINT:
815         g_queue_foreach(&s->conn_list, colo_flush_packets, s);
816         break;
817     case COLO_EVENT_FAILOVER:
818         break;
819     default:
820         break;
821     }
822 
823     assert(event_unhandled_count > 0);
824 
825     qemu_mutex_lock(&event_mtx);
826     event_unhandled_count--;
827     qemu_cond_broadcast(&event_complete_cond);
828     qemu_mutex_unlock(&event_mtx);
829 }
830 
831 static void colo_compare_iothread(CompareState *s)
832 {
833     object_ref(OBJECT(s->iothread));
834     s->worker_context = iothread_get_g_main_context(s->iothread);
835 
836     qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
837                              compare_pri_chr_in, NULL, NULL,
838                              s, s->worker_context, true);
839     qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
840                              compare_sec_chr_in, NULL, NULL,
841                              s, s->worker_context, true);
842 
843     colo_compare_timer_init(s);
844     s->event_bh = qemu_bh_new(colo_compare_handle_event, s);
845 }
846 
847 static char *compare_get_pri_indev(Object *obj, Error **errp)
848 {
849     CompareState *s = COLO_COMPARE(obj);
850 
851     return g_strdup(s->pri_indev);
852 }
853 
854 static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
855 {
856     CompareState *s = COLO_COMPARE(obj);
857 
858     g_free(s->pri_indev);
859     s->pri_indev = g_strdup(value);
860 }
861 
862 static char *compare_get_sec_indev(Object *obj, Error **errp)
863 {
864     CompareState *s = COLO_COMPARE(obj);
865 
866     return g_strdup(s->sec_indev);
867 }
868 
869 static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
870 {
871     CompareState *s = COLO_COMPARE(obj);
872 
873     g_free(s->sec_indev);
874     s->sec_indev = g_strdup(value);
875 }
876 
877 static char *compare_get_outdev(Object *obj, Error **errp)
878 {
879     CompareState *s = COLO_COMPARE(obj);
880 
881     return g_strdup(s->outdev);
882 }
883 
884 static void compare_set_outdev(Object *obj, const char *value, Error **errp)
885 {
886     CompareState *s = COLO_COMPARE(obj);
887 
888     g_free(s->outdev);
889     s->outdev = g_strdup(value);
890 }
891 
892 static bool compare_get_vnet_hdr(Object *obj, Error **errp)
893 {
894     CompareState *s = COLO_COMPARE(obj);
895 
896     return s->vnet_hdr;
897 }
898 
899 static void compare_set_vnet_hdr(Object *obj,
900                                  bool value,
901                                  Error **errp)
902 {
903     CompareState *s = COLO_COMPARE(obj);
904 
905     s->vnet_hdr = value;
906 }
907 
908 static void compare_pri_rs_finalize(SocketReadState *pri_rs)
909 {
910     CompareState *s = container_of(pri_rs, CompareState, pri_rs);
911     Connection *conn = NULL;
912 
913     if (packet_enqueue(s, PRIMARY_IN, &conn)) {
914         trace_colo_compare_main("primary: unsupported packet in");
915         compare_chr_send(s,
916                          pri_rs->buf,
917                          pri_rs->packet_len,
918                          pri_rs->vnet_hdr_len);
919     } else {
920         /* compare packet in the specified connection */
921         colo_compare_connection(conn, s);
922     }
923 }
924 
925 static void compare_sec_rs_finalize(SocketReadState *sec_rs)
926 {
927     CompareState *s = container_of(sec_rs, CompareState, sec_rs);
928     Connection *conn = NULL;
929 
930     if (packet_enqueue(s, SECONDARY_IN, &conn)) {
931         trace_colo_compare_main("secondary: unsupported packet in");
932     } else {
933         /* compare packet in the specified connection */
934         colo_compare_connection(conn, s);
935     }
936 }
937 
938 
939 /*
940  * Return 0 is success.
941  * Return 1 is failed.
942  */
943 static int find_and_check_chardev(Chardev **chr,
944                                   char *chr_name,
945                                   Error **errp)
946 {
947     *chr = qemu_chr_find(chr_name);
948     if (*chr == NULL) {
949         error_setg(errp, "Device '%s' not found",
950                    chr_name);
951         return 1;
952     }
953 
954     if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
955         error_setg(errp, "chardev \"%s\" is not reconnectable",
956                    chr_name);
957         return 1;
958     }
959 
960     return 0;
961 }
962 
963 /*
964  * Called from the main thread on the primary
965  * to setup colo-compare.
966  */
967 static void colo_compare_complete(UserCreatable *uc, Error **errp)
968 {
969     CompareState *s = COLO_COMPARE(uc);
970     Chardev *chr;
971 
972     if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
973         error_setg(errp, "colo compare needs 'primary_in' ,"
974                    "'secondary_in','outdev','iothread' property set");
975         return;
976     } else if (!strcmp(s->pri_indev, s->outdev) ||
977                !strcmp(s->sec_indev, s->outdev) ||
978                !strcmp(s->pri_indev, s->sec_indev)) {
979         error_setg(errp, "'indev' and 'outdev' could not be same "
980                    "for compare module");
981         return;
982     }
983 
984     if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
985         !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
986         return;
987     }
988 
989     if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
990         !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
991         return;
992     }
993 
994     if (find_and_check_chardev(&chr, s->outdev, errp) ||
995         !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
996         return;
997     }
998 
999     net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1000     net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
1001 
1002     QTAILQ_INSERT_TAIL(&net_compares, s, next);
1003 
1004     g_queue_init(&s->conn_list);
1005 
1006     qemu_mutex_init(&event_mtx);
1007     qemu_cond_init(&event_complete_cond);
1008 
1009     s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1010                                                       connection_key_equal,
1011                                                       g_free,
1012                                                       connection_destroy);
1013 
1014     colo_compare_iothread(s);
1015     return;
1016 }
1017 
1018 static void colo_flush_packets(void *opaque, void *user_data)
1019 {
1020     CompareState *s = user_data;
1021     Connection *conn = opaque;
1022     Packet *pkt = NULL;
1023 
1024     while (!g_queue_is_empty(&conn->primary_list)) {
1025         pkt = g_queue_pop_head(&conn->primary_list);
1026         compare_chr_send(s,
1027                          pkt->data,
1028                          pkt->size,
1029                          pkt->vnet_hdr_len);
1030         packet_destroy(pkt, NULL);
1031     }
1032     while (!g_queue_is_empty(&conn->secondary_list)) {
1033         pkt = g_queue_pop_head(&conn->secondary_list);
1034         packet_destroy(pkt, NULL);
1035     }
1036 }
1037 
1038 static void colo_compare_class_init(ObjectClass *oc, void *data)
1039 {
1040     UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1041 
1042     ucc->complete = colo_compare_complete;
1043 }
1044 
1045 static void colo_compare_init(Object *obj)
1046 {
1047     CompareState *s = COLO_COMPARE(obj);
1048 
1049     object_property_add_str(obj, "primary_in",
1050                             compare_get_pri_indev, compare_set_pri_indev,
1051                             NULL);
1052     object_property_add_str(obj, "secondary_in",
1053                             compare_get_sec_indev, compare_set_sec_indev,
1054                             NULL);
1055     object_property_add_str(obj, "outdev",
1056                             compare_get_outdev, compare_set_outdev,
1057                             NULL);
1058     object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1059                             (Object **)&s->iothread,
1060                             object_property_allow_set_link,
1061                             OBJ_PROP_LINK_STRONG, NULL);
1062 
1063     s->vnet_hdr = false;
1064     object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
1065                              compare_set_vnet_hdr, NULL);
1066 }
1067 
1068 static void colo_compare_finalize(Object *obj)
1069 {
1070     CompareState *s = COLO_COMPARE(obj);
1071     CompareState *tmp = NULL;
1072 
1073     qemu_chr_fe_deinit(&s->chr_pri_in, false);
1074     qemu_chr_fe_deinit(&s->chr_sec_in, false);
1075     qemu_chr_fe_deinit(&s->chr_out, false);
1076     if (s->iothread) {
1077         colo_compare_timer_del(s);
1078     }
1079 
1080     qemu_bh_delete(s->event_bh);
1081 
1082     QTAILQ_FOREACH(tmp, &net_compares, next) {
1083         if (tmp == s) {
1084             QTAILQ_REMOVE(&net_compares, s, next);
1085             break;
1086         }
1087     }
1088 
1089     /* Release all unhandled packets after compare thead exited */
1090     g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1091 
1092     g_queue_clear(&s->conn_list);
1093 
1094     if (s->connection_track_table) {
1095         g_hash_table_destroy(s->connection_track_table);
1096     }
1097 
1098     if (s->iothread) {
1099         object_unref(OBJECT(s->iothread));
1100     }
1101 
1102     qemu_mutex_destroy(&event_mtx);
1103     qemu_cond_destroy(&event_complete_cond);
1104 
1105     g_free(s->pri_indev);
1106     g_free(s->sec_indev);
1107     g_free(s->outdev);
1108 }
1109 
1110 static const TypeInfo colo_compare_info = {
1111     .name = TYPE_COLO_COMPARE,
1112     .parent = TYPE_OBJECT,
1113     .instance_size = sizeof(CompareState),
1114     .instance_init = colo_compare_init,
1115     .instance_finalize = colo_compare_finalize,
1116     .class_size = sizeof(CompareClass),
1117     .class_init = colo_compare_class_init,
1118     .interfaces = (InterfaceInfo[]) {
1119         { TYPE_USER_CREATABLE },
1120         { }
1121     }
1122 };
1123 
/* Registers the colo-compare type description with the type system. */
static void register_types(void)
{
    type_register_static(&colo_compare_info);
}

/* Hook register_types() into type initialization. */
type_init(register_types);
1130