]> Git Repo - qemu.git/blame - net/colo-compare.c
net/colo-compare.c: Correct ordering in complete and finalize
[qemu.git] / net / colo-compare.c
CommitLineData
7dce4e6f
ZC
1/*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * Author: Zhang Chen <[email protected]>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
13 */
14
15#include "qemu/osdep.h"
a8d25326 16#include "qemu-common.h"
7dce4e6f 17#include "qemu/error-report.h"
59509ec1 18#include "trace.h"
7dce4e6f
ZC
19#include "qapi/error.h"
20#include "net/net.h"
f4b61836 21#include "net/eth.h"
7dce4e6f
ZC
22#include "qom/object_interfaces.h"
23#include "qemu/iov.h"
24#include "qom/object.h"
7dce4e6f 25#include "net/queue.h"
4d43a603 26#include "chardev/char-fe.h"
7dce4e6f 27#include "qemu/sockets.h"
f27f01db 28#include "colo.h"
dd321ecf 29#include "sysemu/iothread.h"
0ffcece3
ZC
30#include "net/colo-compare.h"
31#include "migration/colo.h"
dccd0313 32#include "migration/migration.h"
e05ae1d9 33#include "util.h"
7dce4e6f 34
9c55fe94
LS
35#include "block/aio-wait.h"
36#include "qemu/coroutine.h"
37
7dce4e6f
ZC
38#define TYPE_COLO_COMPARE "colo-compare"
39#define COLO_COMPARE(obj) \
40 OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
41
0ffcece3
ZC
42static QTAILQ_HEAD(, CompareState) net_compares =
43 QTAILQ_HEAD_INITIALIZER(net_compares);
44
dccd0313
ZC
45static NotifierList colo_compare_notifiers =
46 NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers);
47
0682e15b 48#define COMPARE_READ_LEN_MAX NET_BUFSIZE
b6540d40
ZC
49#define MAX_QUEUE_SIZE 1024
50
f449c9e5
MZ
51#define COLO_COMPARE_FREE_PRIMARY 0x01
52#define COLO_COMPARE_FREE_SECONDARY 0x02
53
0682e15b 54#define REGULAR_PACKET_CHECK_MS 3000
9cc43c94 55#define DEFAULT_TIME_OUT_MS 3000
0682e15b 56
45942b79
LS
57static QemuMutex colo_compare_mutex;
58static bool colo_compare_active;
0ffcece3
ZC
59static QemuMutex event_mtx;
60static QemuCond event_complete_cond;
61static int event_unhandled_count;
62
59509ec1 63/*
61c5f469
ZC
64 * + CompareState ++
65 * | |
66 * +---------------+ +---------------+ +---------------+
67 * | conn list + - > conn + ------- > conn + -- > ......
68 * +---------------+ +---------------+ +---------------+
69 * | | | | | |
70 * +---------------+ +---v----+ +---v----+ +---v----+ +---v----+
71 * |primary | |secondary |primary | |secondary
72 * |packet | |packet + |packet | |packet +
73 * +--------+ +--------+ +--------+ +--------+
74 * | | | |
75 * +---v----+ +---v----+ +---v----+ +---v----+
76 * |primary | |secondary |primary | |secondary
77 * |packet | |packet + |packet | |packet +
78 * +--------+ +--------+ +--------+ +--------+
79 * | | | |
80 * +---v----+ +---v----+ +---v----+ +---v----+
81 * |primary | |secondary |primary | |secondary
82 * |packet | |packet + |packet | |packet +
83 * +--------+ +--------+ +--------+ +--------+
84 */
9c55fe94
LS
85
86typedef struct SendCo {
87 Coroutine *co;
88 struct CompareState *s;
89 CharBackend *chr;
90 GQueue send_list;
91 bool notify_remote_frame;
92 bool done;
93 int ret;
94} SendCo;
95
/* One queued outgoing frame; buf is owned by the entry and g_free()d by the sender. */
typedef struct SendEntry {
    uint32_t size;          /* number of bytes in buf */
    uint32_t vnet_hdr_len;  /* written before buf when the vnet header is enabled */
    uint8_t *buf;
} SendEntry;
101
7dce4e6f
ZC
102typedef struct CompareState {
103 Object parent;
104
105 char *pri_indev;
106 char *sec_indev;
107 char *outdev;
cf6af766 108 char *notify_dev;
32a6ebec
MAL
109 CharBackend chr_pri_in;
110 CharBackend chr_sec_in;
111 CharBackend chr_out;
13025fee 112 CharBackend chr_notify_dev;
7dce4e6f
ZC
113 SocketReadState pri_rs;
114 SocketReadState sec_rs;
13025fee 115 SocketReadState notify_rs;
9c55fe94
LS
116 SendCo out_sendco;
117 SendCo notify_sendco;
aa3a7032 118 bool vnet_hdr;
9cc43c94 119 uint32_t compare_timeout;
cca35ac4 120 uint32_t expired_scan_cycle;
59509ec1 121
61c5f469
ZC
122 /*
123 * Record the connection that through the NIC
124 * Element type: Connection
b6540d40
ZC
125 */
126 GQueue conn_list;
61c5f469 127 /* Record the connection without repetition */
59509ec1 128 GHashTable *connection_track_table;
dfd917a9 129
dd321ecf 130 IOThread *iothread;
b43decb0 131 GMainContext *worker_context;
dd321ecf 132 QEMUTimer *packet_check_timer;
0ffcece3
ZC
133
134 QEMUBH *event_bh;
135 enum colo_event event;
136
137 QTAILQ_ENTRY(CompareState) next;
7dce4e6f
ZC
138} CompareState;
139
140typedef struct CompareClass {
141 ObjectClass parent_class;
142} CompareClass;
143
59509ec1
ZC
/* Input stream a packet came from; passed as 'mode' to packet_enqueue(). */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN = 1,
};
148
24525e93 149
3037e7a5 150static int compare_chr_send(CompareState *s,
9c55fe94 151 uint8_t *buf,
aa3a7032 152 uint32_t size,
30685c00 153 uint32_t vnet_hdr_len,
9c55fe94
LS
154 bool notify_remote_frame,
155 bool zero_copy);
59509ec1 156
f77bed14
ZC
/*
 * Return true when buf holds exactly the characters of str: packet_len
 * must equal strlen(str) and the bytes must match. The terminating NUL
 * is not part of the comparison.
 */
static bool packet_matches_str(const char *str,
                               const uint8_t *buf,
                               uint32_t packet_len)
{
    size_t str_len = strlen(str);

    return packet_len == str_len && memcmp(str, buf, str_len) == 0;
}
167
1d09f700
ZC
168static void notify_remote_frame(CompareState *s)
169{
170 char msg[] = "DO_CHECKPOINT";
171 int ret = 0;
172
9c55fe94 173 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1d09f700
ZC
174 if (ret < 0) {
175 error_report("Notify Xen COLO-frame failed");
176 }
177}
178
179static void colo_compare_inconsistency_notify(CompareState *s)
180{
181 if (s->notify_dev) {
182 notify_remote_frame(s);
183 } else {
184 notifier_list_notify(&colo_compare_notifiers,
185 migrate_get_current());
186 }
187}
188
a935cc31
ZC
189static gint seq_sorter(Packet *a, Packet *b, gpointer data)
190{
e05ae1d9 191 struct tcp_hdr *atcp, *btcp;
a935cc31 192
e05ae1d9
MAL
193 atcp = (struct tcp_hdr *)(a->transport_header);
194 btcp = (struct tcp_hdr *)(b->transport_header);
a935cc31
ZC
195 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
196}
197
f449c9e5
MZ
198static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
199{
200 Packet *pkt = data;
e05ae1d9 201 struct tcp_hdr *tcphd;
f449c9e5 202
e05ae1d9 203 tcphd = (struct tcp_hdr *)pkt->transport_header;
f449c9e5
MZ
204
205 pkt->tcp_seq = ntohl(tcphd->th_seq);
206 pkt->tcp_ack = ntohl(tcphd->th_ack);
207 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
208 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
209 + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
210 pkt->payload_size = pkt->size - pkt->header_size;
211 pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
212 pkt->flags = tcphd->th_flags;
213}
214
8850d4ca
MZ
215/*
216 * Return 1 on success, if return 0 means the
217 * packet will be dropped
218 */
f449c9e5 219static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
8850d4ca
MZ
220{
221 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
222 if (pkt->ip->ip_p == IPPROTO_TCP) {
f449c9e5 223 fill_pkt_tcp_info(pkt, max_ack);
8850d4ca
MZ
224 g_queue_insert_sorted(queue,
225 pkt,
226 (GCompareDataFunc)seq_sorter,
227 NULL);
228 } else {
229 g_queue_push_tail(queue, pkt);
230 }
231 return 1;
232 }
233 return 0;
234}
235
59509ec1
ZC
236/*
237 * Return 0 on success, if return -1 means the pkt
238 * is unsupported(arp and ipv6) and will be sent later
239 */
8ec14402 240static int packet_enqueue(CompareState *s, int mode, Connection **con)
59509ec1 241{
b6540d40 242 ConnectionKey key;
59509ec1 243 Packet *pkt = NULL;
b6540d40 244 Connection *conn;
59509ec1
ZC
245
246 if (mode == PRIMARY_IN) {
ada1a33f
ZC
247 pkt = packet_new(s->pri_rs.buf,
248 s->pri_rs.packet_len,
249 s->pri_rs.vnet_hdr_len);
59509ec1 250 } else {
ada1a33f
ZC
251 pkt = packet_new(s->sec_rs.buf,
252 s->sec_rs.packet_len,
253 s->sec_rs.vnet_hdr_len);
59509ec1
ZC
254 }
255
256 if (parse_packet_early(pkt)) {
257 packet_destroy(pkt, NULL);
258 pkt = NULL;
259 return -1;
260 }
b6540d40 261 fill_connection_key(pkt, &key);
59509ec1 262
b6540d40
ZC
263 conn = connection_get(s->connection_track_table,
264 &key,
265 &s->conn_list);
59509ec1 266
b6540d40
ZC
267 if (!conn->processing) {
268 g_queue_push_tail(&s->conn_list, conn);
269 conn->processing = true;
270 }
271
272 if (mode == PRIMARY_IN) {
f449c9e5 273 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
b6540d40
ZC
274 error_report("colo compare primary queue size too big,"
275 "drop packet");
276 }
277 } else {
f449c9e5 278 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
b6540d40
ZC
279 error_report("colo compare secondary queue size too big,"
280 "drop packet");
281 }
282 }
4d366235 283 *con = conn;
59509ec1
ZC
284
285 return 0;
286}
287
f449c9e5
MZ
/*
 * True when seq1 lies strictly after seq2 in 32-bit wrap-around
 * (TCP sequence number) arithmetic.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
292
293static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
294{
295 int ret;
296 ret = compare_chr_send(s,
297 pkt->data,
298 pkt->size,
30685c00 299 pkt->vnet_hdr_len,
9c55fe94
LS
300 false,
301 true);
f449c9e5
MZ
302 if (ret < 0) {
303 error_report("colo send primary packet failed");
304 }
305 trace_colo_compare_main("packet same and release packet");
9c55fe94 306 packet_destroy_partial(pkt, NULL);
f449c9e5
MZ
307}
308
0682e15b
ZC
309/*
310 * The IP packets sent by primary and secondary
311 * will be compared in here
312 * TODO support ip fragment, Out-Of-Order
313 * return: 0 means packet same
314 * > 0 || < 0 means packet different
315 */
9394133f
MZ
316static int colo_compare_packet_payload(Packet *ppkt,
317 Packet *spkt,
318 uint16_t poffset,
319 uint16_t soffset,
320 uint16_t len)
321
0682e15b 322{
d87aa138 323 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
324 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
325
326 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
327 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
328 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
329 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
330
331 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
332 pri_ip_dst, spkt->size,
333 sec_ip_src, sec_ip_dst);
334 }
0682e15b 335
9394133f 336 return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
0682e15b
ZC
337}
338
f4b61836 339/*
f449c9e5
MZ
340 * return true means that the payload is consist and
341 * need to make the next comparison, false means do
342 * the checkpoint
343*/
344static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
345 int8_t *mark, uint32_t max_ack)
0682e15b 346{
f449c9e5
MZ
347 *mark = 0;
348
f449c9e5 349 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
1e907a32 350 if (!colo_compare_packet_payload(ppkt, spkt,
f449c9e5
MZ
351 ppkt->header_size, spkt->header_size,
352 ppkt->payload_size)) {
353 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
354 return true;
355 }
356 }
f4b61836 357
f449c9e5
MZ
358 /* one part of secondary packet payload still need to be compared */
359 if (!after(ppkt->seq_end, spkt->seq_end)) {
1e907a32 360 if (!colo_compare_packet_payload(ppkt, spkt,
f449c9e5
MZ
361 ppkt->header_size + ppkt->offset,
362 spkt->header_size + spkt->offset,
363 ppkt->payload_size - ppkt->offset)) {
364 if (!after(ppkt->tcp_ack, max_ack)) {
365 *mark = COLO_COMPARE_FREE_PRIMARY;
366 spkt->offset += ppkt->payload_size - ppkt->offset;
367 return true;
368 } else {
369 /* secondary guest hasn't ack the data, don't send
370 * out this packet
371 */
372 return false;
373 }
374 }
375 } else {
376 /* primary packet is longer than secondary packet, compare
377 * the same part and mark the primary packet offset
378 */
1e907a32 379 if (!colo_compare_packet_payload(ppkt, spkt,
f449c9e5
MZ
380 ppkt->header_size + ppkt->offset,
381 spkt->header_size + spkt->offset,
382 spkt->payload_size - spkt->offset)) {
383 *mark = COLO_COMPARE_FREE_SECONDARY;
384 ppkt->offset += spkt->payload_size - spkt->offset;
385 return true;
386 }
387 }
388
389 return false;
390}
2ad7ca4c 391
f449c9e5
MZ
392static void colo_compare_tcp(CompareState *s, Connection *conn)
393{
394 Packet *ppkt = NULL, *spkt = NULL;
395 int8_t mark;
f4b61836
ZC
396
397 /*
f449c9e5
MZ
398 * If ppkt and spkt have the same payload, but ppkt's ACK
399 * is greater than spkt's ACK, in this case we can not
400 * send the ppkt because it will cause the secondary guest
401 * to miss sending some data in the next. Therefore, we
402 * record the maximum ACK in the current queue at both
403 * primary side and secondary side. Only when the ack is
404 * less than the smaller of the two maximum ack, then we
405 * can ensure that the packet's payload is acknowledged by
406 * primary and secondary.
407 */
408 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
409
410pri:
411 if (g_queue_is_empty(&conn->primary_list)) {
412 return;
413 }
414 ppkt = g_queue_pop_head(&conn->primary_list);
415sec:
416 if (g_queue_is_empty(&conn->secondary_list)) {
417 g_queue_push_head(&conn->primary_list, ppkt);
418 return;
f4b61836 419 }
f449c9e5 420 spkt = g_queue_pop_head(&conn->secondary_list);
f4b61836 421
f449c9e5
MZ
422 if (ppkt->tcp_seq == ppkt->seq_end) {
423 colo_release_primary_pkt(s, ppkt);
424 ppkt = NULL;
425 }
9394133f 426
f449c9e5
MZ
427 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
428 trace_colo_compare_main("pri: this packet has compared");
429 colo_release_primary_pkt(s, ppkt);
430 ppkt = NULL;
431 }
9394133f 432
f449c9e5
MZ
433 if (spkt->tcp_seq == spkt->seq_end) {
434 packet_destroy(spkt, NULL);
435 if (!ppkt) {
436 goto pri;
437 } else {
438 goto sec;
439 }
6efeb328 440 } else {
f449c9e5
MZ
441 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
442 trace_colo_compare_main("sec: this packet has compared");
443 packet_destroy(spkt, NULL);
444 if (!ppkt) {
445 goto pri;
446 } else {
447 goto sec;
448 }
449 }
450 if (!ppkt) {
451 g_queue_push_head(&conn->secondary_list, spkt);
452 goto pri;
453 }
6efeb328 454 }
f4b61836 455
f449c9e5
MZ
456 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
457 trace_colo_compare_tcp_info("pri",
458 ppkt->tcp_seq, ppkt->tcp_ack,
459 ppkt->header_size, ppkt->payload_size,
460 ppkt->offset, ppkt->flags);
461
462 trace_colo_compare_tcp_info("sec",
463 spkt->tcp_seq, spkt->tcp_ack,
464 spkt->header_size, spkt->payload_size,
465 spkt->offset, spkt->flags);
466
467 if (mark == COLO_COMPARE_FREE_PRIMARY) {
468 conn->compare_seq = ppkt->seq_end;
469 colo_release_primary_pkt(s, ppkt);
470 g_queue_push_head(&conn->secondary_list, spkt);
471 goto pri;
472 }
473 if (mark == COLO_COMPARE_FREE_SECONDARY) {
474 conn->compare_seq = spkt->seq_end;
475 packet_destroy(spkt, NULL);
476 goto sec;
477 }
478 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
479 conn->compare_seq = ppkt->seq_end;
480 colo_release_primary_pkt(s, ppkt);
481 packet_destroy(spkt, NULL);
482 goto pri;
483 }
484 } else {
485 g_queue_push_head(&conn->primary_list, ppkt);
486 g_queue_push_head(&conn->secondary_list, spkt);
2061c14c 487
76658541
LS
488 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
489 qemu_hexdump((char *)ppkt->data, stderr,
490 "colo-compare ppkt", ppkt->size);
491 qemu_hexdump((char *)spkt->data, stderr,
492 "colo-compare spkt", spkt->size);
493 }
f4b61836 494
1d09f700 495 colo_compare_inconsistency_notify(s);
f449c9e5 496 }
f4b61836
ZC
497}
498
f449c9e5 499
f4b61836
ZC
500/*
501 * Called from the compare thread on the primary
502 * for compare udp packet
503 */
504static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
505{
9394133f
MZ
506 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
507 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
f4b61836
ZC
508
509 trace_colo_compare_main("compare udp");
2ad7ca4c 510
6efeb328
ZC
511 /*
512 * Because of ppkt and spkt are both in the same connection,
513 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
514 * same with spkt. In addition, IP header's Identification is a random
515 * field, we can handle it in IP fragmentation function later.
516 * COLO just concern the response net packet payload from primary guest
517 * and secondary guest are same or not, So we ignored all IP header include
518 * other field like TOS,TTL,IP Checksum. we only need to compare
519 * the ip payload here.
520 */
9394133f
MZ
521 if (ppkt->size != spkt->size) {
522 trace_colo_compare_main("UDP: payload size of packets are different");
523 return -1;
524 }
525 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
526 ppkt->size - offset)) {
f4b61836 527 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
f4b61836 528 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
d87aa138 529 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
530 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
531 ppkt->size);
532 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
533 spkt->size);
534 }
9394133f
MZ
535 return -1;
536 } else {
537 return 0;
f4b61836 538 }
f4b61836
ZC
539}
540
541/*
542 * Called from the compare thread on the primary
543 * for compare icmp packet
544 */
545static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
546{
9394133f
MZ
547 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
548 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
6efeb328 549
f4b61836 550 trace_colo_compare_main("compare icmp");
f4b61836 551
6efeb328
ZC
552 /*
553 * Because of ppkt and spkt are both in the same connection,
554 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
555 * same with spkt. In addition, IP header's Identification is a random
556 * field, we can handle it in IP fragmentation function later.
557 * COLO just concern the response net packet payload from primary guest
558 * and secondary guest are same or not, So we ignored all IP header include
559 * other field like TOS,TTL,IP Checksum. we only need to compare
560 * the ip payload here.
561 */
9394133f
MZ
562 if (ppkt->size != spkt->size) {
563 trace_colo_compare_main("ICMP: payload size of packets are different");
564 return -1;
565 }
566 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
567 ppkt->size - offset)) {
f4b61836
ZC
568 trace_colo_compare_icmp_miscompare("primary pkt size",
569 ppkt->size);
f4b61836
ZC
570 trace_colo_compare_icmp_miscompare("Secondary pkt size",
571 spkt->size);
d87aa138 572 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
573 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
574 ppkt->size);
575 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
576 spkt->size);
577 }
f4b61836
ZC
578 return -1;
579 } else {
580 return 0;
581 }
582}
583
584/*
585 * Called from the compare thread on the primary
586 * for compare other packet
587 */
588static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
589{
9394133f
MZ
590 uint16_t offset = ppkt->vnet_hdr_len;
591
f4b61836 592 trace_colo_compare_main("compare other");
d87aa138 593 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
594 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
595
596 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
597 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
598 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
599 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
600
601 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
602 pri_ip_dst, spkt->size,
603 sec_ip_src, sec_ip_dst);
604 }
605
9394133f
MZ
606 if (ppkt->size != spkt->size) {
607 trace_colo_compare_main("Other: payload size of packets are different");
608 return -1;
609 }
610 return colo_compare_packet_payload(ppkt, spkt, offset, offset,
611 ppkt->size - offset);
0682e15b
ZC
612}
613
614static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
615{
616 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
617
618 if ((now - pkt->creation_ms) > (*check_time)) {
619 trace_colo_old_packet_check_found(pkt->creation_ms);
620 return 0;
621 } else {
622 return 1;
623 }
624}
625
dccd0313
ZC
626void colo_compare_register_notifier(Notifier *notify)
627{
628 notifier_list_add(&colo_compare_notifiers, notify);
629}
630
631void colo_compare_unregister_notifier(Notifier *notify)
632{
633 notifier_remove(notify);
634}
635
d25a7dab 636static int colo_old_packet_check_one_conn(Connection *conn,
1d09f700 637 CompareState *s)
0682e15b 638{
0682e15b 639 GList *result = NULL;
0682e15b
ZC
640
641 result = g_queue_find_custom(&conn->primary_list,
9cc43c94 642 &s->compare_timeout,
0682e15b
ZC
643 (GCompareFunc)colo_old_packet_check_one);
644
645 if (result) {
61c5f469 646 /* Do checkpoint will flush old packet */
1d09f700 647 colo_compare_inconsistency_notify(s);
d25a7dab 648 return 0;
0682e15b 649 }
d25a7dab
ZC
650
651 return 1;
0682e15b
ZC
652}
653
654/*
655 * Look for old packets that the secondary hasn't matched,
656 * if we have some then we have to checkpoint to wake
657 * the secondary up.
658 */
659static void colo_old_packet_check(void *opaque)
660{
661 CompareState *s = opaque;
662
d25a7dab
ZC
663 /*
664 * If we find one old packet, stop finding job and notify
665 * COLO frame do checkpoint.
666 */
1d09f700 667 g_queue_find_custom(&s->conn_list, s,
d25a7dab 668 (GCompareFunc)colo_old_packet_check_one_conn);
0682e15b
ZC
669}
670
f449c9e5
MZ
671static void colo_compare_packet(CompareState *s, Connection *conn,
672 int (*HandlePacket)(Packet *spkt,
673 Packet *ppkt))
0682e15b 674{
0682e15b
ZC
675 Packet *pkt = NULL;
676 GList *result = NULL;
0682e15b
ZC
677
678 while (!g_queue_is_empty(&conn->primary_list) &&
679 !g_queue_is_empty(&conn->secondary_list)) {
626bba98 680 pkt = g_queue_pop_head(&conn->primary_list);
f449c9e5
MZ
681 result = g_queue_find_custom(&conn->secondary_list,
682 pkt, (GCompareFunc)HandlePacket);
0682e15b
ZC
683
684 if (result) {
f449c9e5 685 colo_release_primary_pkt(s, pkt);
0682e15b 686 g_queue_remove(&conn->secondary_list, result->data);
0682e15b
ZC
687 } else {
688 /*
689 * If one packet arrive late, the secondary_list or
690 * primary_list will be empty, so we can't compare it
dccd0313
ZC
691 * until next comparison. If the packets in the list are
692 * timeout, it will trigger a checkpoint request.
0682e15b
ZC
693 */
694 trace_colo_compare_main("packet different");
626bba98 695 g_queue_push_head(&conn->primary_list, pkt);
1d09f700
ZC
696
697 colo_compare_inconsistency_notify(s);
0682e15b
ZC
698 break;
699 }
700 }
701}
702
f449c9e5
MZ
703/*
704 * Called from the compare thread on the primary
705 * for compare packet with secondary list of the
706 * specified connection when a new packet was
707 * queued to it.
708 */
709static void colo_compare_connection(void *opaque, void *user_data)
710{
711 CompareState *s = user_data;
712 Connection *conn = opaque;
713
714 switch (conn->ip_proto) {
715 case IPPROTO_TCP:
716 colo_compare_tcp(s, conn);
717 break;
718 case IPPROTO_UDP:
719 colo_compare_packet(s, conn, colo_packet_compare_udp);
720 break;
721 case IPPROTO_ICMP:
722 colo_compare_packet(s, conn, colo_packet_compare_icmp);
723 break;
724 default:
725 colo_compare_packet(s, conn, colo_packet_compare_other);
726 break;
727 }
728}
729
9c55fe94 730static void coroutine_fn _compare_chr_send(void *opaque)
59509ec1 731{
9c55fe94
LS
732 SendCo *sendco = opaque;
733 CompareState *s = sendco->s;
59509ec1 734 int ret = 0;
59509ec1 735
9c55fe94
LS
736 while (!g_queue_is_empty(&sendco->send_list)) {
737 SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
738 uint32_t len = htonl(entry->size);
59509ec1 739
9c55fe94 740 ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
30685c00 741
9c55fe94
LS
742 if (ret != sizeof(len)) {
743 g_free(entry->buf);
744 g_slice_free(SendEntry, entry);
745 goto err;
746 }
59509ec1 747
9c55fe94
LS
748 if (!sendco->notify_remote_frame && s->vnet_hdr) {
749 /*
750 * We send vnet header len make other module(like filter-redirector)
751 * know how to parse net packet correctly.
752 */
753 len = htonl(entry->vnet_hdr_len);
30685c00 754
9c55fe94 755 ret = qemu_chr_fe_write_all(sendco->chr,
30685c00
ZC
756 (uint8_t *)&len,
757 sizeof(len));
9c55fe94
LS
758
759 if (ret != sizeof(len)) {
760 g_free(entry->buf);
761 g_slice_free(SendEntry, entry);
762 goto err;
763 }
30685c00
ZC
764 }
765
9c55fe94
LS
766 ret = qemu_chr_fe_write_all(sendco->chr,
767 (uint8_t *)entry->buf,
768 entry->size);
769
770 if (ret != entry->size) {
771 g_free(entry->buf);
772 g_slice_free(SendEntry, entry);
aa3a7032
ZC
773 goto err;
774 }
9c55fe94
LS
775
776 g_free(entry->buf);
777 g_slice_free(SendEntry, entry);
aa3a7032
ZC
778 }
779
9c55fe94
LS
780 sendco->ret = 0;
781 goto out;
782
783err:
784 while (!g_queue_is_empty(&sendco->send_list)) {
785 SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
786 g_free(entry->buf);
787 g_slice_free(SendEntry, entry);
788 }
789 sendco->ret = ret < 0 ? ret : -EIO;
790out:
791 sendco->co = NULL;
792 sendco->done = true;
793 aio_wait_kick();
794}
795
796static int compare_chr_send(CompareState *s,
797 uint8_t *buf,
798 uint32_t size,
799 uint32_t vnet_hdr_len,
800 bool notify_remote_frame,
801 bool zero_copy)
802{
803 SendCo *sendco;
804 SendEntry *entry;
805
30685c00 806 if (notify_remote_frame) {
9c55fe94 807 sendco = &s->notify_sendco;
30685c00 808 } else {
9c55fe94 809 sendco = &s->out_sendco;
30685c00
ZC
810 }
811
9c55fe94
LS
812 if (!size) {
813 return 0;
59509ec1
ZC
814 }
815
9c55fe94
LS
816 entry = g_slice_new(SendEntry);
817 entry->size = size;
818 entry->vnet_hdr_len = vnet_hdr_len;
819 if (zero_copy) {
820 entry->buf = buf;
821 } else {
822 entry->buf = g_malloc(size);
823 memcpy(entry->buf, buf, size);
824 }
825 g_queue_push_head(&sendco->send_list, entry);
826
827 if (sendco->done) {
828 sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
829 sendco->done = false;
830 qemu_coroutine_enter(sendco->co);
831 if (sendco->done) {
832 /* report early errors */
833 return sendco->ret;
834 }
835 }
59509ec1 836
9c55fe94
LS
837 /* assume success */
838 return 0;
59509ec1
ZC
839}
840
0682e15b
ZC
841static int compare_chr_can_read(void *opaque)
842{
843 return COMPARE_READ_LEN_MAX;
844}
845
846/*
847 * Called from the main thread on the primary for packets
848 * arriving over the socket from the primary.
849 */
850static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
851{
852 CompareState *s = COLO_COMPARE(opaque);
853 int ret;
854
855 ret = net_fill_rstate(&s->pri_rs, buf, size);
856 if (ret == -1) {
81517ba3 857 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
39ab61c6 858 NULL, NULL, true);
0682e15b
ZC
859 error_report("colo-compare primary_in error");
860 }
861}
862
863/*
864 * Called from the main thread on the primary for packets
865 * arriving over the socket from the secondary.
866 */
867static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
868{
869 CompareState *s = COLO_COMPARE(opaque);
870 int ret;
871
872 ret = net_fill_rstate(&s->sec_rs, buf, size);
873 if (ret == -1) {
81517ba3 874 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
39ab61c6 875 NULL, NULL, true);
0682e15b
ZC
876 error_report("colo-compare secondary_in error");
877 }
878}
879
13025fee
ZC
880static void compare_notify_chr(void *opaque, const uint8_t *buf, int size)
881{
882 CompareState *s = COLO_COMPARE(opaque);
883 int ret;
884
885 ret = net_fill_rstate(&s->notify_rs, buf, size);
886 if (ret == -1) {
887 qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL,
888 NULL, NULL, true);
889 error_report("colo-compare notify_dev error");
890 }
891}
892
66d2a242
HZ
893/*
894 * Check old packet regularly so it can watch for any packets
895 * that the secondary hasn't produced equivalents of.
896 */
dd321ecf 897static void check_old_packet_regular(void *opaque)
66d2a242
HZ
898{
899 CompareState *s = opaque;
900
901 /* if have old packet we will notify checkpoint */
902 colo_old_packet_check(s);
dd321ecf 903 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
cca35ac4 904 s->expired_scan_cycle);
dd321ecf
WY
905}
906
0ffcece3
ZC
907/* Public API, Used for COLO frame to notify compare event */
908void colo_notify_compares_event(void *opaque, int event, Error **errp)
909{
910 CompareState *s;
45942b79
LS
911 qemu_mutex_lock(&colo_compare_mutex);
912
913 if (!colo_compare_active) {
914 qemu_mutex_unlock(&colo_compare_mutex);
915 return;
916 }
0ffcece3
ZC
917
918 qemu_mutex_lock(&event_mtx);
919 QTAILQ_FOREACH(s, &net_compares, next) {
920 s->event = event;
921 qemu_bh_schedule(s->event_bh);
922 event_unhandled_count++;
923 }
924 /* Wait all compare threads to finish handling this event */
925 while (event_unhandled_count > 0) {
926 qemu_cond_wait(&event_complete_cond, &event_mtx);
927 }
928
929 qemu_mutex_unlock(&event_mtx);
45942b79 930 qemu_mutex_unlock(&colo_compare_mutex);
0ffcece3
ZC
931}
932
dd321ecf
WY
933static void colo_compare_timer_init(CompareState *s)
934{
935 AioContext *ctx = iothread_get_aio_context(s->iothread);
66d2a242 936
dd321ecf
WY
937 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
938 SCALE_MS, check_old_packet_regular,
939 s);
940 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
cca35ac4 941 s->expired_scan_cycle);
66d2a242
HZ
942}
943
dd321ecf 944static void colo_compare_timer_del(CompareState *s)
0682e15b 945{
dd321ecf
WY
946 if (s->packet_check_timer) {
947 timer_del(s->packet_check_timer);
948 timer_free(s->packet_check_timer);
949 s->packet_check_timer = NULL;
950 }
951 }
0682e15b 952
0ffcece3
ZC
953static void colo_flush_packets(void *opaque, void *user_data);
954
955static void colo_compare_handle_event(void *opaque)
956{
957 CompareState *s = opaque;
958
959 switch (s->event) {
960 case COLO_EVENT_CHECKPOINT:
961 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
962 break;
963 case COLO_EVENT_FAILOVER:
964 break;
965 default:
966 break;
967 }
968
0ffcece3 969 qemu_mutex_lock(&event_mtx);
78e4f446 970 assert(event_unhandled_count > 0);
0ffcece3
ZC
971 event_unhandled_count--;
972 qemu_cond_broadcast(&event_complete_cond);
973 qemu_mutex_unlock(&event_mtx);
974}
975
dd321ecf
WY
976static void colo_compare_iothread(CompareState *s)
977{
5893c738 978 AioContext *ctx = iothread_get_aio_context(s->iothread);
dd321ecf
WY
979 object_ref(OBJECT(s->iothread));
980 s->worker_context = iothread_get_g_main_context(s->iothread);
0682e15b 981
5345fdb4 982 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
81517ba3
AN
983 compare_pri_chr_in, NULL, NULL,
984 s, s->worker_context, true);
5345fdb4 985 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
81517ba3
AN
986 compare_sec_chr_in, NULL, NULL,
987 s, s->worker_context, true);
13025fee
ZC
988 if (s->notify_dev) {
989 qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read,
990 compare_notify_chr, NULL, NULL,
991 s, s->worker_context, true);
992 }
0682e15b 993
dd321ecf 994 colo_compare_timer_init(s);
5893c738 995 s->event_bh = aio_bh_new(ctx, colo_compare_handle_event, s);
0682e15b
ZC
996}
997
7dce4e6f
ZC
998static char *compare_get_pri_indev(Object *obj, Error **errp)
999{
1000 CompareState *s = COLO_COMPARE(obj);
1001
1002 return g_strdup(s->pri_indev);
1003}
1004
1005static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
1006{
1007 CompareState *s = COLO_COMPARE(obj);
1008
1009 g_free(s->pri_indev);
1010 s->pri_indev = g_strdup(value);
1011}
1012
1013static char *compare_get_sec_indev(Object *obj, Error **errp)
1014{
1015 CompareState *s = COLO_COMPARE(obj);
1016
1017 return g_strdup(s->sec_indev);
1018}
1019
1020static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
1021{
1022 CompareState *s = COLO_COMPARE(obj);
1023
1024 g_free(s->sec_indev);
1025 s->sec_indev = g_strdup(value);
1026}
1027
1028static char *compare_get_outdev(Object *obj, Error **errp)
1029{
1030 CompareState *s = COLO_COMPARE(obj);
1031
1032 return g_strdup(s->outdev);
1033}
1034
1035static void compare_set_outdev(Object *obj, const char *value, Error **errp)
1036{
1037 CompareState *s = COLO_COMPARE(obj);
1038
1039 g_free(s->outdev);
1040 s->outdev = g_strdup(value);
1041}
1042
aa3a7032
ZC
1043static bool compare_get_vnet_hdr(Object *obj, Error **errp)
1044{
1045 CompareState *s = COLO_COMPARE(obj);
1046
1047 return s->vnet_hdr;
1048}
1049
1050static void compare_set_vnet_hdr(Object *obj,
1051 bool value,
1052 Error **errp)
1053{
1054 CompareState *s = COLO_COMPARE(obj);
1055
1056 s->vnet_hdr = value;
1057}
1058
cf6af766
ZC
1059static char *compare_get_notify_dev(Object *obj, Error **errp)
1060{
1061 CompareState *s = COLO_COMPARE(obj);
1062
1063 return g_strdup(s->notify_dev);
1064}
1065
1066static void compare_set_notify_dev(Object *obj, const char *value, Error **errp)
1067{
1068 CompareState *s = COLO_COMPARE(obj);
1069
1070 g_free(s->notify_dev);
1071 s->notify_dev = g_strdup(value);
1072}
1073
9cc43c94
ZC
1074static void compare_get_timeout(Object *obj, Visitor *v,
1075 const char *name, void *opaque,
1076 Error **errp)
1077{
1078 CompareState *s = COLO_COMPARE(obj);
1079 uint32_t value = s->compare_timeout;
1080
1081 visit_type_uint32(v, name, &value, errp);
1082}
1083
1084static void compare_set_timeout(Object *obj, Visitor *v,
1085 const char *name, void *opaque,
1086 Error **errp)
1087{
1088 CompareState *s = COLO_COMPARE(obj);
1089 Error *local_err = NULL;
1090 uint32_t value;
1091
1092 visit_type_uint32(v, name, &value, &local_err);
1093 if (local_err) {
1094 goto out;
1095 }
1096 if (!value) {
1097 error_setg(&local_err, "Property '%s.%s' requires a positive value",
1098 object_get_typename(obj), name);
1099 goto out;
1100 }
1101 s->compare_timeout = value;
1102
1103out:
1104 error_propagate(errp, local_err);
1105}
1106
cca35ac4
ZC
1107static void compare_get_expired_scan_cycle(Object *obj, Visitor *v,
1108 const char *name, void *opaque,
1109 Error **errp)
1110{
1111 CompareState *s = COLO_COMPARE(obj);
1112 uint32_t value = s->expired_scan_cycle;
1113
1114 visit_type_uint32(v, name, &value, errp);
1115}
1116
1117static void compare_set_expired_scan_cycle(Object *obj, Visitor *v,
1118 const char *name, void *opaque,
1119 Error **errp)
1120{
1121 CompareState *s = COLO_COMPARE(obj);
1122 Error *local_err = NULL;
1123 uint32_t value;
1124
1125 visit_type_uint32(v, name, &value, &local_err);
1126 if (local_err) {
1127 goto out;
1128 }
1129 if (!value) {
1130 error_setg(&local_err, "Property '%s.%s' requires a positive value",
1131 object_get_typename(obj), name);
1132 goto out;
1133 }
1134 s->expired_scan_cycle = value;
1135
1136out:
1137 error_propagate(errp, local_err);
1138}
1139
7dce4e6f
ZC
1140static void compare_pri_rs_finalize(SocketReadState *pri_rs)
1141{
59509ec1 1142 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
8ec14402 1143 Connection *conn = NULL;
59509ec1 1144
8ec14402 1145 if (packet_enqueue(s, PRIMARY_IN, &conn)) {
59509ec1 1146 trace_colo_compare_main("primary: unsupported packet in");
aa3a7032
ZC
1147 compare_chr_send(s,
1148 pri_rs->buf,
1149 pri_rs->packet_len,
30685c00 1150 pri_rs->vnet_hdr_len,
9c55fe94 1151 false,
30685c00 1152 false);
0682e15b 1153 } else {
3463218c 1154 /* compare packet in the specified connection */
8ec14402 1155 colo_compare_connection(conn, s);
59509ec1 1156 }
7dce4e6f
ZC
1157}
1158
1159static void compare_sec_rs_finalize(SocketReadState *sec_rs)
1160{
59509ec1 1161 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
8ec14402 1162 Connection *conn = NULL;
59509ec1 1163
8ec14402 1164 if (packet_enqueue(s, SECONDARY_IN, &conn)) {
59509ec1 1165 trace_colo_compare_main("secondary: unsupported packet in");
0682e15b 1166 } else {
3463218c 1167 /* compare packet in the specified connection */
8ec14402 1168 colo_compare_connection(conn, s);
59509ec1 1169 }
7dce4e6f
ZC
1170}
1171
13025fee
ZC
1172static void compare_notify_rs_finalize(SocketReadState *notify_rs)
1173{
1d09f700
ZC
1174 CompareState *s = container_of(notify_rs, CompareState, notify_rs);
1175
f77bed14 1176 const char msg[] = "COLO_COMPARE_GET_XEN_INIT";
1d09f700
ZC
1177 int ret;
1178
f77bed14
ZC
1179 if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
1180 notify_rs->buf,
1181 notify_rs->packet_len)) {
9c55fe94 1182 ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
1d09f700
ZC
1183 if (ret < 0) {
1184 error_report("Notify Xen COLO-frame INIT failed");
1185 }
f77bed14
ZC
1186 } else if (packet_matches_str("COLO_CHECKPOINT",
1187 notify_rs->buf,
1188 notify_rs->packet_len)) {
1d09f700
ZC
1189 /* colo-compare do checkpoint, flush pri packet and remove sec packet */
1190 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
f77bed14
ZC
1191 } else {
1192 error_report("COLO compare got unsupported instruction");
1d09f700 1193 }
13025fee 1194}
7dce4e6f
ZC
1195
1196/*
1197 * Return 0 is success.
1198 * Return 1 is failed.
1199 */
0ec7b3e7 1200static int find_and_check_chardev(Chardev **chr,
7dce4e6f
ZC
1201 char *chr_name,
1202 Error **errp)
1203{
7dce4e6f
ZC
1204 *chr = qemu_chr_find(chr_name);
1205 if (*chr == NULL) {
1206 error_setg(errp, "Device '%s' not found",
1207 chr_name);
1208 return 1;
1209 }
1210
0a73336d
DB
1211 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
1212 error_setg(errp, "chardev \"%s\" is not reconnectable",
7dce4e6f
ZC
1213 chr_name);
1214 return 1;
1215 }
fbf3cc3a 1216
269d25cd
MAL
1217 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_GCONTEXT)) {
1218 error_setg(errp, "chardev \"%s\" cannot switch context",
1219 chr_name);
1220 return 1;
1221 }
1222
7dce4e6f
ZC
1223 return 0;
1224}
1225
1226/*
1227 * Called from the main thread on the primary
1228 * to setup colo-compare.
1229 */
1230static void colo_compare_complete(UserCreatable *uc, Error **errp)
1231{
1232 CompareState *s = COLO_COMPARE(uc);
0ec7b3e7 1233 Chardev *chr;
7dce4e6f 1234
dd321ecf 1235 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
7dce4e6f 1236 error_setg(errp, "colo compare needs 'primary_in' ,"
dd321ecf 1237 "'secondary_in','outdev','iothread' property set");
7dce4e6f
ZC
1238 return;
1239 } else if (!strcmp(s->pri_indev, s->outdev) ||
1240 !strcmp(s->sec_indev, s->outdev) ||
1241 !strcmp(s->pri_indev, s->sec_indev)) {
1242 error_setg(errp, "'indev' and 'outdev' could not be same "
1243 "for compare module");
1244 return;
1245 }
1246
9cc43c94
ZC
1247 if (!s->compare_timeout) {
1248 /* Set default value to 3000 MS */
1249 s->compare_timeout = DEFAULT_TIME_OUT_MS;
1250 }
1251
cca35ac4
ZC
1252 if (!s->expired_scan_cycle) {
1253 /* Set default value to 3000 MS */
1254 s->expired_scan_cycle = REGULAR_PACKET_CHECK_MS;
1255 }
1256
5345fdb4
MAL
1257 if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
1258 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
7dce4e6f
ZC
1259 return;
1260 }
1261
5345fdb4
MAL
1262 if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
1263 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
7dce4e6f
ZC
1264 return;
1265 }
1266
5345fdb4
MAL
1267 if (find_and_check_chardev(&chr, s->outdev, errp) ||
1268 !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
7dce4e6f
ZC
1269 return;
1270 }
1271
aa3a7032
ZC
1272 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
1273 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
7dce4e6f 1274
13025fee
ZC
1275 /* Try to enable remote notify chardev, currently just for Xen COLO */
1276 if (s->notify_dev) {
1277 if (find_and_check_chardev(&chr, s->notify_dev, errp) ||
1278 !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) {
1279 return;
1280 }
1281
1282 net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize,
1283 s->vnet_hdr);
1284 }
1285
9c55fe94
LS
1286 s->out_sendco.s = s;
1287 s->out_sendco.chr = &s->chr_out;
1288 s->out_sendco.notify_remote_frame = false;
1289 s->out_sendco.done = true;
1290 g_queue_init(&s->out_sendco.send_list);
1291
1292 if (s->notify_dev) {
1293 s->notify_sendco.s = s;
1294 s->notify_sendco.chr = &s->chr_notify_dev;
1295 s->notify_sendco.notify_remote_frame = true;
1296 s->notify_sendco.done = true;
1297 g_queue_init(&s->notify_sendco.send_list);
1298 }
1299
b6540d40
ZC
1300 g_queue_init(&s->conn_list);
1301
1302 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
1303 connection_key_equal,
1304 g_free,
1305 connection_destroy);
59509ec1 1306
dd321ecf 1307 colo_compare_iothread(s);
5bd57eba
LS
1308
1309 qemu_mutex_lock(&colo_compare_mutex);
1310 if (!colo_compare_active) {
1311 qemu_mutex_init(&event_mtx);
1312 qemu_cond_init(&event_complete_cond);
1313 colo_compare_active = true;
1314 }
1315 QTAILQ_INSERT_TAIL(&net_compares, s, next);
1316 qemu_mutex_unlock(&colo_compare_mutex);
1317
7dce4e6f
ZC
1318 return;
1319}
1320
dfd917a9
HZ
1321static void colo_flush_packets(void *opaque, void *user_data)
1322{
1323 CompareState *s = user_data;
1324 Connection *conn = opaque;
1325 Packet *pkt = NULL;
1326
1327 while (!g_queue_is_empty(&conn->primary_list)) {
1328 pkt = g_queue_pop_head(&conn->primary_list);
aa3a7032
ZC
1329 compare_chr_send(s,
1330 pkt->data,
1331 pkt->size,
30685c00 1332 pkt->vnet_hdr_len,
9c55fe94
LS
1333 false,
1334 true);
1335 packet_destroy_partial(pkt, NULL);
dfd917a9
HZ
1336 }
1337 while (!g_queue_is_empty(&conn->secondary_list)) {
1338 pkt = g_queue_pop_head(&conn->secondary_list);
1339 packet_destroy(pkt, NULL);
1340 }
1341}
1342
7dce4e6f
ZC
1343static void colo_compare_class_init(ObjectClass *oc, void *data)
1344{
1345 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
1346
1347 ucc->complete = colo_compare_complete;
1348}
1349
1350static void colo_compare_init(Object *obj)
1351{
aa3a7032
ZC
1352 CompareState *s = COLO_COMPARE(obj);
1353
7dce4e6f 1354 object_property_add_str(obj, "primary_in",
d2623129 1355 compare_get_pri_indev, compare_set_pri_indev);
7dce4e6f 1356 object_property_add_str(obj, "secondary_in",
d2623129 1357 compare_get_sec_indev, compare_set_sec_indev);
7dce4e6f 1358 object_property_add_str(obj, "outdev",
d2623129 1359 compare_get_outdev, compare_set_outdev);
dd321ecf
WY
1360 object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
1361 (Object **)&s->iothread,
1362 object_property_allow_set_link,
d2623129 1363 OBJ_PROP_LINK_STRONG);
cf6af766
ZC
1364 /* This parameter just for Xen COLO */
1365 object_property_add_str(obj, "notify_dev",
d2623129 1366 compare_get_notify_dev, compare_set_notify_dev);
aa3a7032 1367
9cc43c94
ZC
1368 object_property_add(obj, "compare_timeout", "uint32",
1369 compare_get_timeout,
d2623129 1370 compare_set_timeout, NULL, NULL);
9cc43c94 1371
cca35ac4
ZC
1372 object_property_add(obj, "expired_scan_cycle", "uint32",
1373 compare_get_expired_scan_cycle,
d2623129 1374 compare_set_expired_scan_cycle, NULL, NULL);
cca35ac4 1375
aa3a7032
ZC
1376 s->vnet_hdr = false;
1377 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
d2623129 1378 compare_set_vnet_hdr);
7dce4e6f
ZC
1379}
1380
1381static void colo_compare_finalize(Object *obj)
1382{
1383 CompareState *s = COLO_COMPARE(obj);
0ffcece3 1384 CompareState *tmp = NULL;
7dce4e6f 1385
45942b79 1386 qemu_mutex_lock(&colo_compare_mutex);
0ffcece3
ZC
1387 QTAILQ_FOREACH(tmp, &net_compares, next) {
1388 if (tmp == s) {
1389 QTAILQ_REMOVE(&net_compares, s, next);
1390 break;
1391 }
1392 }
45942b79
LS
1393 if (QTAILQ_EMPTY(&net_compares)) {
1394 colo_compare_active = false;
1395 qemu_mutex_destroy(&event_mtx);
1396 qemu_cond_destroy(&event_complete_cond);
1397 }
1398 qemu_mutex_unlock(&colo_compare_mutex);
0ffcece3 1399
5bd57eba
LS
1400 qemu_chr_fe_deinit(&s->chr_pri_in, false);
1401 qemu_chr_fe_deinit(&s->chr_sec_in, false);
1402 qemu_chr_fe_deinit(&s->chr_out, false);
1403 if (s->notify_dev) {
1404 qemu_chr_fe_deinit(&s->chr_notify_dev, false);
1405 }
1406
1407 if (s->iothread) {
1408 colo_compare_timer_del(s);
1409 }
1410
1411 qemu_bh_delete(s->event_bh);
1412
9c55fe94
LS
1413 AioContext *ctx = iothread_get_aio_context(s->iothread);
1414 aio_context_acquire(ctx);
1415 AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
1416 if (s->notify_dev) {
1417 AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
1418 }
1419 aio_context_release(ctx);
1420
dfd917a9
HZ
1421 /* Release all unhandled packets after compare thead exited */
1422 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
9c55fe94 1423 AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
dfd917a9 1424
727c2d76 1425 g_queue_clear(&s->conn_list);
9c55fe94
LS
1426 g_queue_clear(&s->out_sendco.send_list);
1427 if (s->notify_dev) {
1428 g_queue_clear(&s->notify_sendco.send_list);
1429 }
0682e15b 1430
dd321ecf
WY
1431 if (s->connection_track_table) {
1432 g_hash_table_destroy(s->connection_track_table);
1433 }
1434
1435 if (s->iothread) {
1436 object_unref(OBJECT(s->iothread));
1437 }
0ffcece3 1438
7dce4e6f
ZC
1439 g_free(s->pri_indev);
1440 g_free(s->sec_indev);
1441 g_free(s->outdev);
cf6af766 1442 g_free(s->notify_dev);
7dce4e6f
ZC
1443}
1444
45942b79
LS
1445static void __attribute__((__constructor__)) colo_compare_init_globals(void)
1446{
1447 colo_compare_active = false;
1448 qemu_mutex_init(&colo_compare_mutex);
1449}
1450
7dce4e6f
ZC
1451static const TypeInfo colo_compare_info = {
1452 .name = TYPE_COLO_COMPARE,
1453 .parent = TYPE_OBJECT,
1454 .instance_size = sizeof(CompareState),
1455 .instance_init = colo_compare_init,
1456 .instance_finalize = colo_compare_finalize,
1457 .class_size = sizeof(CompareClass),
1458 .class_init = colo_compare_class_init,
1459 .interfaces = (InterfaceInfo[]) {
1460 { TYPE_USER_CREATABLE },
1461 { }
1462 }
1463};
1464
1465static void register_types(void)
1466{
1467 type_register_static(&colo_compare_info);
1468}
1469
1470type_init(register_types);
This page took 0.39335 seconds and 4 git commands to generate.