]> Git Repo - qemu.git/blob - migration/colo.c
Merge remote-tracking branch 'remotes/juanquintela/tags/migration/20170531' into...
[qemu.git] / migration / colo.c
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "qemu-file-channel.h"
17 #include "migration/migration.h"
18 #include "migration/qemu-file.h"
19 #include "savevm.h"
20 #include "migration/colo.h"
21 #include "migration/block.h"
22 #include "io/channel-buffer.h"
23 #include "trace.h"
24 #include "qemu/error-report.h"
25 #include "qapi/error.h"
26 #include "migration/failover.h"
27 #include "replication.h"
28 #include "qmp-commands.h"
29
/*
 * Set to true by the COLO incoming thread just before it calls
 * qemu_loadvm_state() and cleared again afterwards; the secondary
 * failover path reads it to decide whether failover has to be deferred.
 */
30 static bool vmstate_loading;
31
/* Size passed to qio_channel_buffer_new() for the VM state buffers. */
32 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
33
/*
 * Tell the caller whether COLO is available.
 *
 * Returns: always true.
 */
bool colo_supported(void)
{
    const bool supported = true;

    return supported;
}
38
39 bool migration_in_colo_state(void)
40 {
41     MigrationState *s = migrate_get_current();
42
43     return (s->state == MIGRATION_STATUS_COLO);
44 }
45
46 bool migration_incoming_in_colo_state(void)
47 {
48     MigrationIncomingState *mis = migration_incoming_get_current();
49
50     return mis && (mis->state == MIGRATION_STATUS_COLO);
51 }
52
53 static bool colo_runstate_is_stopped(void)
54 {
55     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
56 }
57
58 static void secondary_vm_do_failover(void)
59 {
60     int old_state;
61     MigrationIncomingState *mis = migration_incoming_get_current();
62
63     /* Can not do failover during the process of VM's loading VMstate, Or
64      * it will break the secondary VM.
65      */
66     if (vmstate_loading) {
67         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
68                         FAILOVER_STATUS_RELAUNCH);
69         if (old_state != FAILOVER_STATUS_ACTIVE) {
70             error_report("Unknown error while do failover for secondary VM,"
71                          "old_state: %s", FailoverStatus_lookup[old_state]);
72         }
73         return;
74     }
75
76     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
77                       MIGRATION_STATUS_COMPLETED);
78
79     if (!autostart) {
80         error_report("\"-S\" qemu option will be ignored in secondary side");
81         /* recover runstate to normal migration finish state */
82         autostart = true;
83     }
84     /*
85      * Make sure COLO incoming thread not block in recv or send,
86      * If mis->from_src_file and mis->to_src_file use the same fd,
87      * The second shutdown() will return -1, we ignore this value,
88      * It is harmless.
89      */
90     if (mis->from_src_file) {
91         qemu_file_shutdown(mis->from_src_file);
92     }
93     if (mis->to_src_file) {
94         qemu_file_shutdown(mis->to_src_file);
95     }
96
97     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
98                                    FAILOVER_STATUS_COMPLETED);
99     if (old_state != FAILOVER_STATUS_ACTIVE) {
100         error_report("Incorrect state (%s) while doing failover for "
101                      "secondary VM", FailoverStatus_lookup[old_state]);
102         return;
103     }
104     /* Notify COLO incoming thread that failover work is finished */
105     qemu_sem_post(&mis->colo_incoming_sem);
106     /* For Secondary VM, jump to incoming co */
107     if (mis->migration_incoming_co) {
108         qemu_coroutine_enter(mis->migration_incoming_co);
109     }
110 }
111
112 static void primary_vm_do_failover(void)
113 {
114     MigrationState *s = migrate_get_current();
115     int old_state;
116
117     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
118                       MIGRATION_STATUS_COMPLETED);
119
120     /*
121      * Wake up COLO thread which may blocked in recv() or send(),
122      * The s->rp_state.from_dst_file and s->to_dst_file may use the
123      * same fd, but we still shutdown the fd for twice, it is harmless.
124      */
125     if (s->to_dst_file) {
126         qemu_file_shutdown(s->to_dst_file);
127     }
128     if (s->rp_state.from_dst_file) {
129         qemu_file_shutdown(s->rp_state.from_dst_file);
130     }
131
132     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
133                                    FAILOVER_STATUS_COMPLETED);
134     if (old_state != FAILOVER_STATUS_ACTIVE) {
135         error_report("Incorrect state (%s) while doing failover for Primary VM",
136                      FailoverStatus_lookup[old_state]);
137         return;
138     }
139     /* Notify COLO thread that failover work is finished */
140     qemu_sem_post(&s->colo_exit_sem);
141 }
142
143 void colo_do_failover(MigrationState *s)
144 {
145     /* Make sure VM stopped while failover happened. */
146     if (!colo_runstate_is_stopped()) {
147         vm_stop_force_state(RUN_STATE_COLO);
148     }
149
150     if (get_colo_mode() == COLO_MODE_PRIMARY) {
151         primary_vm_do_failover();
152     } else {
153         secondary_vm_do_failover();
154     }
155 }
156
157 void qmp_xen_set_replication(bool enable, bool primary,
158                              bool has_failover, bool failover,
159                              Error **errp)
160 {
161 #ifdef CONFIG_REPLICATION
162     ReplicationMode mode = primary ?
163                            REPLICATION_MODE_PRIMARY :
164                            REPLICATION_MODE_SECONDARY;
165
166     if (has_failover && enable) {
167         error_setg(errp, "Parameter 'failover' is only for"
168                    " stopping replication");
169         return;
170     }
171
172     if (enable) {
173         replication_start_all(mode, errp);
174     } else {
175         if (!has_failover) {
176             failover = NULL;
177         }
178         replication_stop_all(failover, failover ? NULL : errp);
179     }
180 #else
181     abort();
182 #endif
183 }
184
185 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
186 {
187 #ifdef CONFIG_REPLICATION
188     Error *err = NULL;
189     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
190
191     replication_get_error_all(&err);
192     if (err) {
193         s->error = true;
194         s->has_desc = true;
195         s->desc = g_strdup(error_get_pretty(err));
196     } else {
197         s->error = false;
198     }
199
200     error_free(err);
201     return s;
202 #else
203     abort();
204 #endif
205 }
206
207 void qmp_xen_colo_do_checkpoint(Error **errp)
208 {
209 #ifdef CONFIG_REPLICATION
210     replication_do_checkpoint_all(errp);
211 #else
212     abort();
213 #endif
214 }
215
216 static void colo_send_message(QEMUFile *f, COLOMessage msg,
217                               Error **errp)
218 {
219     int ret;
220
221     if (msg >= COLO_MESSAGE__MAX) {
222         error_setg(errp, "%s: Invalid message", __func__);
223         return;
224     }
225     qemu_put_be32(f, msg);
226     qemu_fflush(f);
227
228     ret = qemu_file_get_error(f);
229     if (ret < 0) {
230         error_setg_errno(errp, -ret, "Can't send COLO message");
231     }
232     trace_colo_send_message(COLOMessage_lookup[msg]);
233 }
234
235 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
236                                     uint64_t value, Error **errp)
237 {
238     Error *local_err = NULL;
239     int ret;
240
241     colo_send_message(f, msg, &local_err);
242     if (local_err) {
243         error_propagate(errp, local_err);
244         return;
245     }
246     qemu_put_be64(f, value);
247     qemu_fflush(f);
248
249     ret = qemu_file_get_error(f);
250     if (ret < 0) {
251         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
252                          COLOMessage_lookup[msg]);
253     }
254 }
255
256 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
257 {
258     COLOMessage msg;
259     int ret;
260
261     msg = qemu_get_be32(f);
262     ret = qemu_file_get_error(f);
263     if (ret < 0) {
264         error_setg_errno(errp, -ret, "Can't receive COLO message");
265         return msg;
266     }
267     if (msg >= COLO_MESSAGE__MAX) {
268         error_setg(errp, "%s: Invalid message", __func__);
269         return msg;
270     }
271     trace_colo_receive_message(COLOMessage_lookup[msg]);
272     return msg;
273 }
274
275 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
276                                        Error **errp)
277 {
278     COLOMessage msg;
279     Error *local_err = NULL;
280
281     msg = colo_receive_message(f, &local_err);
282     if (local_err) {
283         error_propagate(errp, local_err);
284         return;
285     }
286     if (msg != expect_msg) {
287         error_setg(errp, "Unexpected COLO message %d, expected %d",
288                           msg, expect_msg);
289     }
290 }
291
292 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
293                                            Error **errp)
294 {
295     Error *local_err = NULL;
296     uint64_t value;
297     int ret;
298
299     colo_receive_check_message(f, expect_msg, &local_err);
300     if (local_err) {
301         error_propagate(errp, local_err);
302         return 0;
303     }
304
305     value = qemu_get_be64(f);
306     ret = qemu_file_get_error(f);
307     if (ret < 0) {
308         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
309                          COLOMessage_lookup[expect_msg]);
310     }
311     return value;
312 }
313
314 static int colo_do_checkpoint_transaction(MigrationState *s,
315                                           QIOChannelBuffer *bioc,
316                                           QEMUFile *fb)
317 {
318     Error *local_err = NULL;
319     int ret = -1;
320
321     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
322                       &local_err);
323     if (local_err) {
324         goto out;
325     }
326
327     colo_receive_check_message(s->rp_state.from_dst_file,
328                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
329     if (local_err) {
330         goto out;
331     }
332     /* Reset channel-buffer directly */
333     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
334     bioc->usage = 0;
335
336     qemu_mutex_lock_iothread();
337     if (failover_get_state() != FAILOVER_STATUS_NONE) {
338         qemu_mutex_unlock_iothread();
339         goto out;
340     }
341     vm_stop_force_state(RUN_STATE_COLO);
342     qemu_mutex_unlock_iothread();
343     trace_colo_vm_state_change("run", "stop");
344     /*
345      * Failover request bh could be called after vm_stop_force_state(),
346      * So we need check failover_request_is_active() again.
347      */
348     if (failover_get_state() != FAILOVER_STATUS_NONE) {
349         goto out;
350     }
351
352     /* Disable block migration */
353     migrate_set_block_enabled(false, &local_err);
354     qemu_savevm_state_header(fb);
355     qemu_savevm_state_begin(fb);
356     qemu_mutex_lock_iothread();
357     qemu_savevm_state_complete_precopy(fb, false);
358     qemu_mutex_unlock_iothread();
359
360     qemu_fflush(fb);
361
362     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
363     if (local_err) {
364         goto out;
365     }
366     /*
367      * We need the size of the VMstate data in Secondary side,
368      * With which we can decide how much data should be read.
369      */
370     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
371                             bioc->usage, &local_err);
372     if (local_err) {
373         goto out;
374     }
375
376     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
377     qemu_fflush(s->to_dst_file);
378     ret = qemu_file_get_error(s->to_dst_file);
379     if (ret < 0) {
380         goto out;
381     }
382
383     colo_receive_check_message(s->rp_state.from_dst_file,
384                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
385     if (local_err) {
386         goto out;
387     }
388
389     colo_receive_check_message(s->rp_state.from_dst_file,
390                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
391     if (local_err) {
392         goto out;
393     }
394
395     ret = 0;
396
397     qemu_mutex_lock_iothread();
398     vm_start();
399     qemu_mutex_unlock_iothread();
400     trace_colo_vm_state_change("stop", "run");
401
402 out:
403     if (local_err) {
404         error_report_err(local_err);
405     }
406     return ret;
407 }
408
409 static void colo_process_checkpoint(MigrationState *s)
410 {
411     QIOChannelBuffer *bioc;
412     QEMUFile *fb = NULL;
413     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
414     Error *local_err = NULL;
415     int ret;
416
417     failover_init_state();
418
419     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
420     if (!s->rp_state.from_dst_file) {
421         error_report("Open QEMUFile from_dst_file failed");
422         goto out;
423     }
424
425     /*
426      * Wait for Secondary finish loading VM states and enter COLO
427      * restore.
428      */
429     colo_receive_check_message(s->rp_state.from_dst_file,
430                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
431     if (local_err) {
432         goto out;
433     }
434     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
435     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
436     object_unref(OBJECT(bioc));
437
438     qemu_mutex_lock_iothread();
439     vm_start();
440     qemu_mutex_unlock_iothread();
441     trace_colo_vm_state_change("stop", "run");
442
443     timer_mod(s->colo_delay_timer,
444             current_time + s->parameters.x_checkpoint_delay);
445
446     while (s->state == MIGRATION_STATUS_COLO) {
447         if (failover_get_state() != FAILOVER_STATUS_NONE) {
448             error_report("failover request");
449             goto out;
450         }
451
452         qemu_sem_wait(&s->colo_checkpoint_sem);
453
454         ret = colo_do_checkpoint_transaction(s, bioc, fb);
455         if (ret < 0) {
456             goto out;
457         }
458     }
459
460 out:
461     /* Throw the unreported error message after exited from loop */
462     if (local_err) {
463         error_report_err(local_err);
464     }
465
466     if (fb) {
467         qemu_fclose(fb);
468     }
469
470     timer_del(s->colo_delay_timer);
471
472     /* Hope this not to be too long to wait here */
473     qemu_sem_wait(&s->colo_exit_sem);
474     qemu_sem_destroy(&s->colo_exit_sem);
475     /*
476      * Must be called after failover BH is completed,
477      * Or the failover BH may shutdown the wrong fd that
478      * re-used by other threads after we release here.
479      */
480     if (s->rp_state.from_dst_file) {
481         qemu_fclose(s->rp_state.from_dst_file);
482     }
483 }
484
485 void colo_checkpoint_notify(void *opaque)
486 {
487     MigrationState *s = opaque;
488     int64_t next_notify_time;
489
490     qemu_sem_post(&s->colo_checkpoint_sem);
491     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
492     next_notify_time = s->colo_checkpoint_time +
493                     s->parameters.x_checkpoint_delay;
494     timer_mod(s->colo_delay_timer, next_notify_time);
495 }
496
497 void migrate_start_colo_process(MigrationState *s)
498 {
499     qemu_mutex_unlock_iothread();
500     qemu_sem_init(&s->colo_checkpoint_sem, 0);
501     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
502                                 colo_checkpoint_notify, s);
503
504     qemu_sem_init(&s->colo_exit_sem, 0);
505     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
506                       MIGRATION_STATUS_COLO);
507     colo_process_checkpoint(s);
508     qemu_mutex_lock_iothread();
509 }
510
511 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
512                                      Error **errp)
513 {
514     COLOMessage msg;
515     Error *local_err = NULL;
516
517     msg = colo_receive_message(f, &local_err);
518     if (local_err) {
519         error_propagate(errp, local_err);
520         return;
521     }
522
523     switch (msg) {
524     case COLO_MESSAGE_CHECKPOINT_REQUEST:
525         *checkpoint_request = 1;
526         break;
527     default:
528         *checkpoint_request = 0;
529         error_setg(errp, "Got unknown COLO message: %d", msg);
530         break;
531     }
532 }
533
534 void *colo_process_incoming_thread(void *opaque)
535 {
536     MigrationIncomingState *mis = opaque;
537     QEMUFile *fb = NULL;
538     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
539     uint64_t total_size;
540     uint64_t value;
541     Error *local_err = NULL;
542
543     qemu_sem_init(&mis->colo_incoming_sem, 0);
544
545     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
546                       MIGRATION_STATUS_COLO);
547
548     failover_init_state();
549
550     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
551     if (!mis->to_src_file) {
552         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
553         goto out;
554     }
555     /*
556      * Note: the communication between Primary side and Secondary side
557      * should be sequential, we set the fd to unblocked in migration incoming
558      * coroutine, and here we are in the COLO incoming thread, so it is ok to
559      * set the fd back to blocked.
560      */
561     qemu_file_set_blocking(mis->from_src_file, true);
562
563     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
564     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
565     object_unref(OBJECT(bioc));
566
567     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
568                       &local_err);
569     if (local_err) {
570         goto out;
571     }
572
573     while (mis->state == MIGRATION_STATUS_COLO) {
574         int request = 0;
575
576         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
577         if (local_err) {
578             goto out;
579         }
580         assert(request);
581         if (failover_get_state() != FAILOVER_STATUS_NONE) {
582             error_report("failover request");
583             goto out;
584         }
585
586         /* FIXME: This is unnecessary for periodic checkpoint mode */
587         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
588                      &local_err);
589         if (local_err) {
590             goto out;
591         }
592
593         colo_receive_check_message(mis->from_src_file,
594                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
595         if (local_err) {
596             goto out;
597         }
598
599         value = colo_receive_message_value(mis->from_src_file,
600                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
601         if (local_err) {
602             goto out;
603         }
604
605         /*
606          * Read VM device state data into channel buffer,
607          * It's better to re-use the memory allocated.
608          * Here we need to handle the channel buffer directly.
609          */
610         if (value > bioc->capacity) {
611             bioc->capacity = value;
612             bioc->data = g_realloc(bioc->data, bioc->capacity);
613         }
614         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
615         if (total_size != value) {
616             error_report("Got %" PRIu64 " VMState data, less than expected"
617                         " %" PRIu64, total_size, value);
618             goto out;
619         }
620         bioc->usage = total_size;
621         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
622
623         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
624                      &local_err);
625         if (local_err) {
626             goto out;
627         }
628
629         qemu_mutex_lock_iothread();
630         qemu_system_reset(SHUTDOWN_CAUSE_NONE);
631         vmstate_loading = true;
632         if (qemu_loadvm_state(fb) < 0) {
633             error_report("COLO: loadvm failed");
634             qemu_mutex_unlock_iothread();
635             goto out;
636         }
637
638         vmstate_loading = false;
639         qemu_mutex_unlock_iothread();
640
641         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
642             failover_set_state(FAILOVER_STATUS_RELAUNCH,
643                             FAILOVER_STATUS_NONE);
644             failover_request_active(NULL);
645             goto out;
646         }
647
648         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
649                      &local_err);
650         if (local_err) {
651             goto out;
652         }
653     }
654
655 out:
656     vmstate_loading = false;
657     /* Throw the unreported error message after exited from loop */
658     if (local_err) {
659         error_report_err(local_err);
660     }
661
662     if (fb) {
663         qemu_fclose(fb);
664     }
665
666     /* Hope this not to be too long to loop here */
667     qemu_sem_wait(&mis->colo_incoming_sem);
668     qemu_sem_destroy(&mis->colo_incoming_sem);
669     /* Must be called after failover BH is completed */
670     if (mis->to_src_file) {
671         qemu_fclose(mis->to_src_file);
672     }
673     migration_incoming_exit_colo();
674
675     return NULL;
676 }
This page took 0.058249 seconds and 4 git commands to generate.