]> Git Repo - qemu.git/blame - net/colo-compare.c
trace: avoid SystemTap "char const" warnings
[qemu.git] / net / colo-compare.c
CommitLineData
7dce4e6f
ZC
1/*
2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3 * (a.k.a. Fault Tolerance or Continuous Replication)
4 *
5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6 * Copyright (c) 2016 FUJITSU LIMITED
7 * Copyright (c) 2016 Intel Corporation
8 *
9 * Author: Zhang Chen <[email protected]>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or
12 * later. See the COPYING file in the top-level directory.
13 */
14
15#include "qemu/osdep.h"
16#include "qemu/error-report.h"
59509ec1 17#include "trace.h"
7dce4e6f 18#include "qemu-common.h"
7dce4e6f
ZC
19#include "qapi/error.h"
20#include "net/net.h"
f4b61836 21#include "net/eth.h"
7dce4e6f
ZC
22#include "qom/object_interfaces.h"
23#include "qemu/iov.h"
24#include "qom/object.h"
7dce4e6f 25#include "net/queue.h"
4d43a603 26#include "chardev/char-fe.h"
7dce4e6f
ZC
27#include "qemu/sockets.h"
28#include "qapi-visit.h"
59509ec1 29#include "net/colo.h"
dd321ecf 30#include "sysemu/iothread.h"
7dce4e6f
ZC
31
/* QOM type name and checked cast for the colo-compare object */
#define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \
    OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)

/* Value reported by compare_chr_can_read() */
#define COMPARE_READ_LEN_MAX NET_BUFSIZE
/* colo_insert_packet() refuses a packet once a queue is longer than this */
#define MAX_QUEUE_SIZE 1024

/* Bit flags written to the "mark" output of colo_mark_tcp_pkt() */
#define COLO_COMPARE_FREE_PRIMARY 0x01
#define COLO_COMPARE_FREE_SECONDARY 0x02

/* TODO: Should be configurable */
#define REGULAR_PACKET_CHECK_MS 3000
44
/*
 *  + CompareState ++
 *  |               |
 *  +---------------+   +---------------+         +---------------+
 *  |   conn list   + - >      conn     + ------- >      conn     + -- > ......
 *  +---------------+   +---------------+         +---------------+
 *  |               |     |           |             |          |
 *  +---------------+ +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 *                        |           |             |          |
 *                    +---v----+  +---v----+    +---v----+ +---v----+
 *                    |primary |  |secondary    |primary | |secondary
 *                    |packet  |  |packet  +    |packet  | |packet  +
 *                    +--------+  +--------+    +--------+ +--------+
 */
7dce4e6f
ZC
67typedef struct CompareState {
68 Object parent;
69
70 char *pri_indev;
71 char *sec_indev;
72 char *outdev;
32a6ebec
MAL
73 CharBackend chr_pri_in;
74 CharBackend chr_sec_in;
75 CharBackend chr_out;
7dce4e6f
ZC
76 SocketReadState pri_rs;
77 SocketReadState sec_rs;
aa3a7032 78 bool vnet_hdr;
59509ec1 79
61c5f469
ZC
80 /*
81 * Record the connection that through the NIC
82 * Element type: Connection
b6540d40
ZC
83 */
84 GQueue conn_list;
61c5f469 85 /* Record the connection without repetition */
59509ec1 86 GHashTable *connection_track_table;
dfd917a9 87
dd321ecf 88 IOThread *iothread;
b43decb0 89 GMainContext *worker_context;
dd321ecf 90 QEMUTimer *packet_check_timer;
7dce4e6f
ZC
91} CompareState;
92
93typedef struct CompareClass {
94 ObjectClass parent_class;
95} CompareClass;
96
59509ec1
ZC
/* "mode" argument of packet_enqueue(): which input the packet came from */
enum {
    PRIMARY_IN = 0,
    SECONDARY_IN,
};
101
3037e7a5 102static int compare_chr_send(CompareState *s,
59509ec1 103 const uint8_t *buf,
aa3a7032
ZC
104 uint32_t size,
105 uint32_t vnet_hdr_len);
59509ec1 106
a935cc31
ZC
107static gint seq_sorter(Packet *a, Packet *b, gpointer data)
108{
109 struct tcphdr *atcp, *btcp;
110
111 atcp = (struct tcphdr *)(a->transport_header);
112 btcp = (struct tcphdr *)(b->transport_header);
113 return ntohl(atcp->th_seq) - ntohl(btcp->th_seq);
114}
115
f449c9e5
MZ
116static void fill_pkt_tcp_info(void *data, uint32_t *max_ack)
117{
118 Packet *pkt = data;
119 struct tcphdr *tcphd;
120
121 tcphd = (struct tcphdr *)pkt->transport_header;
122
123 pkt->tcp_seq = ntohl(tcphd->th_seq);
124 pkt->tcp_ack = ntohl(tcphd->th_ack);
125 *max_ack = *max_ack > pkt->tcp_ack ? *max_ack : pkt->tcp_ack;
126 pkt->header_size = pkt->transport_header - (uint8_t *)pkt->data
127 + (tcphd->th_off << 2) - pkt->vnet_hdr_len;
128 pkt->payload_size = pkt->size - pkt->header_size;
129 pkt->seq_end = pkt->tcp_seq + pkt->payload_size;
130 pkt->flags = tcphd->th_flags;
131}
132
8850d4ca
MZ
133/*
134 * Return 1 on success, if return 0 means the
135 * packet will be dropped
136 */
f449c9e5 137static int colo_insert_packet(GQueue *queue, Packet *pkt, uint32_t *max_ack)
8850d4ca
MZ
138{
139 if (g_queue_get_length(queue) <= MAX_QUEUE_SIZE) {
140 if (pkt->ip->ip_p == IPPROTO_TCP) {
f449c9e5 141 fill_pkt_tcp_info(pkt, max_ack);
8850d4ca
MZ
142 g_queue_insert_sorted(queue,
143 pkt,
144 (GCompareDataFunc)seq_sorter,
145 NULL);
146 } else {
147 g_queue_push_tail(queue, pkt);
148 }
149 return 1;
150 }
151 return 0;
152}
153
59509ec1
ZC
154/*
155 * Return 0 on success, if return -1 means the pkt
156 * is unsupported(arp and ipv6) and will be sent later
157 */
8ec14402 158static int packet_enqueue(CompareState *s, int mode, Connection **con)
59509ec1 159{
b6540d40 160 ConnectionKey key;
59509ec1 161 Packet *pkt = NULL;
b6540d40 162 Connection *conn;
59509ec1
ZC
163
164 if (mode == PRIMARY_IN) {
ada1a33f
ZC
165 pkt = packet_new(s->pri_rs.buf,
166 s->pri_rs.packet_len,
167 s->pri_rs.vnet_hdr_len);
59509ec1 168 } else {
ada1a33f
ZC
169 pkt = packet_new(s->sec_rs.buf,
170 s->sec_rs.packet_len,
171 s->sec_rs.vnet_hdr_len);
59509ec1
ZC
172 }
173
174 if (parse_packet_early(pkt)) {
175 packet_destroy(pkt, NULL);
176 pkt = NULL;
177 return -1;
178 }
b6540d40 179 fill_connection_key(pkt, &key);
59509ec1 180
b6540d40
ZC
181 conn = connection_get(s->connection_track_table,
182 &key,
183 &s->conn_list);
59509ec1 184
b6540d40
ZC
185 if (!conn->processing) {
186 g_queue_push_tail(&s->conn_list, conn);
187 conn->processing = true;
188 }
189
190 if (mode == PRIMARY_IN) {
f449c9e5 191 if (!colo_insert_packet(&conn->primary_list, pkt, &conn->pack)) {
b6540d40
ZC
192 error_report("colo compare primary queue size too big,"
193 "drop packet");
194 }
195 } else {
f449c9e5 196 if (!colo_insert_packet(&conn->secondary_list, pkt, &conn->sack)) {
b6540d40
ZC
197 error_report("colo compare secondary queue size too big,"
198 "drop packet");
199 }
200 }
4d366235 201 *con = conn;
59509ec1
ZC
202
203 return 0;
204}
205
f449c9e5
MZ
/*
 * True when seq1 is later than seq2; the unsigned difference is
 * interpreted as a signed 32-bit value.
 */
static inline bool after(uint32_t seq1, uint32_t seq2)
{
    int32_t delta = (int32_t)(seq1 - seq2);

    return delta > 0;
}
210
211static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
212{
213 int ret;
214 ret = compare_chr_send(s,
215 pkt->data,
216 pkt->size,
217 pkt->vnet_hdr_len);
218 if (ret < 0) {
219 error_report("colo send primary packet failed");
220 }
221 trace_colo_compare_main("packet same and release packet");
222 packet_destroy(pkt, NULL);
223}
224
0682e15b
ZC
225/*
226 * The IP packets sent by primary and secondary
227 * will be compared in here
228 * TODO support ip fragment, Out-Of-Order
229 * return: 0 means packet same
230 * > 0 || < 0 means packet different
231 */
9394133f
MZ
232static int colo_compare_packet_payload(Packet *ppkt,
233 Packet *spkt,
234 uint16_t poffset,
235 uint16_t soffset,
236 uint16_t len)
237
0682e15b 238{
d87aa138 239 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
240 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
241
242 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
243 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
244 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
245 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
246
247 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
248 pri_ip_dst, spkt->size,
249 sec_ip_src, sec_ip_dst);
250 }
0682e15b 251
9394133f 252 return memcmp(ppkt->data + poffset, spkt->data + soffset, len);
0682e15b
ZC
253}
254
f4b61836 255/*
f449c9e5
MZ
256 * return true means that the payload is consist and
257 * need to make the next comparison, false means do
258 * the checkpoint
259*/
260static bool colo_mark_tcp_pkt(Packet *ppkt, Packet *spkt,
261 int8_t *mark, uint32_t max_ack)
0682e15b 262{
f449c9e5
MZ
263 *mark = 0;
264
265 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
266 if (colo_compare_packet_payload(ppkt, spkt,
267 ppkt->header_size, spkt->header_size,
268 ppkt->payload_size)) {
269 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
270 return true;
271 }
272 }
273 if (ppkt->tcp_seq == spkt->tcp_seq && ppkt->seq_end == spkt->seq_end) {
274 if (colo_compare_packet_payload(ppkt, spkt,
275 ppkt->header_size, spkt->header_size,
276 ppkt->payload_size)) {
277 *mark = COLO_COMPARE_FREE_SECONDARY | COLO_COMPARE_FREE_PRIMARY;
278 return true;
279 }
280 }
f4b61836 281
f449c9e5
MZ
282 /* one part of secondary packet payload still need to be compared */
283 if (!after(ppkt->seq_end, spkt->seq_end)) {
284 if (colo_compare_packet_payload(ppkt, spkt,
285 ppkt->header_size + ppkt->offset,
286 spkt->header_size + spkt->offset,
287 ppkt->payload_size - ppkt->offset)) {
288 if (!after(ppkt->tcp_ack, max_ack)) {
289 *mark = COLO_COMPARE_FREE_PRIMARY;
290 spkt->offset += ppkt->payload_size - ppkt->offset;
291 return true;
292 } else {
293 /* secondary guest hasn't ack the data, don't send
294 * out this packet
295 */
296 return false;
297 }
298 }
299 } else {
300 /* primary packet is longer than secondary packet, compare
301 * the same part and mark the primary packet offset
302 */
303 if (colo_compare_packet_payload(ppkt, spkt,
304 ppkt->header_size + ppkt->offset,
305 spkt->header_size + spkt->offset,
306 spkt->payload_size - spkt->offset)) {
307 *mark = COLO_COMPARE_FREE_SECONDARY;
308 ppkt->offset += spkt->payload_size - spkt->offset;
309 return true;
310 }
311 }
312
313 return false;
314}
2ad7ca4c 315
f449c9e5
MZ
316static void colo_compare_tcp(CompareState *s, Connection *conn)
317{
318 Packet *ppkt = NULL, *spkt = NULL;
319 int8_t mark;
f4b61836
ZC
320
321 /*
f449c9e5
MZ
322 * If ppkt and spkt have the same payload, but ppkt's ACK
323 * is greater than spkt's ACK, in this case we can not
324 * send the ppkt because it will cause the secondary guest
325 * to miss sending some data in the next. Therefore, we
326 * record the maximum ACK in the current queue at both
327 * primary side and secondary side. Only when the ack is
328 * less than the smaller of the two maximum ack, then we
329 * can ensure that the packet's payload is acknowledged by
330 * primary and secondary.
331 */
332 uint32_t min_ack = conn->pack > conn->sack ? conn->sack : conn->pack;
333
334pri:
335 if (g_queue_is_empty(&conn->primary_list)) {
336 return;
337 }
338 ppkt = g_queue_pop_head(&conn->primary_list);
339sec:
340 if (g_queue_is_empty(&conn->secondary_list)) {
341 g_queue_push_head(&conn->primary_list, ppkt);
342 return;
f4b61836 343 }
f449c9e5 344 spkt = g_queue_pop_head(&conn->secondary_list);
f4b61836 345
f449c9e5
MZ
346 if (ppkt->tcp_seq == ppkt->seq_end) {
347 colo_release_primary_pkt(s, ppkt);
348 ppkt = NULL;
349 }
9394133f 350
f449c9e5
MZ
351 if (ppkt && conn->compare_seq && !after(ppkt->seq_end, conn->compare_seq)) {
352 trace_colo_compare_main("pri: this packet has compared");
353 colo_release_primary_pkt(s, ppkt);
354 ppkt = NULL;
355 }
9394133f 356
f449c9e5
MZ
357 if (spkt->tcp_seq == spkt->seq_end) {
358 packet_destroy(spkt, NULL);
359 if (!ppkt) {
360 goto pri;
361 } else {
362 goto sec;
363 }
6efeb328 364 } else {
f449c9e5
MZ
365 if (conn->compare_seq && !after(spkt->seq_end, conn->compare_seq)) {
366 trace_colo_compare_main("sec: this packet has compared");
367 packet_destroy(spkt, NULL);
368 if (!ppkt) {
369 goto pri;
370 } else {
371 goto sec;
372 }
373 }
374 if (!ppkt) {
375 g_queue_push_head(&conn->secondary_list, spkt);
376 goto pri;
377 }
6efeb328 378 }
f4b61836 379
f449c9e5
MZ
380 if (colo_mark_tcp_pkt(ppkt, spkt, &mark, min_ack)) {
381 trace_colo_compare_tcp_info("pri",
382 ppkt->tcp_seq, ppkt->tcp_ack,
383 ppkt->header_size, ppkt->payload_size,
384 ppkt->offset, ppkt->flags);
385
386 trace_colo_compare_tcp_info("sec",
387 spkt->tcp_seq, spkt->tcp_ack,
388 spkt->header_size, spkt->payload_size,
389 spkt->offset, spkt->flags);
390
391 if (mark == COLO_COMPARE_FREE_PRIMARY) {
392 conn->compare_seq = ppkt->seq_end;
393 colo_release_primary_pkt(s, ppkt);
394 g_queue_push_head(&conn->secondary_list, spkt);
395 goto pri;
396 }
397 if (mark == COLO_COMPARE_FREE_SECONDARY) {
398 conn->compare_seq = spkt->seq_end;
399 packet_destroy(spkt, NULL);
400 goto sec;
401 }
402 if (mark == (COLO_COMPARE_FREE_PRIMARY | COLO_COMPARE_FREE_SECONDARY)) {
403 conn->compare_seq = ppkt->seq_end;
404 colo_release_primary_pkt(s, ppkt);
405 packet_destroy(spkt, NULL);
406 goto pri;
407 }
408 } else {
409 g_queue_push_head(&conn->primary_list, ppkt);
410 g_queue_push_head(&conn->secondary_list, spkt);
2061c14c
ZC
411
412 qemu_hexdump((char *)ppkt->data, stderr,
413 "colo-compare ppkt", ppkt->size);
414 qemu_hexdump((char *)spkt->data, stderr,
415 "colo-compare spkt", spkt->size);
f4b61836 416
f449c9e5
MZ
417 /*
418 * colo_compare_inconsistent_notify();
419 * TODO: notice to checkpoint();
420 */
421 }
f4b61836
ZC
422}
423
f449c9e5 424
f4b61836
ZC
425/*
426 * Called from the compare thread on the primary
427 * for compare udp packet
428 */
429static int colo_packet_compare_udp(Packet *spkt, Packet *ppkt)
430{
9394133f
MZ
431 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
432 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
f4b61836
ZC
433
434 trace_colo_compare_main("compare udp");
2ad7ca4c 435
6efeb328
ZC
436 /*
437 * Because of ppkt and spkt are both in the same connection,
438 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
439 * same with spkt. In addition, IP header's Identification is a random
440 * field, we can handle it in IP fragmentation function later.
441 * COLO just concern the response net packet payload from primary guest
442 * and secondary guest are same or not, So we ignored all IP header include
443 * other field like TOS,TTL,IP Checksum. we only need to compare
444 * the ip payload here.
445 */
9394133f
MZ
446 if (ppkt->size != spkt->size) {
447 trace_colo_compare_main("UDP: payload size of packets are different");
448 return -1;
449 }
450 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
451 ppkt->size - offset)) {
f4b61836 452 trace_colo_compare_udp_miscompare("primary pkt size", ppkt->size);
f4b61836 453 trace_colo_compare_udp_miscompare("Secondary pkt size", spkt->size);
d87aa138 454 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
455 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
456 ppkt->size);
457 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
458 spkt->size);
459 }
9394133f
MZ
460 return -1;
461 } else {
462 return 0;
f4b61836 463 }
f4b61836
ZC
464}
465
466/*
467 * Called from the compare thread on the primary
468 * for compare icmp packet
469 */
470static int colo_packet_compare_icmp(Packet *spkt, Packet *ppkt)
471{
9394133f
MZ
472 uint16_t network_header_length = ppkt->ip->ip_hl << 2;
473 uint16_t offset = network_header_length + ETH_HLEN + ppkt->vnet_hdr_len;
6efeb328 474
f4b61836 475 trace_colo_compare_main("compare icmp");
f4b61836 476
6efeb328
ZC
477 /*
478 * Because of ppkt and spkt are both in the same connection,
479 * The ppkt's src ip, dst ip, src port, dst port, ip_proto all are
480 * same with spkt. In addition, IP header's Identification is a random
481 * field, we can handle it in IP fragmentation function later.
482 * COLO just concern the response net packet payload from primary guest
483 * and secondary guest are same or not, So we ignored all IP header include
484 * other field like TOS,TTL,IP Checksum. we only need to compare
485 * the ip payload here.
486 */
9394133f
MZ
487 if (ppkt->size != spkt->size) {
488 trace_colo_compare_main("ICMP: payload size of packets are different");
489 return -1;
490 }
491 if (colo_compare_packet_payload(ppkt, spkt, offset, offset,
492 ppkt->size - offset)) {
f4b61836
ZC
493 trace_colo_compare_icmp_miscompare("primary pkt size",
494 ppkt->size);
f4b61836
ZC
495 trace_colo_compare_icmp_miscompare("Secondary pkt size",
496 spkt->size);
d87aa138 497 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
1723a7f7
ZC
498 qemu_hexdump((char *)ppkt->data, stderr, "colo-compare pri pkt",
499 ppkt->size);
500 qemu_hexdump((char *)spkt->data, stderr, "colo-compare sec pkt",
501 spkt->size);
502 }
f4b61836
ZC
503 return -1;
504 } else {
505 return 0;
506 }
507}
508
509/*
510 * Called from the compare thread on the primary
511 * for compare other packet
512 */
513static int colo_packet_compare_other(Packet *spkt, Packet *ppkt)
514{
9394133f
MZ
515 uint16_t offset = ppkt->vnet_hdr_len;
516
f4b61836 517 trace_colo_compare_main("compare other");
d87aa138 518 if (trace_event_get_state_backends(TRACE_COLO_COMPARE_MISCOMPARE)) {
e630b2bf
ZC
519 char pri_ip_src[20], pri_ip_dst[20], sec_ip_src[20], sec_ip_dst[20];
520
521 strcpy(pri_ip_src, inet_ntoa(ppkt->ip->ip_src));
522 strcpy(pri_ip_dst, inet_ntoa(ppkt->ip->ip_dst));
523 strcpy(sec_ip_src, inet_ntoa(spkt->ip->ip_src));
524 strcpy(sec_ip_dst, inet_ntoa(spkt->ip->ip_dst));
525
526 trace_colo_compare_ip_info(ppkt->size, pri_ip_src,
527 pri_ip_dst, spkt->size,
528 sec_ip_src, sec_ip_dst);
529 }
530
9394133f
MZ
531 if (ppkt->size != spkt->size) {
532 trace_colo_compare_main("Other: payload size of packets are different");
533 return -1;
534 }
535 return colo_compare_packet_payload(ppkt, spkt, offset, offset,
536 ppkt->size - offset);
0682e15b
ZC
537}
538
539static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time)
540{
541 int64_t now = qemu_clock_get_ms(QEMU_CLOCK_HOST);
542
543 if ((now - pkt->creation_ms) > (*check_time)) {
544 trace_colo_old_packet_check_found(pkt->creation_ms);
545 return 0;
546 } else {
547 return 1;
548 }
549}
550
d25a7dab
ZC
551static int colo_old_packet_check_one_conn(Connection *conn,
552 void *user_data)
0682e15b 553{
0682e15b
ZC
554 GList *result = NULL;
555 int64_t check_time = REGULAR_PACKET_CHECK_MS;
556
557 result = g_queue_find_custom(&conn->primary_list,
558 &check_time,
559 (GCompareFunc)colo_old_packet_check_one);
560
561 if (result) {
61c5f469
ZC
562 /* Do checkpoint will flush old packet */
563 /*
564 * TODO: Notify colo frame to do checkpoint.
565 * colo_compare_inconsistent_notify();
566 */
d25a7dab 567 return 0;
0682e15b 568 }
d25a7dab
ZC
569
570 return 1;
0682e15b
ZC
571}
572
573/*
574 * Look for old packets that the secondary hasn't matched,
575 * if we have some then we have to checkpoint to wake
576 * the secondary up.
577 */
578static void colo_old_packet_check(void *opaque)
579{
580 CompareState *s = opaque;
581
d25a7dab
ZC
582 /*
583 * If we find one old packet, stop finding job and notify
584 * COLO frame do checkpoint.
585 */
586 g_queue_find_custom(&s->conn_list, NULL,
587 (GCompareFunc)colo_old_packet_check_one_conn);
0682e15b
ZC
588}
589
f449c9e5
MZ
590static void colo_compare_packet(CompareState *s, Connection *conn,
591 int (*HandlePacket)(Packet *spkt,
592 Packet *ppkt))
0682e15b 593{
0682e15b
ZC
594 Packet *pkt = NULL;
595 GList *result = NULL;
0682e15b
ZC
596
597 while (!g_queue_is_empty(&conn->primary_list) &&
598 !g_queue_is_empty(&conn->secondary_list)) {
626bba98 599 pkt = g_queue_pop_head(&conn->primary_list);
f449c9e5
MZ
600 result = g_queue_find_custom(&conn->secondary_list,
601 pkt, (GCompareFunc)HandlePacket);
0682e15b
ZC
602
603 if (result) {
f449c9e5 604 colo_release_primary_pkt(s, pkt);
0682e15b 605 g_queue_remove(&conn->secondary_list, result->data);
0682e15b
ZC
606 } else {
607 /*
608 * If one packet arrive late, the secondary_list or
609 * primary_list will be empty, so we can't compare it
610 * until next comparison.
611 */
612 trace_colo_compare_main("packet different");
626bba98 613 g_queue_push_head(&conn->primary_list, pkt);
0682e15b
ZC
614 /* TODO: colo_notify_checkpoint();*/
615 break;
616 }
617 }
618}
619
f449c9e5
MZ
620/*
621 * Called from the compare thread on the primary
622 * for compare packet with secondary list of the
623 * specified connection when a new packet was
624 * queued to it.
625 */
626static void colo_compare_connection(void *opaque, void *user_data)
627{
628 CompareState *s = user_data;
629 Connection *conn = opaque;
630
631 switch (conn->ip_proto) {
632 case IPPROTO_TCP:
633 colo_compare_tcp(s, conn);
634 break;
635 case IPPROTO_UDP:
636 colo_compare_packet(s, conn, colo_packet_compare_udp);
637 break;
638 case IPPROTO_ICMP:
639 colo_compare_packet(s, conn, colo_packet_compare_icmp);
640 break;
641 default:
642 colo_compare_packet(s, conn, colo_packet_compare_other);
643 break;
644 }
645}
646
3037e7a5 647static int compare_chr_send(CompareState *s,
59509ec1 648 const uint8_t *buf,
aa3a7032
ZC
649 uint32_t size,
650 uint32_t vnet_hdr_len)
59509ec1
ZC
651{
652 int ret = 0;
653 uint32_t len = htonl(size);
654
655 if (!size) {
656 return 0;
657 }
658
3037e7a5 659 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
59509ec1
ZC
660 if (ret != sizeof(len)) {
661 goto err;
662 }
663
aa3a7032
ZC
664 if (s->vnet_hdr) {
665 /*
666 * We send vnet header len make other module(like filter-redirector)
667 * know how to parse net packet correctly.
668 */
669 len = htonl(vnet_hdr_len);
670 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len));
671 if (ret != sizeof(len)) {
672 goto err;
673 }
674 }
675
3037e7a5 676 ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
59509ec1
ZC
677 if (ret != size) {
678 goto err;
679 }
680
681 return 0;
682
683err:
684 return ret < 0 ? ret : -EIO;
685}
686
0682e15b
ZC
687static int compare_chr_can_read(void *opaque)
688{
689 return COMPARE_READ_LEN_MAX;
690}
691
692/*
693 * Called from the main thread on the primary for packets
694 * arriving over the socket from the primary.
695 */
696static void compare_pri_chr_in(void *opaque, const uint8_t *buf, int size)
697{
698 CompareState *s = COLO_COMPARE(opaque);
699 int ret;
700
701 ret = net_fill_rstate(&s->pri_rs, buf, size);
702 if (ret == -1) {
81517ba3 703 qemu_chr_fe_set_handlers(&s->chr_pri_in, NULL, NULL, NULL, NULL,
39ab61c6 704 NULL, NULL, true);
0682e15b
ZC
705 error_report("colo-compare primary_in error");
706 }
707}
708
709/*
710 * Called from the main thread on the primary for packets
711 * arriving over the socket from the secondary.
712 */
713static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size)
714{
715 CompareState *s = COLO_COMPARE(opaque);
716 int ret;
717
718 ret = net_fill_rstate(&s->sec_rs, buf, size);
719 if (ret == -1) {
81517ba3 720 qemu_chr_fe_set_handlers(&s->chr_sec_in, NULL, NULL, NULL, NULL,
39ab61c6 721 NULL, NULL, true);
0682e15b
ZC
722 error_report("colo-compare secondary_in error");
723 }
724}
725
66d2a242
HZ
726/*
727 * Check old packet regularly so it can watch for any packets
728 * that the secondary hasn't produced equivalents of.
729 */
dd321ecf 730static void check_old_packet_regular(void *opaque)
66d2a242
HZ
731{
732 CompareState *s = opaque;
733
734 /* if have old packet we will notify checkpoint */
735 colo_old_packet_check(s);
dd321ecf
WY
736 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
737 REGULAR_PACKET_CHECK_MS);
738}
739
740static void colo_compare_timer_init(CompareState *s)
741{
742 AioContext *ctx = iothread_get_aio_context(s->iothread);
66d2a242 743
dd321ecf
WY
744 s->packet_check_timer = aio_timer_new(ctx, QEMU_CLOCK_VIRTUAL,
745 SCALE_MS, check_old_packet_regular,
746 s);
747 timer_mod(s->packet_check_timer, qemu_clock_get_ms(QEMU_CLOCK_VIRTUAL) +
748 REGULAR_PACKET_CHECK_MS);
66d2a242
HZ
749}
750
dd321ecf 751static void colo_compare_timer_del(CompareState *s)
0682e15b 752{
dd321ecf
WY
753 if (s->packet_check_timer) {
754 timer_del(s->packet_check_timer);
755 timer_free(s->packet_check_timer);
756 s->packet_check_timer = NULL;
757 }
758 }
0682e15b 759
dd321ecf
WY
760static void colo_compare_iothread(CompareState *s)
761{
762 object_ref(OBJECT(s->iothread));
763 s->worker_context = iothread_get_g_main_context(s->iothread);
0682e15b 764
5345fdb4 765 qemu_chr_fe_set_handlers(&s->chr_pri_in, compare_chr_can_read,
81517ba3
AN
766 compare_pri_chr_in, NULL, NULL,
767 s, s->worker_context, true);
5345fdb4 768 qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read,
81517ba3
AN
769 compare_sec_chr_in, NULL, NULL,
770 s, s->worker_context, true);
0682e15b 771
dd321ecf 772 colo_compare_timer_init(s);
0682e15b
ZC
773}
774
7dce4e6f
ZC
775static char *compare_get_pri_indev(Object *obj, Error **errp)
776{
777 CompareState *s = COLO_COMPARE(obj);
778
779 return g_strdup(s->pri_indev);
780}
781
782static void compare_set_pri_indev(Object *obj, const char *value, Error **errp)
783{
784 CompareState *s = COLO_COMPARE(obj);
785
786 g_free(s->pri_indev);
787 s->pri_indev = g_strdup(value);
788}
789
790static char *compare_get_sec_indev(Object *obj, Error **errp)
791{
792 CompareState *s = COLO_COMPARE(obj);
793
794 return g_strdup(s->sec_indev);
795}
796
797static void compare_set_sec_indev(Object *obj, const char *value, Error **errp)
798{
799 CompareState *s = COLO_COMPARE(obj);
800
801 g_free(s->sec_indev);
802 s->sec_indev = g_strdup(value);
803}
804
805static char *compare_get_outdev(Object *obj, Error **errp)
806{
807 CompareState *s = COLO_COMPARE(obj);
808
809 return g_strdup(s->outdev);
810}
811
812static void compare_set_outdev(Object *obj, const char *value, Error **errp)
813{
814 CompareState *s = COLO_COMPARE(obj);
815
816 g_free(s->outdev);
817 s->outdev = g_strdup(value);
818}
819
aa3a7032
ZC
820static bool compare_get_vnet_hdr(Object *obj, Error **errp)
821{
822 CompareState *s = COLO_COMPARE(obj);
823
824 return s->vnet_hdr;
825}
826
827static void compare_set_vnet_hdr(Object *obj,
828 bool value,
829 Error **errp)
830{
831 CompareState *s = COLO_COMPARE(obj);
832
833 s->vnet_hdr = value;
834}
835
7dce4e6f
ZC
836static void compare_pri_rs_finalize(SocketReadState *pri_rs)
837{
59509ec1 838 CompareState *s = container_of(pri_rs, CompareState, pri_rs);
8ec14402 839 Connection *conn = NULL;
59509ec1 840
8ec14402 841 if (packet_enqueue(s, PRIMARY_IN, &conn)) {
59509ec1 842 trace_colo_compare_main("primary: unsupported packet in");
aa3a7032
ZC
843 compare_chr_send(s,
844 pri_rs->buf,
845 pri_rs->packet_len,
846 pri_rs->vnet_hdr_len);
0682e15b 847 } else {
3463218c 848 /* compare packet in the specified connection */
8ec14402 849 colo_compare_connection(conn, s);
59509ec1 850 }
7dce4e6f
ZC
851}
852
853static void compare_sec_rs_finalize(SocketReadState *sec_rs)
854{
59509ec1 855 CompareState *s = container_of(sec_rs, CompareState, sec_rs);
8ec14402 856 Connection *conn = NULL;
59509ec1 857
8ec14402 858 if (packet_enqueue(s, SECONDARY_IN, &conn)) {
59509ec1 859 trace_colo_compare_main("secondary: unsupported packet in");
0682e15b 860 } else {
3463218c 861 /* compare packet in the specified connection */
8ec14402 862 colo_compare_connection(conn, s);
59509ec1 863 }
7dce4e6f
ZC
864}
865
7dce4e6f
ZC
866
867/*
868 * Return 0 is success.
869 * Return 1 is failed.
870 */
0ec7b3e7 871static int find_and_check_chardev(Chardev **chr,
7dce4e6f
ZC
872 char *chr_name,
873 Error **errp)
874{
7dce4e6f
ZC
875 *chr = qemu_chr_find(chr_name);
876 if (*chr == NULL) {
877 error_setg(errp, "Device '%s' not found",
878 chr_name);
879 return 1;
880 }
881
0a73336d
DB
882 if (!qemu_chr_has_feature(*chr, QEMU_CHAR_FEATURE_RECONNECTABLE)) {
883 error_setg(errp, "chardev \"%s\" is not reconnectable",
7dce4e6f
ZC
884 chr_name);
885 return 1;
886 }
fbf3cc3a 887
7dce4e6f
ZC
888 return 0;
889}
890
891/*
892 * Called from the main thread on the primary
893 * to setup colo-compare.
894 */
895static void colo_compare_complete(UserCreatable *uc, Error **errp)
896{
897 CompareState *s = COLO_COMPARE(uc);
0ec7b3e7 898 Chardev *chr;
7dce4e6f 899
dd321ecf 900 if (!s->pri_indev || !s->sec_indev || !s->outdev || !s->iothread) {
7dce4e6f 901 error_setg(errp, "colo compare needs 'primary_in' ,"
dd321ecf 902 "'secondary_in','outdev','iothread' property set");
7dce4e6f
ZC
903 return;
904 } else if (!strcmp(s->pri_indev, s->outdev) ||
905 !strcmp(s->sec_indev, s->outdev) ||
906 !strcmp(s->pri_indev, s->sec_indev)) {
907 error_setg(errp, "'indev' and 'outdev' could not be same "
908 "for compare module");
909 return;
910 }
911
5345fdb4
MAL
912 if (find_and_check_chardev(&chr, s->pri_indev, errp) ||
913 !qemu_chr_fe_init(&s->chr_pri_in, chr, errp)) {
7dce4e6f
ZC
914 return;
915 }
916
5345fdb4
MAL
917 if (find_and_check_chardev(&chr, s->sec_indev, errp) ||
918 !qemu_chr_fe_init(&s->chr_sec_in, chr, errp)) {
7dce4e6f
ZC
919 return;
920 }
921
5345fdb4
MAL
922 if (find_and_check_chardev(&chr, s->outdev, errp) ||
923 !qemu_chr_fe_init(&s->chr_out, chr, errp)) {
7dce4e6f
ZC
924 return;
925 }
926
aa3a7032
ZC
927 net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr);
928 net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr);
7dce4e6f 929
b6540d40
ZC
930 g_queue_init(&s->conn_list);
931
932 s->connection_track_table = g_hash_table_new_full(connection_key_hash,
933 connection_key_equal,
934 g_free,
935 connection_destroy);
59509ec1 936
dd321ecf 937 colo_compare_iothread(s);
7dce4e6f
ZC
938 return;
939}
940
dfd917a9
HZ
941static void colo_flush_packets(void *opaque, void *user_data)
942{
943 CompareState *s = user_data;
944 Connection *conn = opaque;
945 Packet *pkt = NULL;
946
947 while (!g_queue_is_empty(&conn->primary_list)) {
948 pkt = g_queue_pop_head(&conn->primary_list);
aa3a7032
ZC
949 compare_chr_send(s,
950 pkt->data,
951 pkt->size,
952 pkt->vnet_hdr_len);
dfd917a9
HZ
953 packet_destroy(pkt, NULL);
954 }
955 while (!g_queue_is_empty(&conn->secondary_list)) {
956 pkt = g_queue_pop_head(&conn->secondary_list);
957 packet_destroy(pkt, NULL);
958 }
959}
960
7dce4e6f
ZC
961static void colo_compare_class_init(ObjectClass *oc, void *data)
962{
963 UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc);
964
965 ucc->complete = colo_compare_complete;
966}
967
968static void colo_compare_init(Object *obj)
969{
aa3a7032
ZC
970 CompareState *s = COLO_COMPARE(obj);
971
7dce4e6f
ZC
972 object_property_add_str(obj, "primary_in",
973 compare_get_pri_indev, compare_set_pri_indev,
974 NULL);
975 object_property_add_str(obj, "secondary_in",
976 compare_get_sec_indev, compare_set_sec_indev,
977 NULL);
978 object_property_add_str(obj, "outdev",
979 compare_get_outdev, compare_set_outdev,
980 NULL);
dd321ecf
WY
981 object_property_add_link(obj, "iothread", TYPE_IOTHREAD,
982 (Object **)&s->iothread,
983 object_property_allow_set_link,
984 OBJ_PROP_LINK_UNREF_ON_RELEASE, NULL);
aa3a7032
ZC
985
986 s->vnet_hdr = false;
987 object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr,
988 compare_set_vnet_hdr, NULL);
7dce4e6f
ZC
989}
990
991static void colo_compare_finalize(Object *obj)
992{
993 CompareState *s = COLO_COMPARE(obj);
994
1ce2610c
MAL
995 qemu_chr_fe_deinit(&s->chr_pri_in, false);
996 qemu_chr_fe_deinit(&s->chr_sec_in, false);
997 qemu_chr_fe_deinit(&s->chr_out, false);
dd321ecf
WY
998 if (s->iothread) {
999 colo_compare_timer_del(s);
1000 }
dfd917a9
HZ
1001 /* Release all unhandled packets after compare thead exited */
1002 g_queue_foreach(&s->conn_list, colo_flush_packets, s);
1003
727c2d76 1004 g_queue_clear(&s->conn_list);
0682e15b 1005
dd321ecf
WY
1006 if (s->connection_track_table) {
1007 g_hash_table_destroy(s->connection_track_table);
1008 }
1009
1010 if (s->iothread) {
1011 object_unref(OBJECT(s->iothread));
1012 }
7dce4e6f
ZC
1013 g_free(s->pri_indev);
1014 g_free(s->sec_indev);
1015 g_free(s->outdev);
1016}
1017
1018static const TypeInfo colo_compare_info = {
1019 .name = TYPE_COLO_COMPARE,
1020 .parent = TYPE_OBJECT,
1021 .instance_size = sizeof(CompareState),
1022 .instance_init = colo_compare_init,
1023 .instance_finalize = colo_compare_finalize,
1024 .class_size = sizeof(CompareClass),
1025 .class_init = colo_compare_class_init,
1026 .interfaces = (InterfaceInfo[]) {
1027 { TYPE_USER_CREATABLE },
1028 { }
1029 }
1030};
1031
1032static void register_types(void)
1033{
1034 type_register_static(&colo_compare_info);
1035}
1036
1037type_init(register_types);
This page took 0.227151 seconds and 4 git commands to generate.