]> Git Repo - qemu.git/blob - migration/colo.c
replication: Make --disable-replication compile again
[qemu.git] / migration / colo.c
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 #include "replication.h"
23 #include "qmp-commands.h"
24
25 static bool vmstate_loading;
26
27 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
28
/* Report whether COLO is available; this implementation always says yes. */
bool colo_supported(void)
{
    const bool supported = true;

    return supported;
}
33
34 bool migration_in_colo_state(void)
35 {
36     MigrationState *s = migrate_get_current();
37
38     return (s->state == MIGRATION_STATUS_COLO);
39 }
40
41 bool migration_incoming_in_colo_state(void)
42 {
43     MigrationIncomingState *mis = migration_incoming_get_current();
44
45     return mis && (mis->state == MIGRATION_STATUS_COLO);
46 }
47
48 static bool colo_runstate_is_stopped(void)
49 {
50     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
51 }
52
53 static void secondary_vm_do_failover(void)
54 {
55     int old_state;
56     MigrationIncomingState *mis = migration_incoming_get_current();
57
58     /* Can not do failover during the process of VM's loading VMstate, Or
59      * it will break the secondary VM.
60      */
61     if (vmstate_loading) {
62         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
63                         FAILOVER_STATUS_RELAUNCH);
64         if (old_state != FAILOVER_STATUS_ACTIVE) {
65             error_report("Unknown error while do failover for secondary VM,"
66                          "old_state: %s", FailoverStatus_lookup[old_state]);
67         }
68         return;
69     }
70
71     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
72                       MIGRATION_STATUS_COMPLETED);
73
74     if (!autostart) {
75         error_report("\"-S\" qemu option will be ignored in secondary side");
76         /* recover runstate to normal migration finish state */
77         autostart = true;
78     }
79     /*
80      * Make sure COLO incoming thread not block in recv or send,
81      * If mis->from_src_file and mis->to_src_file use the same fd,
82      * The second shutdown() will return -1, we ignore this value,
83      * It is harmless.
84      */
85     if (mis->from_src_file) {
86         qemu_file_shutdown(mis->from_src_file);
87     }
88     if (mis->to_src_file) {
89         qemu_file_shutdown(mis->to_src_file);
90     }
91
92     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
93                                    FAILOVER_STATUS_COMPLETED);
94     if (old_state != FAILOVER_STATUS_ACTIVE) {
95         error_report("Incorrect state (%s) while doing failover for "
96                      "secondary VM", FailoverStatus_lookup[old_state]);
97         return;
98     }
99     /* Notify COLO incoming thread that failover work is finished */
100     qemu_sem_post(&mis->colo_incoming_sem);
101     /* For Secondary VM, jump to incoming co */
102     if (mis->migration_incoming_co) {
103         qemu_coroutine_enter(mis->migration_incoming_co);
104     }
105 }
106
107 static void primary_vm_do_failover(void)
108 {
109     MigrationState *s = migrate_get_current();
110     int old_state;
111
112     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
113                       MIGRATION_STATUS_COMPLETED);
114
115     /*
116      * Wake up COLO thread which may blocked in recv() or send(),
117      * The s->rp_state.from_dst_file and s->to_dst_file may use the
118      * same fd, but we still shutdown the fd for twice, it is harmless.
119      */
120     if (s->to_dst_file) {
121         qemu_file_shutdown(s->to_dst_file);
122     }
123     if (s->rp_state.from_dst_file) {
124         qemu_file_shutdown(s->rp_state.from_dst_file);
125     }
126
127     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
128                                    FAILOVER_STATUS_COMPLETED);
129     if (old_state != FAILOVER_STATUS_ACTIVE) {
130         error_report("Incorrect state (%s) while doing failover for Primary VM",
131                      FailoverStatus_lookup[old_state]);
132         return;
133     }
134     /* Notify COLO thread that failover work is finished */
135     qemu_sem_post(&s->colo_exit_sem);
136 }
137
138 void colo_do_failover(MigrationState *s)
139 {
140     /* Make sure VM stopped while failover happened. */
141     if (!colo_runstate_is_stopped()) {
142         vm_stop_force_state(RUN_STATE_COLO);
143     }
144
145     if (get_colo_mode() == COLO_MODE_PRIMARY) {
146         primary_vm_do_failover();
147     } else {
148         secondary_vm_do_failover();
149     }
150 }
151
152 void qmp_xen_set_replication(bool enable, bool primary,
153                              bool has_failover, bool failover,
154                              Error **errp)
155 {
156 #ifdef CONFIG_REPLICATION
157     ReplicationMode mode = primary ?
158                            REPLICATION_MODE_PRIMARY :
159                            REPLICATION_MODE_SECONDARY;
160
161     if (has_failover && enable) {
162         error_setg(errp, "Parameter 'failover' is only for"
163                    " stopping replication");
164         return;
165     }
166
167     if (enable) {
168         replication_start_all(mode, errp);
169     } else {
170         if (!has_failover) {
171             failover = NULL;
172         }
173         replication_stop_all(failover, failover ? NULL : errp);
174     }
175 #else
176     abort();
177 #endif
178 }
179
180 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
181 {
182 #ifdef CONFIG_REPLICATION
183     Error *err = NULL;
184     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
185
186     replication_get_error_all(&err);
187     if (err) {
188         s->error = true;
189         s->has_desc = true;
190         s->desc = g_strdup(error_get_pretty(err));
191     } else {
192         s->error = false;
193     }
194
195     error_free(err);
196     return s;
197 #else
198     abort();
199 #endif
200 }
201
202 void qmp_xen_colo_do_checkpoint(Error **errp)
203 {
204 #ifdef CONFIG_REPLICATION
205     replication_do_checkpoint_all(errp);
206 #else
207     abort();
208 #endif
209 }
210
211 static void colo_send_message(QEMUFile *f, COLOMessage msg,
212                               Error **errp)
213 {
214     int ret;
215
216     if (msg >= COLO_MESSAGE__MAX) {
217         error_setg(errp, "%s: Invalid message", __func__);
218         return;
219     }
220     qemu_put_be32(f, msg);
221     qemu_fflush(f);
222
223     ret = qemu_file_get_error(f);
224     if (ret < 0) {
225         error_setg_errno(errp, -ret, "Can't send COLO message");
226     }
227     trace_colo_send_message(COLOMessage_lookup[msg]);
228 }
229
230 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
231                                     uint64_t value, Error **errp)
232 {
233     Error *local_err = NULL;
234     int ret;
235
236     colo_send_message(f, msg, &local_err);
237     if (local_err) {
238         error_propagate(errp, local_err);
239         return;
240     }
241     qemu_put_be64(f, value);
242     qemu_fflush(f);
243
244     ret = qemu_file_get_error(f);
245     if (ret < 0) {
246         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
247                          COLOMessage_lookup[msg]);
248     }
249 }
250
251 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
252 {
253     COLOMessage msg;
254     int ret;
255
256     msg = qemu_get_be32(f);
257     ret = qemu_file_get_error(f);
258     if (ret < 0) {
259         error_setg_errno(errp, -ret, "Can't receive COLO message");
260         return msg;
261     }
262     if (msg >= COLO_MESSAGE__MAX) {
263         error_setg(errp, "%s: Invalid message", __func__);
264         return msg;
265     }
266     trace_colo_receive_message(COLOMessage_lookup[msg]);
267     return msg;
268 }
269
270 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
271                                        Error **errp)
272 {
273     COLOMessage msg;
274     Error *local_err = NULL;
275
276     msg = colo_receive_message(f, &local_err);
277     if (local_err) {
278         error_propagate(errp, local_err);
279         return;
280     }
281     if (msg != expect_msg) {
282         error_setg(errp, "Unexpected COLO message %d, expected %d",
283                           msg, expect_msg);
284     }
285 }
286
287 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
288                                            Error **errp)
289 {
290     Error *local_err = NULL;
291     uint64_t value;
292     int ret;
293
294     colo_receive_check_message(f, expect_msg, &local_err);
295     if (local_err) {
296         error_propagate(errp, local_err);
297         return 0;
298     }
299
300     value = qemu_get_be64(f);
301     ret = qemu_file_get_error(f);
302     if (ret < 0) {
303         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
304                          COLOMessage_lookup[expect_msg]);
305     }
306     return value;
307 }
308
309 static int colo_do_checkpoint_transaction(MigrationState *s,
310                                           QIOChannelBuffer *bioc,
311                                           QEMUFile *fb)
312 {
313     Error *local_err = NULL;
314     int ret = -1;
315
316     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
317                       &local_err);
318     if (local_err) {
319         goto out;
320     }
321
322     colo_receive_check_message(s->rp_state.from_dst_file,
323                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
324     if (local_err) {
325         goto out;
326     }
327     /* Reset channel-buffer directly */
328     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
329     bioc->usage = 0;
330
331     qemu_mutex_lock_iothread();
332     if (failover_get_state() != FAILOVER_STATUS_NONE) {
333         qemu_mutex_unlock_iothread();
334         goto out;
335     }
336     vm_stop_force_state(RUN_STATE_COLO);
337     qemu_mutex_unlock_iothread();
338     trace_colo_vm_state_change("run", "stop");
339     /*
340      * Failover request bh could be called after vm_stop_force_state(),
341      * So we need check failover_request_is_active() again.
342      */
343     if (failover_get_state() != FAILOVER_STATUS_NONE) {
344         goto out;
345     }
346
347     /* Disable block migration */
348     s->params.blk = 0;
349     s->params.shared = 0;
350     qemu_savevm_state_header(fb);
351     qemu_savevm_state_begin(fb, &s->params);
352     qemu_mutex_lock_iothread();
353     qemu_savevm_state_complete_precopy(fb, false);
354     qemu_mutex_unlock_iothread();
355
356     qemu_fflush(fb);
357
358     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
359     if (local_err) {
360         goto out;
361     }
362     /*
363      * We need the size of the VMstate data in Secondary side,
364      * With which we can decide how much data should be read.
365      */
366     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
367                             bioc->usage, &local_err);
368     if (local_err) {
369         goto out;
370     }
371
372     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
373     qemu_fflush(s->to_dst_file);
374     ret = qemu_file_get_error(s->to_dst_file);
375     if (ret < 0) {
376         goto out;
377     }
378
379     colo_receive_check_message(s->rp_state.from_dst_file,
380                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
381     if (local_err) {
382         goto out;
383     }
384
385     colo_receive_check_message(s->rp_state.from_dst_file,
386                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
387     if (local_err) {
388         goto out;
389     }
390
391     ret = 0;
392
393     qemu_mutex_lock_iothread();
394     vm_start();
395     qemu_mutex_unlock_iothread();
396     trace_colo_vm_state_change("stop", "run");
397
398 out:
399     if (local_err) {
400         error_report_err(local_err);
401     }
402     return ret;
403 }
404
405 static void colo_process_checkpoint(MigrationState *s)
406 {
407     QIOChannelBuffer *bioc;
408     QEMUFile *fb = NULL;
409     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
410     Error *local_err = NULL;
411     int ret;
412
413     failover_init_state();
414
415     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
416     if (!s->rp_state.from_dst_file) {
417         error_report("Open QEMUFile from_dst_file failed");
418         goto out;
419     }
420
421     /*
422      * Wait for Secondary finish loading VM states and enter COLO
423      * restore.
424      */
425     colo_receive_check_message(s->rp_state.from_dst_file,
426                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
427     if (local_err) {
428         goto out;
429     }
430     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
431     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
432     object_unref(OBJECT(bioc));
433
434     qemu_mutex_lock_iothread();
435     vm_start();
436     qemu_mutex_unlock_iothread();
437     trace_colo_vm_state_change("stop", "run");
438
439     timer_mod(s->colo_delay_timer,
440             current_time + s->parameters.x_checkpoint_delay);
441
442     while (s->state == MIGRATION_STATUS_COLO) {
443         if (failover_get_state() != FAILOVER_STATUS_NONE) {
444             error_report("failover request");
445             goto out;
446         }
447
448         qemu_sem_wait(&s->colo_checkpoint_sem);
449
450         ret = colo_do_checkpoint_transaction(s, bioc, fb);
451         if (ret < 0) {
452             goto out;
453         }
454     }
455
456 out:
457     /* Throw the unreported error message after exited from loop */
458     if (local_err) {
459         error_report_err(local_err);
460     }
461
462     if (fb) {
463         qemu_fclose(fb);
464     }
465
466     timer_del(s->colo_delay_timer);
467
468     /* Hope this not to be too long to wait here */
469     qemu_sem_wait(&s->colo_exit_sem);
470     qemu_sem_destroy(&s->colo_exit_sem);
471     /*
472      * Must be called after failover BH is completed,
473      * Or the failover BH may shutdown the wrong fd that
474      * re-used by other threads after we release here.
475      */
476     if (s->rp_state.from_dst_file) {
477         qemu_fclose(s->rp_state.from_dst_file);
478     }
479 }
480
481 void colo_checkpoint_notify(void *opaque)
482 {
483     MigrationState *s = opaque;
484     int64_t next_notify_time;
485
486     qemu_sem_post(&s->colo_checkpoint_sem);
487     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
488     next_notify_time = s->colo_checkpoint_time +
489                     s->parameters.x_checkpoint_delay;
490     timer_mod(s->colo_delay_timer, next_notify_time);
491 }
492
493 void migrate_start_colo_process(MigrationState *s)
494 {
495     qemu_mutex_unlock_iothread();
496     qemu_sem_init(&s->colo_checkpoint_sem, 0);
497     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
498                                 colo_checkpoint_notify, s);
499
500     qemu_sem_init(&s->colo_exit_sem, 0);
501     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
502                       MIGRATION_STATUS_COLO);
503     colo_process_checkpoint(s);
504     qemu_mutex_lock_iothread();
505 }
506
507 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
508                                      Error **errp)
509 {
510     COLOMessage msg;
511     Error *local_err = NULL;
512
513     msg = colo_receive_message(f, &local_err);
514     if (local_err) {
515         error_propagate(errp, local_err);
516         return;
517     }
518
519     switch (msg) {
520     case COLO_MESSAGE_CHECKPOINT_REQUEST:
521         *checkpoint_request = 1;
522         break;
523     default:
524         *checkpoint_request = 0;
525         error_setg(errp, "Got unknown COLO message: %d", msg);
526         break;
527     }
528 }
529
530 void *colo_process_incoming_thread(void *opaque)
531 {
532     MigrationIncomingState *mis = opaque;
533     QEMUFile *fb = NULL;
534     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
535     uint64_t total_size;
536     uint64_t value;
537     Error *local_err = NULL;
538
539     qemu_sem_init(&mis->colo_incoming_sem, 0);
540
541     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
542                       MIGRATION_STATUS_COLO);
543
544     failover_init_state();
545
546     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
547     if (!mis->to_src_file) {
548         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
549         goto out;
550     }
551     /*
552      * Note: the communication between Primary side and Secondary side
553      * should be sequential, we set the fd to unblocked in migration incoming
554      * coroutine, and here we are in the COLO incoming thread, so it is ok to
555      * set the fd back to blocked.
556      */
557     qemu_file_set_blocking(mis->from_src_file, true);
558
559     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
560     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
561     object_unref(OBJECT(bioc));
562
563     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
564                       &local_err);
565     if (local_err) {
566         goto out;
567     }
568
569     while (mis->state == MIGRATION_STATUS_COLO) {
570         int request = 0;
571
572         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
573         if (local_err) {
574             goto out;
575         }
576         assert(request);
577         if (failover_get_state() != FAILOVER_STATUS_NONE) {
578             error_report("failover request");
579             goto out;
580         }
581
582         /* FIXME: This is unnecessary for periodic checkpoint mode */
583         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
584                      &local_err);
585         if (local_err) {
586             goto out;
587         }
588
589         colo_receive_check_message(mis->from_src_file,
590                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
591         if (local_err) {
592             goto out;
593         }
594
595         value = colo_receive_message_value(mis->from_src_file,
596                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
597         if (local_err) {
598             goto out;
599         }
600
601         /*
602          * Read VM device state data into channel buffer,
603          * It's better to re-use the memory allocated.
604          * Here we need to handle the channel buffer directly.
605          */
606         if (value > bioc->capacity) {
607             bioc->capacity = value;
608             bioc->data = g_realloc(bioc->data, bioc->capacity);
609         }
610         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
611         if (total_size != value) {
612             error_report("Got %" PRIu64 " VMState data, less than expected"
613                         " %" PRIu64, total_size, value);
614             goto out;
615         }
616         bioc->usage = total_size;
617         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
618
619         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
620                      &local_err);
621         if (local_err) {
622             goto out;
623         }
624
625         qemu_mutex_lock_iothread();
626         qemu_system_reset(VMRESET_SILENT);
627         vmstate_loading = true;
628         if (qemu_loadvm_state(fb) < 0) {
629             error_report("COLO: loadvm failed");
630             qemu_mutex_unlock_iothread();
631             goto out;
632         }
633
634         vmstate_loading = false;
635         qemu_mutex_unlock_iothread();
636
637         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
638             failover_set_state(FAILOVER_STATUS_RELAUNCH,
639                             FAILOVER_STATUS_NONE);
640             failover_request_active(NULL);
641             goto out;
642         }
643
644         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
645                      &local_err);
646         if (local_err) {
647             goto out;
648         }
649     }
650
651 out:
652     vmstate_loading = false;
653     /* Throw the unreported error message after exited from loop */
654     if (local_err) {
655         error_report_err(local_err);
656     }
657
658     if (fb) {
659         qemu_fclose(fb);
660     }
661
662     /* Hope this not to be too long to loop here */
663     qemu_sem_wait(&mis->colo_incoming_sem);
664     qemu_sem_destroy(&mis->colo_incoming_sem);
665     /* Must be called after failover BH is completed */
666     if (mis->to_src_file) {
667         qemu_fclose(mis->to_src_file);
668     }
669     migration_incoming_exit_colo();
670
671     return NULL;
672 }
This page took 0.061319 seconds and 4 git commands to generate.