migration: init dst in migration_object_init too
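
The central change below is that the incoming side's state is now allocated once, up front, in migration_object_init(), instead of being built lazily behind a "once" flag inside migration_incoming_get_current(). Here is a minimal, self-contained sketch of that pattern; IncomingState, incoming_init() and incoming_get() are illustrative stand-ins, not QEMU's actual types:

    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct IncomingState {
        int state;                      /* e.g. MIGRATION_STATUS_NONE */
    } IncomingState;

    static IncomingState *current_incoming;

    /* Done once at startup, as migration_object_init() now does in this patch. */
    static void incoming_init(void)
    {
        assert(!current_incoming);
        current_incoming = calloc(1, sizeof(*current_incoming));
        if (!current_incoming) {
            abort();
        }
    }

    /* The getter no longer creates anything; it only hands out the singleton. */
    static IncomingState *incoming_get(void)
    {
        assert(current_incoming);
        return current_incoming;
    }

    int main(void)
    {
        incoming_init();
        printf("state = %d\n", incoming_get()->state);
        free(current_incoming);
        return 0;
    }

In the patch itself, migration_incoming_get_current() simply asserts that current_incoming exists and returns it, while migration_object_init() additionally sets up the mutex, event, semaphores and postcopy_remote_fds array that the postcopy-recovery code further down relies on.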
diff --git a/migration/migration.c b/migration/migration.c
index 0fdb2e410ddbb7da9adbef221ccf3899b1bc2841..c1832b02638bbd4f88cf49d0fe6acbd63379062e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
 #include "qemu-file.h"
 #include "migration/vmstate.h"
 #include "block/block.h"
+#include "qapi/error.h"
+#include "qapi/qapi-commands-migration.h"
+#include "qapi/qapi-events-migration.h"
 #include "qapi/qmp/qerror.h"
+#include "qapi/qmp/qnull.h"
 #include "qemu/rcu.h"
 #include "block.h"
 #include "postcopy-ram.h"
 #include "qemu/thread.h"
-#include "qmp-commands.h"
 #include "trace.h"
-#include "qapi-event.h"
 #include "exec/target_page.h"
 #include "io/channel-buffer.h"
 #include "migration/colo.h"
@@ -93,6 +95,8 @@ enum mig_rp_message_type {
 
     MIG_RP_MSG_REQ_PAGES_ID, /* data (start: be64, len: be32, id: string) */
     MIG_RP_MSG_REQ_PAGES,    /* data (start: be64, len: be32) */
+    MIG_RP_MSG_RECV_BITMAP,  /* send recved_bitmap back to source */
+    MIG_RP_MSG_RESUME_ACK,   /* tell source that we are ready to resume */
 
     MIG_RP_MSG_MAX
 };
@@ -102,6 +106,7 @@ enum mig_rp_message_type {
    dynamic creation of migration */
 
 static MigrationState *current_migration;
+static MigrationIncomingState *current_incoming;
 
 static bool migration_object_check(MigrationState *ms, Error **errp);
 static int migration_maybe_pause(MigrationState *s,
@@ -117,6 +122,22 @@ void migration_object_init(void)
     assert(!current_migration);
     current_migration = MIGRATION_OBJ(object_new(TYPE_MIGRATION));
 
+    /*
+     * Initialize the incoming migration object as well, no matter
+     * whether we'll use it or not.
+     */
+    assert(!current_incoming);
+    current_incoming = g_new0(MigrationIncomingState, 1);
+    current_incoming->state = MIGRATION_STATUS_NONE;
+    current_incoming->postcopy_remote_fds =
+        g_array_new(FALSE, TRUE, sizeof(struct PostCopyFD));
+    qemu_mutex_init(&current_incoming->rp_mutex);
+    qemu_event_init(&current_incoming->main_thread_load_event, false);
+    qemu_sem_init(&current_incoming->postcopy_pause_sem_dst, 0);
+    qemu_sem_init(&current_incoming->postcopy_pause_sem_fault, 0);
+
+    init_dirty_bitmap_incoming_migration();
+
     if (!migration_object_check(current_migration, &err)) {
         error_report_err(err);
         exit(1);
@@ -147,17 +168,8 @@ MigrationState *migrate_get_current(void)
 
 MigrationIncomingState *migration_incoming_get_current(void)
 {
-    static bool once;
-    static MigrationIncomingState mis_current;
-
-    if (!once) {
-        mis_current.state = MIGRATION_STATUS_NONE;
-        memset(&mis_current, 0, sizeof(MigrationIncomingState));
-        qemu_mutex_init(&mis_current.rp_mutex);
-        qemu_event_init(&mis_current.main_thread_load_event, false);
-        once = true;
-    }
-    return &mis_current;
+    assert(current_incoming);
+    return current_incoming;
 }
 
 void migration_incoming_state_destroy(void)
@@ -175,6 +187,10 @@ void migration_incoming_state_destroy(void)
         qemu_fclose(mis->from_src_file);
         mis->from_src_file = NULL;
     }
+    if (mis->postcopy_remote_fds) {
+        g_array_free(mis->postcopy_remote_fds, TRUE);
+        mis->postcopy_remote_fds = NULL;
+    }
 
     qemu_event_reset(&mis->main_thread_load_event);
 }
@@ -203,17 +219,35 @@ static void deferred_incoming_migration(Error **errp)
  * Send a message on the return channel back to the source
  * of the migration.
  */
-static void migrate_send_rp_message(MigrationIncomingState *mis,
-                                    enum mig_rp_message_type message_type,
-                                    uint16_t len, void *data)
+static int migrate_send_rp_message(MigrationIncomingState *mis,
+                                   enum mig_rp_message_type message_type,
+                                   uint16_t len, void *data)
 {
+    int ret = 0;
+
     trace_migrate_send_rp_message((int)message_type, len);
     qemu_mutex_lock(&mis->rp_mutex);
+
+    /*
+     * It's possible that the file handle got lost due to network
+     * failures.
+     */
+    if (!mis->to_src_file) {
+        ret = -EIO;
+        goto error;
+    }
+
     qemu_put_be16(mis->to_src_file, (unsigned int)message_type);
     qemu_put_be16(mis->to_src_file, len);
     qemu_put_buffer(mis->to_src_file, data, len);
     qemu_fflush(mis->to_src_file);
+
+    /* It's possible that the QEMUFile hit an error during the send */
+    ret = qemu_file_get_error(mis->to_src_file);
+
+error:
     qemu_mutex_unlock(&mis->rp_mutex);
+    return ret;
 }
 
 /* Request a range of pages from the source VM at the given
@@ -223,11 +257,12 @@ static void migrate_send_rp_message(MigrationIncomingState *mis,
  *   Start: Address offset within the RB
  *   Len: Length in bytes required - must be a multiple of pagesize
  */
-void migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
-                               ram_addr_t start, size_t len)
+int migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
+                              ram_addr_t start, size_t len)
 {
     uint8_t bufc[12 + 1 + 255]; /* start (8), len (4), rbname up to 256 */
     size_t msglen = 12; /* start + len */
+    enum mig_rp_message_type msg_type;
 
     *(uint64_t *)bufc = cpu_to_be64((uint64_t)start);
     *(uint32_t *)(bufc + 8) = cpu_to_be32((uint32_t)len);
@@ -239,10 +274,12 @@ void migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
         bufc[msglen++] = rbname_len;
         memcpy(bufc + msglen, rbname, rbname_len);
         msglen += rbname_len;
-        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES_ID, msglen, bufc);
+        msg_type = MIG_RP_MSG_REQ_PAGES_ID;
     } else {
-        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES, msglen, bufc);
+        msg_type = MIG_RP_MSG_REQ_PAGES;
     }
+
+    return migrate_send_rp_message(mis, msg_type, msglen, bufc);
 }
 
 void qemu_start_incoming_migration(const char *uri, Error **errp)
@@ -297,6 +334,8 @@ static void process_incoming_migration_bh(void *opaque)
        state, we need to obey autostart. Any other state is set with
        runstate_set. */
 
+    dirty_bitmap_mig_before_vm_start();
+
     if (!global_state_received() ||
         global_state_get_runstate() == RUN_STATE_RUNNING) {
         if (autostart) {
@@ -396,7 +435,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -404,8 +443,34 @@ static void migration_incoming_process(void)
 
 void migration_fd_process_incoming(QEMUFile *f)
 {
-    migration_incoming_setup(f);
-    migration_incoming_process();
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (mis->state == MIGRATION_STATUS_POSTCOPY_PAUSED) {
+        /* Resumed from a paused postcopy migration */
+
+        mis->from_src_file = f;
+        /* Postcopy has standalone thread to do vm load */
+        qemu_file_set_blocking(f, true);
+
+        /* Re-configure the return path */
+        mis->to_src_file = qemu_file_get_return_path(f);
+
+        migrate_set_state(&mis->state, MIGRATION_STATUS_POSTCOPY_PAUSED,
+                          MIGRATION_STATUS_POSTCOPY_RECOVER);
+
+        /*
+         * Here we only wake up the main loading thread (while the
+         * fault thread will still be waiting), so that we can receive
+         * commands from the source now and answer them if needed. The
+         * fault thread will only be woken up once we are sure that
+         * the source is ready to reply to page requests.
+         */
+        qemu_sem_post(&mis->postcopy_pause_sem_dst);
+    } else {
+        /* New incoming migration */
+        migration_incoming_setup(f);
+        migration_incoming_process();
+    }
 }
 
 void migration_ioc_process_incoming(QIOChannel *ioc)
@@ -414,9 +479,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_recv_new_channel(ioc);
 }
 
 /**
@@ -427,7 +493,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
-    return true;
+    bool all_channels;
+
+    all_channels = multifd_recv_all_channels_created();
+
+    return all_channels;
 }
 
 /*
@@ -457,6 +527,53 @@ void migrate_send_rp_pong(MigrationIncomingState *mis,
     migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf);
 }
 
+void migrate_send_rp_recv_bitmap(MigrationIncomingState *mis,
+                                 char *block_name)
+{
+    char buf[512];
+    int len;
+    int64_t res;
+
+    /*
+     * First, we send the header part. It contains only the len of
+     * idstr, and the idstr itself.
+     */
+    len = strlen(block_name);
+    buf[0] = len;
+    memcpy(buf + 1, block_name, len);
+
+    if (mis->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
+        error_report("%s: MSG_RP_RECV_BITMAP only used for recovery",
+                     __func__);
+        return;
+    }
+
+    migrate_send_rp_message(mis, MIG_RP_MSG_RECV_BITMAP, len + 1, buf);
+
+    /*
+     * Next, we dump the received bitmap to the stream.
+     *
+     * TODO: currently we are safe since we are the only user of the
+     * to_src_file handle (the fault thread is still paused), so it is
+     * OK even without taking the mutex. However, the better approach is
+     * to take the lock before sending the message header and release
+     * it after sending the bitmap.
+     */
+    qemu_mutex_lock(&mis->rp_mutex);
+    res = ramblock_recv_bitmap_send(mis->to_src_file, block_name);
+    qemu_mutex_unlock(&mis->rp_mutex);
+
+    trace_migrate_send_rp_recv_bitmap(block_name, res);
+}
+
+void migrate_send_rp_resume_ack(MigrationIncomingState *mis, uint32_t value)
+{
+    uint32_t buf;
+
+    buf = cpu_to_be32(value);
+    migrate_send_rp_message(mis, MIG_RP_MSG_RESUME_ACK, sizeof(buf), &buf);
+}
+
 MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
 {
     MigrationCapabilityStatusList *head = NULL;
@@ -535,6 +652,8 @@ static bool migration_is_setup_or_active(int state)
     switch (state) {
     case MIGRATION_STATUS_ACTIVE:
     case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
     case MIGRATION_STATUS_SETUP:
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
@@ -596,14 +715,15 @@ static void populate_disk_info(MigrationInfo *info)
     }
 }
 
-MigrationInfo *qmp_query_migrate(Error **errp)
+static void fill_source_migration_info(MigrationInfo *info)
 {
-    MigrationInfo *info = g_malloc0(sizeof(*info));
     MigrationState *s = migrate_get_current();
 
     switch (s->state) {
     case MIGRATION_STATUS_NONE:
         /* no migration has happened ever */
+        /* do not overwrite destination migration status */
+        return;
         break;
     case MIGRATION_STATUS_SETUP:
         info->has_status = true;
@@ -614,6 +734,8 @@ MigrationInfo *qmp_query_migrate(Error **errp)
     case MIGRATION_STATUS_POSTCOPY_ACTIVE:
     case MIGRATION_STATUS_PRE_SWITCHOVER:
     case MIGRATION_STATUS_DEVICE:
+    case MIGRATION_STATUS_POSTCOPY_PAUSED:
+    case MIGRATION_STATUS_POSTCOPY_RECOVER:
          /* TODO add some postcopy stats */
         info->has_status = true;
         info->has_total_time = true;
@@ -654,8 +776,6 @@ MigrationInfo *qmp_query_migrate(Error **errp)
         break;
     }
     info->status = s->state;
-
-    return info;
 }
 
 /**
@@ -719,18 +839,55 @@ static bool migrate_caps_check(bool *cap_list,
     return true;
 }
 
+static void fill_destination_migration_info(MigrationInfo *info)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    switch (mis->state) {
+    case MIGRATION_STATUS_NONE:
+        return;
+        break;
+    case MIGRATION_STATUS_SETUP:
+    case MIGRATION_STATUS_CANCELLING:
+    case MIGRATION_STATUS_CANCELLED:
+    case MIGRATION_STATUS_ACTIVE:
+    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+    case MIGRATION_STATUS_FAILED:
+    case MIGRATION_STATUS_COLO:
+        info->has_status = true;
+        break;
+    case MIGRATION_STATUS_COMPLETED:
+        info->has_status = true;
+        fill_destination_postcopy_migration_info(info);
+        break;
+    }
+    info->status = mis->state;
+}
+
+MigrationInfo *qmp_query_migrate(Error **errp)
+{
+    MigrationInfo *info = g_malloc0(sizeof(*info));
+
+    fill_destination_migration_info(info);
+    fill_source_migration_info(info);
+
+    return info;
+}
+
 void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
                                   Error **errp)
 {
     MigrationState *s = migrate_get_current();
     MigrationCapabilityStatusList *cap;
+    bool cap_list[MIGRATION_CAPABILITY__MAX];
 
     if (migration_is_setup_or_active(s->state)) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
 
-    if (!migrate_caps_check(s->enabled_capabilities, params, errp)) {
+    memcpy(cap_list, s->enabled_capabilities, sizeof(cap_list));
+    if (!migrate_caps_check(cap_list, params, errp)) {
         return;
     }
 
@@ -970,14 +1127,14 @@ void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
     /* TODO Rewrite "" to null instead */
     if (params->has_tls_creds
         && params->tls_creds->type == QTYPE_QNULL) {
-        QDECREF(params->tls_creds->u.n);
+        qobject_unref(params->tls_creds->u.n);
         params->tls_creds->type = QTYPE_QSTRING;
         params->tls_creds->u.s = strdup("");
     }
     /* TODO Rewrite "" to null instead */
     if (params->has_tls_hostname
         && params->tls_hostname->type == QTYPE_QNULL) {
-        QDECREF(params->tls_hostname->u.n);
+        qobject_unref(params->tls_hostname->u.n);
         params->tls_hostname->type = QTYPE_QSTRING;
         params->tls_hostname->u.s = strdup("");
     }
@@ -997,7 +1154,7 @@ void qmp_migrate_start_postcopy(Error **errp)
 {
     MigrationState *s = migrate_get_current();
 
-    if (!migrate_postcopy_ram()) {
+    if (!migrate_postcopy()) {
         error_setg(errp, "Enable postcopy with migrate_set_capability before"
                          " the start of migration");
         return;
@@ -1235,10 +1392,8 @@ bool migration_is_idle(void)
     return false;
 }
 
-MigrationState *migrate_init(void)
+void migrate_init(MigrationState *s)
 {
-    MigrationState *s = migrate_get_current();
-
     /*
      * Reinitialise all migration state, except
      * parameters/capabilities that the user set, and
@@ -1268,7 +1423,6 @@ MigrationState *migrate_init(void)
     s->vm_was_running = false;
     s->iteration_initial_bytes = 0;
     s->threshold_size = 0;
-    return s;
 }
 
 static GSList *migration_blockers;
@@ -1335,48 +1489,74 @@ bool migration_is_blocked(Error **errp)
     return false;
 }
 
-void qmp_migrate(const char *uri, bool has_blk, bool blk,
-                 bool has_inc, bool inc, bool has_detach, bool detach,
-                 Error **errp)
+/* Return true to continue the migration, false if an error was detected */
+static bool migrate_prepare(MigrationState *s, bool blk, bool blk_inc,
+                            bool resume, Error **errp)
 {
     Error *local_err = NULL;
-    MigrationState *s = migrate_get_current();
-    const char *p;
+
+    if (resume) {
+        if (s->state != MIGRATION_STATUS_POSTCOPY_PAUSED) {
+            error_setg(errp, "Cannot resume if there is no "
+                       "paused migration");
+            return false;
+        }
+        /* This is a resume, skip init status */
+        return true;
+    }
 
     if (migration_is_setup_or_active(s->state) ||
         s->state == MIGRATION_STATUS_CANCELLING ||
         s->state == MIGRATION_STATUS_COLO) {
         error_setg(errp, QERR_MIGRATION_ACTIVE);
-        return;
+        return false;
     }
+
     if (runstate_check(RUN_STATE_INMIGRATE)) {
         error_setg(errp, "Guest is waiting for an incoming migration");
-        return;
+        return false;
     }
 
     if (migration_is_blocked(errp)) {
-        return;
+        return false;
     }
 
-    if ((has_blk && blk) || (has_inc && inc)) {
+    if (blk || blk_inc) {
         if (migrate_use_block() || migrate_use_block_incremental()) {
             error_setg(errp, "Command options are incompatible with "
                        "current migration capabilities");
-            return;
+            return false;
         }
         migrate_set_block_enabled(true, &local_err);
         if (local_err) {
             error_propagate(errp, local_err);
-            return;
+            return false;
         }
         s->must_remove_block_options = true;
     }
 
-    if (has_inc && inc) {
+    if (blk_inc) {
         migrate_set_block_incremental(s, true);
     }
 
-    s = migrate_init();
+    migrate_init(s);
+
+    return true;
+}
+
+void qmp_migrate(const char *uri, bool has_blk, bool blk,
+                 bool has_inc, bool inc, bool has_detach, bool detach,
+                 bool has_resume, bool resume, Error **errp)
+{
+    Error *local_err = NULL;
+    MigrationState *s = migrate_get_current();
+    const char *p;
+
+    if (!migrate_prepare(s, has_blk && blk, has_inc && inc,
+                         has_resume && resume, errp)) {
+        /* Error detected, put into errp */
+        return;
+    }
 
     if (strstart(uri, "tcp:", &p)) {
         tcp_start_outgoing_migration(s, p, &local_err);
@@ -1395,6 +1575,7 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
                    "a valid migration protocol");
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
+        block_cleanup_parameters(s);
         return;
     }
 
@@ -1486,7 +1667,7 @@ bool migrate_postcopy_ram(void)
 
 bool migrate_postcopy(void)
 {
-    return migrate_postcopy_ram();
+    return migrate_postcopy_ram() || migrate_dirty_bitmaps();
 }
 
 bool migrate_auto_converge(void)
@@ -1507,6 +1688,15 @@ bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_postcopy_blocktime(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_BLOCKTIME];
+}
+
 bool migrate_use_compression(void)
 {
     MigrationState *s;
@@ -1543,6 +1733,15 @@ int migrate_decompress_threads(void)
     return s->parameters.decompress_threads;
 }
 
+bool migrate_dirty_bitmaps(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_DIRTY_BITMAPS];
+}
+
 bool migrate_use_events(void)
 {
     MigrationState *s;
@@ -1653,6 +1852,8 @@ static struct rp_cmd_args {
     [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
     [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
     [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
+    [MIG_RP_MSG_RECV_BITMAP]    = { .len = -1, .name = "RECV_BITMAP" },
+    [MIG_RP_MSG_RESUME_ACK]     = { .len =  4, .name = "RESUME_ACK" },
     [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
 };
 
@@ -1685,6 +1886,51 @@ static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname,
     }
 }
 
+/* Return true to retry, false to quit */
+static bool postcopy_pause_return_path_thread(MigrationState *s)
+{
+    trace_postcopy_pause_return_path();
+
+    qemu_sem_wait(&s->postcopy_pause_rp_sem);
+
+    trace_postcopy_pause_return_path_continued();
+
+    return true;
+}
+
+static int migrate_handle_rp_recv_bitmap(MigrationState *s, char *block_name)
+{
+    RAMBlock *block = qemu_ram_block_by_name(block_name);
+
+    if (!block) {
+        error_report("%s: invalid block name '%s'", __func__, block_name);
+        return -EINVAL;
+    }
+
+    /* Fetch the received bitmap and refresh the dirty bitmap */
+    return ram_dirty_bitmap_reload(s, block);
+}
+
+static int migrate_handle_rp_resume_ack(MigrationState *s, uint32_t value)
+{
+    trace_source_return_path_thread_resume_ack(value);
+
+    if (value != MIGRATION_RESUME_ACK_VALUE) {
+        error_report("%s: illegal resume_ack value %"PRIu32,
+                     __func__, value);
+        return -1;
+    }
+
+    /* Now both sides are active. */
+    migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_RECOVER,
+                      MIGRATION_STATUS_POSTCOPY_ACTIVE);
+
+    /* Notify the send thread that it's time to continue sending pages */
+    qemu_sem_post(&s->rp_state.rp_sem);
+
+    return 0;
+}
+
 /*
  * Handles messages sent on the return path towards the source VM
  *
@@ -1701,12 +1947,19 @@ static void *source_return_path_thread(void *opaque)
     int res;
 
     trace_source_return_path_thread_entry();
+
+retry:
     while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
            migration_is_setup_or_active(ms->state)) {
         trace_source_return_path_thread_loop_top();
         header_type = qemu_get_be16(rp);
         header_len = qemu_get_be16(rp);
 
+        if (qemu_file_get_error(rp)) {
+            mark_source_rp_bad(ms);
+            goto out;
+        }
+
         if (header_type >= MIG_RP_MSG_MAX ||
             header_type == MIG_RP_MSG_INVALID) {
             error_report("RP: Received invalid message 0x%04x length 0x%04x",
@@ -1783,23 +2036,61 @@ static void *source_return_path_thread(void *opaque)
             migrate_handle_rp_req_pages(ms, (char *)&buf[13], start, len);
             break;
 
+        case MIG_RP_MSG_RECV_BITMAP:
+            if (header_len < 1) {
+                error_report("%s: missing block name", __func__);
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            /* Format: len (1B) + idstr (<255B); NUL-terminate the idstr. */
+            buf[buf[0] + 1] = '\0';
+            if (migrate_handle_rp_recv_bitmap(ms, (char *)(buf + 1))) {
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            break;
+
+        case MIG_RP_MSG_RESUME_ACK:
+            tmp32 = ldl_be_p(buf);
+            if (migrate_handle_rp_resume_ack(ms, tmp32)) {
+                mark_source_rp_bad(ms);
+                goto out;
+            }
+            break;
+
         default:
             break;
         }
     }
-    if (qemu_file_get_error(rp)) {
+
+out:
+    res = qemu_file_get_error(rp);
+    if (res) {
+        if (res == -EIO) {
+            /*
+             * Maybe there is something we can do: it looks like the
+             * network went down, so pause and wait for a recovery.
+             */
+            if (postcopy_pause_return_path_thread(ms)) {
+                /* Reload rp, reset the rest */
+                rp = ms->rp_state.from_dst_file;
+                ms->rp_state.error = false;
+                goto retry;
+            }
+        }
+
         trace_source_return_path_thread_bad_end();
         mark_source_rp_bad(ms);
     }
 
     trace_source_return_path_thread_end();
-out:
     ms->rp_state.from_dst_file = NULL;
     qemu_fclose(rp);
     return NULL;
 }
 
-static int open_return_path_on_source(MigrationState *ms)
+static int open_return_path_on_source(MigrationState *ms,
+                                      bool create_thread)
 {
 
     ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->to_dst_file);
@@ -1808,6 +2099,12 @@ static int open_return_path_on_source(MigrationState *ms)
     }
 
     trace_open_return_path_on_source();
+
+    if (!create_thread) {
+        /* We're done */
+        return 0;
+    }
+
     qemu_thread_create(&ms->rp_state.rp_thread, "return path",
                        source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
 
@@ -2147,6 +2444,150 @@ bool migrate_colo_enabled(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_X_COLO];
 }
 
+typedef enum MigThrError {
+    /* No error detected */
+    MIG_THR_ERR_NONE = 0,
+    /* Detected error, but resumed successfully */
+    MIG_THR_ERR_RECOVERED = 1,
+    /* Detected fatal error, need to exit */
+    MIG_THR_ERR_FATAL = 2,
+} MigThrError;
+
+static int postcopy_resume_handshake(MigrationState *s)
+{
+    qemu_savevm_send_postcopy_resume(s->to_dst_file);
+
+    while (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) {
+        qemu_sem_wait(&s->rp_state.rp_sem);
+    }
+
+    if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+        return 0;
+    }
+
+    return -1;
+}
+
+/* Return zero if success, or <0 for error */
+static int postcopy_do_resume(MigrationState *s)
+{
+    int ret;
+
+    /*
+     * Call all the resume_prepare() hooks, so that modules can be
+     * ready for the migration resume.
+     */
+    ret = qemu_savevm_state_resume_prepare(s);
+    if (ret) {
+        error_report("%s: resume_prepare() failure detected: %d",
+                     __func__, ret);
+        return ret;
+    }
+
+    /*
+     * Last handshake with destination on the resume (destination will
+     * switch to postcopy-active afterwards)
+     */
+    ret = postcopy_resume_handshake(s);
+    if (ret) {
+        error_report("%s: handshake failed: %d", __func__, ret);
+        return ret;
+    }
+
+    return 0;
+}
+
+/*
+ * We don't return until we are in a safe state to continue the current
+ * postcopy migration.  Returns MIG_THR_ERR_RECOVERED if we recovered, or
+ * MIG_THR_ERR_FATAL if an unrecoverable failure happened.
+ */
+static MigThrError postcopy_pause(MigrationState *s)
+{
+    assert(s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE);
+
+    while (true) {
+        migrate_set_state(&s->state, s->state,
+                          MIGRATION_STATUS_POSTCOPY_PAUSED);
+
+        /* Current channel is possibly broken. Release it. */
+        assert(s->to_dst_file);
+        qemu_file_shutdown(s->to_dst_file);
+        qemu_fclose(s->to_dst_file);
+        s->to_dst_file = NULL;
+
+        error_report("Detected IO failure for postcopy. "
+                     "Migration paused.");
+
+        /*
+         * Wait until things are fixed up. Then someone will set the
+         * status back for us.
+         */
+        while (s->state == MIGRATION_STATUS_POSTCOPY_PAUSED) {
+            qemu_sem_wait(&s->postcopy_pause_sem);
+        }
+
+        if (s->state == MIGRATION_STATUS_POSTCOPY_RECOVER) {
+            /* Woken up by a recover procedure. Give it a shot */
+
+            /*
+             * Firstly, let's wake up the return path now, with a new
+             * return path channel.
+             */
+            qemu_sem_post(&s->postcopy_pause_rp_sem);
+
+            /* Do the resume logic */
+            if (postcopy_do_resume(s) == 0) {
+                /* Let's continue! */
+                trace_postcopy_pause_continued();
+                return MIG_THR_ERR_RECOVERED;
+            } else {
+                /*
+                 * Something went wrong during the recovery; let's
+                 * pause again. Pausing is always better than throwing
+                 * data away.
+                 */
+                continue;
+            }
+        } else {
+            /* This is not right... Time to quit. */
+            return MIG_THR_ERR_FATAL;
+        }
+    }
+}
+
+static MigThrError migration_detect_error(MigrationState *s)
+{
+    int ret;
+
+    /* Try to detect any file errors */
+    ret = qemu_file_get_error(s->to_dst_file);
+
+    if (!ret) {
+        /* Everything is fine */
+        return MIG_THR_ERR_NONE;
+    }
+
+    if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE && ret == -EIO) {
+        /*
+         * For postcopy, we allow the network to be down for a
+         * while. After that, it can be continued by a
+         * recovery phase.
+         */
+        return postcopy_pause(s);
+    } else {
+        /*
+         * For precopy (or postcopy with an error outside IO), we fail
+         * immediately.
+         */
+        migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED);
+        trace_migration_thread_file_err();
+
+        /* Time to stop the migration, now. */
+        return MIG_THR_ERR_FATAL;
+    }
+}
+
 static void migration_calculate_complete(MigrationState *s)
 {
     uint64_t bytes = qemu_ftell(s->to_dst_file);
@@ -2215,20 +2656,20 @@ typedef enum {
  */
 static MigIterateState migration_iteration_run(MigrationState *s)
 {
-    uint64_t pending_size, pend_post, pend_nonpost;
+    uint64_t pending_size, pend_pre, pend_compat, pend_post;
     bool in_postcopy = s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE;
 
-    qemu_savevm_state_pending(s->to_dst_file, s->threshold_size,
-                              &pend_nonpost, &pend_post);
-    pending_size = pend_nonpost + pend_post;
+    qemu_savevm_state_pending(s->to_dst_file, s->threshold_size, &pend_pre,
+                              &pend_compat, &pend_post);
+    pending_size = pend_pre + pend_compat + pend_post;
 
     trace_migrate_pending(pending_size, s->threshold_size,
-                          pend_post, pend_nonpost);
+                          pend_pre, pend_compat, pend_post);
 
     if (pending_size && pending_size >= s->threshold_size) {
         /* Still a significant amount to transfer */
         if (migrate_postcopy() && !in_postcopy &&
-            pend_nonpost <= s->threshold_size &&
+            pend_pre <= s->threshold_size &&
             atomic_read(&s->start_postcopy)) {
             if (postcopy_start(s)) {
                 error_report("%s: postcopy failed to start", __func__);
@@ -2303,6 +2744,7 @@ static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
     int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
+    MigThrError thr_error;
 
     rcu_register_thread();
 
@@ -2352,13 +2794,22 @@ static void *migration_thread(void *opaque)
             }
         }
 
-        if (qemu_file_get_error(s->to_dst_file)) {
-            if (migration_is_setup_or_active(s->state)) {
-                migrate_set_state(&s->state, s->state,
-                                  MIGRATION_STATUS_FAILED);
-            }
-            trace_migration_thread_file_err();
+        /*
+         * Try to detect any kind of failure, and see whether we
+         * should stop the migration now.
+         */
+        thr_error = migration_detect_error(s);
+        if (thr_error == MIG_THR_ERR_FATAL) {
+            /* Stop migration */
             break;
+        } else if (thr_error == MIG_THR_ERR_RECOVERED) {
+            /*
+             * Just recovered from, e.g., a network failure; reset all
+             * the local variables. This is important to avoid
+             * breaking the transferred_bytes and bandwidth calculations.
+             */
+            s->iteration_start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+            s->iteration_initial_bytes = 0;
         }
 
         current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
@@ -2380,6 +2831,9 @@ static void *migration_thread(void *opaque)
 
 void migrate_fd_connect(MigrationState *s, Error *error_in)
 {
+    int64_t rate_limit;
+    bool resume = s->state == MIGRATION_STATUS_POSTCOPY_PAUSED;
+
     s->expected_downtime = s->parameters.downtime_limit;
     s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
     if (error_in) {
@@ -2388,12 +2842,21 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
         return;
     }
 
-    qemu_file_set_blocking(s->to_dst_file, true);
-    qemu_file_set_rate_limit(s->to_dst_file,
-                             s->parameters.max_bandwidth / XFER_LIMIT_RATIO);
+    if (resume) {
+        /* This is a resumed migration */
+        rate_limit = INT64_MAX;
+    } else {
+        /* This is a fresh new migration */
+        rate_limit = s->parameters.max_bandwidth / XFER_LIMIT_RATIO;
+        s->expected_downtime = s->parameters.downtime_limit;
+        s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
 
-    /* Notify before starting migration thread */
-    notifier_list_notify(&migration_state_notifiers, s);
+        /* Notify before starting migration thread */
+        notifier_list_notify(&migration_state_notifiers, s);
+    }
+
+    qemu_file_set_rate_limit(s->to_dst_file, rate_limit);
+    qemu_file_set_blocking(s->to_dst_file, true);
 
     /*
      * Open the return path. For postcopy, it is used exclusively. For
@@ -2401,15 +2864,22 @@ void migrate_fd_connect(MigrationState *s, Error *error_in)
      * QEMU uses the return path.
      */
     if (migrate_postcopy_ram() || migrate_use_return_path()) {
-        if (open_return_path_on_source(s)) {
+        if (open_return_path_on_source(s, !resume)) {
             error_report("Unable to open return-path for postcopy");
-            migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
-                              MIGRATION_STATUS_FAILED);
+            migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED);
             migrate_fd_cleanup(s);
             return;
         }
     }
 
+    if (resume) {
+        /* Wakeup the main migration thread to do the recovery */
+        migrate_set_state(&s->state, MIGRATION_STATUS_POSTCOPY_PAUSED,
+                          MIGRATION_STATUS_POSTCOPY_RECOVER);
+        qemu_sem_post(&s->postcopy_pause_sem);
+        return;
+    }
+
     if (multifd_save_setup() != 0) {
         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                           MIGRATION_STATUS_FAILED);
@@ -2516,6 +2986,10 @@ static void migration_instance_finalize(Object *obj)
     g_free(params->tls_hostname);
     g_free(params->tls_creds);
     qemu_sem_destroy(&ms->pause_sem);
+    qemu_sem_destroy(&ms->postcopy_pause_sem);
+    qemu_sem_destroy(&ms->postcopy_pause_rp_sem);
+    qemu_sem_destroy(&ms->rp_state.rp_sem);
+    error_free(ms->error);
 }
 
 static void migration_instance_init(Object *obj)
@@ -2544,6 +3018,10 @@ static void migration_instance_init(Object *obj)
     params->has_x_multifd_channels = true;
     params->has_x_multifd_page_count = true;
     params->has_xbzrle_cache_size = true;
+
+    qemu_sem_init(&ms->postcopy_pause_sem, 0);
+    qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
+    qemu_sem_init(&ms->rp_state.rp_sem, 0);
 }
 
 /*
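
A second pattern that recurs through the hunks above is the return-path error handling: migrate_send_rp_message() now returns an int, bails out with -EIO when mis->to_src_file has already been torn down after a network failure, and reports any stream error back to the caller, all while holding rp_mutex. The following self-contained sketch shows that shape under stated assumptions: Channel and channel_send_msg() are stand-ins rather than QEMU's QEMUFile API, and the framing stays in host byte order where the real code uses qemu_put_be16():

    #include <errno.h>
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef struct Channel {
        FILE *stream;                  /* NULL once the channel is lost */
        pthread_mutex_t lock;
    } Channel;

    /* Frame: type (2 bytes) + len (2 bytes) + payload, sent under the lock. */
    static int channel_send_msg(Channel *c, uint16_t type,
                                const void *data, uint16_t len)
    {
        int ret = 0;

        pthread_mutex_lock(&c->lock);

        /* The handle may have been released after an I/O failure. */
        if (!c->stream) {
            ret = -EIO;
            goto out;
        }

        if (fwrite(&type, sizeof(type), 1, c->stream) != 1 ||
            fwrite(&len, sizeof(len), 1, c->stream) != 1 ||
            fwrite(data, 1, len, c->stream) != len ||
            fflush(c->stream) != 0) {
            ret = -EIO;                /* surface the stream error */
        }

    out:
        pthread_mutex_unlock(&c->lock);
        return ret;
    }

    int main(void)
    {
        static Channel c = { .lock = PTHREAD_MUTEX_INITIALIZER };
        const char payload[] = "ping";

        c.stream = tmpfile();          /* stand-in for the return path */
        printf("send: %d\n", channel_send_msg(&c, 1, payload, sizeof(payload)));

        if (c.stream) {
            fclose(c.stream);
        }
        c.stream = NULL;               /* simulate losing the channel */
        printf("send after loss: %d\n",
               channel_send_msg(&c, 1, payload, sizeof(payload)));
        return 0;
    }

Callers in the patch, such as migrate_send_rp_req_pages(), can then propagate the result instead of silently dropping the failure, which is what lets the destination notice a broken channel and sit in MIGRATION_STATUS_POSTCOPY_PAUSED until the source reconnects.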