]> Git Repo - qemu.git/blob - migration/colo.c
Merge remote-tracking branch 'sthibault/tags/samuel-thibault' into staging
[qemu.git] / migration / colo.c
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "qemu-file-channel.h"
17 #include "migration/migration.h"
18 #include "migration/qemu-file.h"
19 #include "migration/colo.h"
20 #include "migration/block.h"
21 #include "io/channel-buffer.h"
22 #include "trace.h"
23 #include "qemu/error-report.h"
24 #include "qapi/error.h"
25 #include "migration/failover.h"
26 #include "replication.h"
27 #include "qmp-commands.h"
28
/*
 * Set to true by the COLO incoming thread while it loads a checkpoint's
 * VM state; secondary_vm_do_failover() checks it and postpones the
 * failover while it is set.
 */
static bool vmstate_loading;

/* Size passed to qio_channel_buffer_new() for the VM state channel buffers */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
32
/* Report whether COLO is supported; this implementation always says yes. */
bool colo_supported(void)
{
    const bool supported = true;

    return supported;
}
37
38 bool migration_in_colo_state(void)
39 {
40     MigrationState *s = migrate_get_current();
41
42     return (s->state == MIGRATION_STATUS_COLO);
43 }
44
45 bool migration_incoming_in_colo_state(void)
46 {
47     MigrationIncomingState *mis = migration_incoming_get_current();
48
49     return mis && (mis->state == MIGRATION_STATUS_COLO);
50 }
51
52 static bool colo_runstate_is_stopped(void)
53 {
54     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
55 }
56
57 static void secondary_vm_do_failover(void)
58 {
59     int old_state;
60     MigrationIncomingState *mis = migration_incoming_get_current();
61
62     /* Can not do failover during the process of VM's loading VMstate, Or
63      * it will break the secondary VM.
64      */
65     if (vmstate_loading) {
66         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
67                         FAILOVER_STATUS_RELAUNCH);
68         if (old_state != FAILOVER_STATUS_ACTIVE) {
69             error_report("Unknown error while do failover for secondary VM,"
70                          "old_state: %s", FailoverStatus_lookup[old_state]);
71         }
72         return;
73     }
74
75     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
76                       MIGRATION_STATUS_COMPLETED);
77
78     if (!autostart) {
79         error_report("\"-S\" qemu option will be ignored in secondary side");
80         /* recover runstate to normal migration finish state */
81         autostart = true;
82     }
83     /*
84      * Make sure COLO incoming thread not block in recv or send,
85      * If mis->from_src_file and mis->to_src_file use the same fd,
86      * The second shutdown() will return -1, we ignore this value,
87      * It is harmless.
88      */
89     if (mis->from_src_file) {
90         qemu_file_shutdown(mis->from_src_file);
91     }
92     if (mis->to_src_file) {
93         qemu_file_shutdown(mis->to_src_file);
94     }
95
96     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
97                                    FAILOVER_STATUS_COMPLETED);
98     if (old_state != FAILOVER_STATUS_ACTIVE) {
99         error_report("Incorrect state (%s) while doing failover for "
100                      "secondary VM", FailoverStatus_lookup[old_state]);
101         return;
102     }
103     /* Notify COLO incoming thread that failover work is finished */
104     qemu_sem_post(&mis->colo_incoming_sem);
105     /* For Secondary VM, jump to incoming co */
106     if (mis->migration_incoming_co) {
107         qemu_coroutine_enter(mis->migration_incoming_co);
108     }
109 }
110
111 static void primary_vm_do_failover(void)
112 {
113     MigrationState *s = migrate_get_current();
114     int old_state;
115
116     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
117                       MIGRATION_STATUS_COMPLETED);
118
119     /*
120      * Wake up COLO thread which may blocked in recv() or send(),
121      * The s->rp_state.from_dst_file and s->to_dst_file may use the
122      * same fd, but we still shutdown the fd for twice, it is harmless.
123      */
124     if (s->to_dst_file) {
125         qemu_file_shutdown(s->to_dst_file);
126     }
127     if (s->rp_state.from_dst_file) {
128         qemu_file_shutdown(s->rp_state.from_dst_file);
129     }
130
131     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
132                                    FAILOVER_STATUS_COMPLETED);
133     if (old_state != FAILOVER_STATUS_ACTIVE) {
134         error_report("Incorrect state (%s) while doing failover for Primary VM",
135                      FailoverStatus_lookup[old_state]);
136         return;
137     }
138     /* Notify COLO thread that failover work is finished */
139     qemu_sem_post(&s->colo_exit_sem);
140 }
141
142 void colo_do_failover(MigrationState *s)
143 {
144     /* Make sure VM stopped while failover happened. */
145     if (!colo_runstate_is_stopped()) {
146         vm_stop_force_state(RUN_STATE_COLO);
147     }
148
149     if (get_colo_mode() == COLO_MODE_PRIMARY) {
150         primary_vm_do_failover();
151     } else {
152         secondary_vm_do_failover();
153     }
154 }
155
156 void qmp_xen_set_replication(bool enable, bool primary,
157                              bool has_failover, bool failover,
158                              Error **errp)
159 {
160 #ifdef CONFIG_REPLICATION
161     ReplicationMode mode = primary ?
162                            REPLICATION_MODE_PRIMARY :
163                            REPLICATION_MODE_SECONDARY;
164
165     if (has_failover && enable) {
166         error_setg(errp, "Parameter 'failover' is only for"
167                    " stopping replication");
168         return;
169     }
170
171     if (enable) {
172         replication_start_all(mode, errp);
173     } else {
174         if (!has_failover) {
175             failover = NULL;
176         }
177         replication_stop_all(failover, failover ? NULL : errp);
178     }
179 #else
180     abort();
181 #endif
182 }
183
184 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
185 {
186 #ifdef CONFIG_REPLICATION
187     Error *err = NULL;
188     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
189
190     replication_get_error_all(&err);
191     if (err) {
192         s->error = true;
193         s->has_desc = true;
194         s->desc = g_strdup(error_get_pretty(err));
195     } else {
196         s->error = false;
197     }
198
199     error_free(err);
200     return s;
201 #else
202     abort();
203 #endif
204 }
205
206 void qmp_xen_colo_do_checkpoint(Error **errp)
207 {
208 #ifdef CONFIG_REPLICATION
209     replication_do_checkpoint_all(errp);
210 #else
211     abort();
212 #endif
213 }
214
215 static void colo_send_message(QEMUFile *f, COLOMessage msg,
216                               Error **errp)
217 {
218     int ret;
219
220     if (msg >= COLO_MESSAGE__MAX) {
221         error_setg(errp, "%s: Invalid message", __func__);
222         return;
223     }
224     qemu_put_be32(f, msg);
225     qemu_fflush(f);
226
227     ret = qemu_file_get_error(f);
228     if (ret < 0) {
229         error_setg_errno(errp, -ret, "Can't send COLO message");
230     }
231     trace_colo_send_message(COLOMessage_lookup[msg]);
232 }
233
234 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
235                                     uint64_t value, Error **errp)
236 {
237     Error *local_err = NULL;
238     int ret;
239
240     colo_send_message(f, msg, &local_err);
241     if (local_err) {
242         error_propagate(errp, local_err);
243         return;
244     }
245     qemu_put_be64(f, value);
246     qemu_fflush(f);
247
248     ret = qemu_file_get_error(f);
249     if (ret < 0) {
250         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
251                          COLOMessage_lookup[msg]);
252     }
253 }
254
255 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
256 {
257     COLOMessage msg;
258     int ret;
259
260     msg = qemu_get_be32(f);
261     ret = qemu_file_get_error(f);
262     if (ret < 0) {
263         error_setg_errno(errp, -ret, "Can't receive COLO message");
264         return msg;
265     }
266     if (msg >= COLO_MESSAGE__MAX) {
267         error_setg(errp, "%s: Invalid message", __func__);
268         return msg;
269     }
270     trace_colo_receive_message(COLOMessage_lookup[msg]);
271     return msg;
272 }
273
274 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
275                                        Error **errp)
276 {
277     COLOMessage msg;
278     Error *local_err = NULL;
279
280     msg = colo_receive_message(f, &local_err);
281     if (local_err) {
282         error_propagate(errp, local_err);
283         return;
284     }
285     if (msg != expect_msg) {
286         error_setg(errp, "Unexpected COLO message %d, expected %d",
287                           msg, expect_msg);
288     }
289 }
290
291 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
292                                            Error **errp)
293 {
294     Error *local_err = NULL;
295     uint64_t value;
296     int ret;
297
298     colo_receive_check_message(f, expect_msg, &local_err);
299     if (local_err) {
300         error_propagate(errp, local_err);
301         return 0;
302     }
303
304     value = qemu_get_be64(f);
305     ret = qemu_file_get_error(f);
306     if (ret < 0) {
307         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
308                          COLOMessage_lookup[expect_msg]);
309     }
310     return value;
311 }
312
313 static int colo_do_checkpoint_transaction(MigrationState *s,
314                                           QIOChannelBuffer *bioc,
315                                           QEMUFile *fb)
316 {
317     Error *local_err = NULL;
318     int ret = -1;
319
320     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
321                       &local_err);
322     if (local_err) {
323         goto out;
324     }
325
326     colo_receive_check_message(s->rp_state.from_dst_file,
327                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
328     if (local_err) {
329         goto out;
330     }
331     /* Reset channel-buffer directly */
332     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
333     bioc->usage = 0;
334
335     qemu_mutex_lock_iothread();
336     if (failover_get_state() != FAILOVER_STATUS_NONE) {
337         qemu_mutex_unlock_iothread();
338         goto out;
339     }
340     vm_stop_force_state(RUN_STATE_COLO);
341     qemu_mutex_unlock_iothread();
342     trace_colo_vm_state_change("run", "stop");
343     /*
344      * Failover request bh could be called after vm_stop_force_state(),
345      * So we need check failover_request_is_active() again.
346      */
347     if (failover_get_state() != FAILOVER_STATUS_NONE) {
348         goto out;
349     }
350
351     /* Disable block migration */
352     migrate_set_block_enabled(false, &local_err);
353     qemu_savevm_state_header(fb);
354     qemu_savevm_state_begin(fb);
355     qemu_mutex_lock_iothread();
356     qemu_savevm_state_complete_precopy(fb, false);
357     qemu_mutex_unlock_iothread();
358
359     qemu_fflush(fb);
360
361     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
362     if (local_err) {
363         goto out;
364     }
365     /*
366      * We need the size of the VMstate data in Secondary side,
367      * With which we can decide how much data should be read.
368      */
369     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
370                             bioc->usage, &local_err);
371     if (local_err) {
372         goto out;
373     }
374
375     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
376     qemu_fflush(s->to_dst_file);
377     ret = qemu_file_get_error(s->to_dst_file);
378     if (ret < 0) {
379         goto out;
380     }
381
382     colo_receive_check_message(s->rp_state.from_dst_file,
383                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
384     if (local_err) {
385         goto out;
386     }
387
388     colo_receive_check_message(s->rp_state.from_dst_file,
389                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
390     if (local_err) {
391         goto out;
392     }
393
394     ret = 0;
395
396     qemu_mutex_lock_iothread();
397     vm_start();
398     qemu_mutex_unlock_iothread();
399     trace_colo_vm_state_change("stop", "run");
400
401 out:
402     if (local_err) {
403         error_report_err(local_err);
404     }
405     return ret;
406 }
407
408 static void colo_process_checkpoint(MigrationState *s)
409 {
410     QIOChannelBuffer *bioc;
411     QEMUFile *fb = NULL;
412     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
413     Error *local_err = NULL;
414     int ret;
415
416     failover_init_state();
417
418     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
419     if (!s->rp_state.from_dst_file) {
420         error_report("Open QEMUFile from_dst_file failed");
421         goto out;
422     }
423
424     /*
425      * Wait for Secondary finish loading VM states and enter COLO
426      * restore.
427      */
428     colo_receive_check_message(s->rp_state.from_dst_file,
429                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
430     if (local_err) {
431         goto out;
432     }
433     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
434     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
435     object_unref(OBJECT(bioc));
436
437     qemu_mutex_lock_iothread();
438     vm_start();
439     qemu_mutex_unlock_iothread();
440     trace_colo_vm_state_change("stop", "run");
441
442     timer_mod(s->colo_delay_timer,
443             current_time + s->parameters.x_checkpoint_delay);
444
445     while (s->state == MIGRATION_STATUS_COLO) {
446         if (failover_get_state() != FAILOVER_STATUS_NONE) {
447             error_report("failover request");
448             goto out;
449         }
450
451         qemu_sem_wait(&s->colo_checkpoint_sem);
452
453         ret = colo_do_checkpoint_transaction(s, bioc, fb);
454         if (ret < 0) {
455             goto out;
456         }
457     }
458
459 out:
460     /* Throw the unreported error message after exited from loop */
461     if (local_err) {
462         error_report_err(local_err);
463     }
464
465     if (fb) {
466         qemu_fclose(fb);
467     }
468
469     timer_del(s->colo_delay_timer);
470
471     /* Hope this not to be too long to wait here */
472     qemu_sem_wait(&s->colo_exit_sem);
473     qemu_sem_destroy(&s->colo_exit_sem);
474     /*
475      * Must be called after failover BH is completed,
476      * Or the failover BH may shutdown the wrong fd that
477      * re-used by other threads after we release here.
478      */
479     if (s->rp_state.from_dst_file) {
480         qemu_fclose(s->rp_state.from_dst_file);
481     }
482 }
483
484 void colo_checkpoint_notify(void *opaque)
485 {
486     MigrationState *s = opaque;
487     int64_t next_notify_time;
488
489     qemu_sem_post(&s->colo_checkpoint_sem);
490     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
491     next_notify_time = s->colo_checkpoint_time +
492                     s->parameters.x_checkpoint_delay;
493     timer_mod(s->colo_delay_timer, next_notify_time);
494 }
495
496 void migrate_start_colo_process(MigrationState *s)
497 {
498     qemu_mutex_unlock_iothread();
499     qemu_sem_init(&s->colo_checkpoint_sem, 0);
500     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
501                                 colo_checkpoint_notify, s);
502
503     qemu_sem_init(&s->colo_exit_sem, 0);
504     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
505                       MIGRATION_STATUS_COLO);
506     colo_process_checkpoint(s);
507     qemu_mutex_lock_iothread();
508 }
509
510 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
511                                      Error **errp)
512 {
513     COLOMessage msg;
514     Error *local_err = NULL;
515
516     msg = colo_receive_message(f, &local_err);
517     if (local_err) {
518         error_propagate(errp, local_err);
519         return;
520     }
521
522     switch (msg) {
523     case COLO_MESSAGE_CHECKPOINT_REQUEST:
524         *checkpoint_request = 1;
525         break;
526     default:
527         *checkpoint_request = 0;
528         error_setg(errp, "Got unknown COLO message: %d", msg);
529         break;
530     }
531 }
532
533 void *colo_process_incoming_thread(void *opaque)
534 {
535     MigrationIncomingState *mis = opaque;
536     QEMUFile *fb = NULL;
537     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
538     uint64_t total_size;
539     uint64_t value;
540     Error *local_err = NULL;
541
542     qemu_sem_init(&mis->colo_incoming_sem, 0);
543
544     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
545                       MIGRATION_STATUS_COLO);
546
547     failover_init_state();
548
549     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
550     if (!mis->to_src_file) {
551         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
552         goto out;
553     }
554     /*
555      * Note: the communication between Primary side and Secondary side
556      * should be sequential, we set the fd to unblocked in migration incoming
557      * coroutine, and here we are in the COLO incoming thread, so it is ok to
558      * set the fd back to blocked.
559      */
560     qemu_file_set_blocking(mis->from_src_file, true);
561
562     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
563     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
564     object_unref(OBJECT(bioc));
565
566     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
567                       &local_err);
568     if (local_err) {
569         goto out;
570     }
571
572     while (mis->state == MIGRATION_STATUS_COLO) {
573         int request = 0;
574
575         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
576         if (local_err) {
577             goto out;
578         }
579         assert(request);
580         if (failover_get_state() != FAILOVER_STATUS_NONE) {
581             error_report("failover request");
582             goto out;
583         }
584
585         /* FIXME: This is unnecessary for periodic checkpoint mode */
586         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
587                      &local_err);
588         if (local_err) {
589             goto out;
590         }
591
592         colo_receive_check_message(mis->from_src_file,
593                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
594         if (local_err) {
595             goto out;
596         }
597
598         value = colo_receive_message_value(mis->from_src_file,
599                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
600         if (local_err) {
601             goto out;
602         }
603
604         /*
605          * Read VM device state data into channel buffer,
606          * It's better to re-use the memory allocated.
607          * Here we need to handle the channel buffer directly.
608          */
609         if (value > bioc->capacity) {
610             bioc->capacity = value;
611             bioc->data = g_realloc(bioc->data, bioc->capacity);
612         }
613         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
614         if (total_size != value) {
615             error_report("Got %" PRIu64 " VMState data, less than expected"
616                         " %" PRIu64, total_size, value);
617             goto out;
618         }
619         bioc->usage = total_size;
620         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
621
622         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
623                      &local_err);
624         if (local_err) {
625             goto out;
626         }
627
628         qemu_mutex_lock_iothread();
629         qemu_system_reset(SHUTDOWN_CAUSE_NONE);
630         vmstate_loading = true;
631         if (qemu_loadvm_state(fb) < 0) {
632             error_report("COLO: loadvm failed");
633             qemu_mutex_unlock_iothread();
634             goto out;
635         }
636
637         vmstate_loading = false;
638         qemu_mutex_unlock_iothread();
639
640         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
641             failover_set_state(FAILOVER_STATUS_RELAUNCH,
642                             FAILOVER_STATUS_NONE);
643             failover_request_active(NULL);
644             goto out;
645         }
646
647         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
648                      &local_err);
649         if (local_err) {
650             goto out;
651         }
652     }
653
654 out:
655     vmstate_loading = false;
656     /* Throw the unreported error message after exited from loop */
657     if (local_err) {
658         error_report_err(local_err);
659     }
660
661     if (fb) {
662         qemu_fclose(fb);
663     }
664
665     /* Hope this not to be too long to loop here */
666     qemu_sem_wait(&mis->colo_incoming_sem);
667     qemu_sem_destroy(&mis->colo_incoming_sem);
668     /* Must be called after failover BH is completed */
669     if (mis->to_src_file) {
670         qemu_fclose(mis->to_src_file);
671     }
672     migration_incoming_exit_colo();
673
674     return NULL;
675 }
This page took 0.059434 seconds and 4 git commands to generate.