]> Git Repo - qemu.git/blobdiff - migration/migration.c
migration: size_t'ify some of qemu-file
[qemu.git] / migration / migration.c
index bc424907f3c001f1509c8dcbe9adc3654146d2dc..46bb410ef398662f98f8daf710003f93b09c850c 100644 (file)
  */
 
 #include "qemu-common.h"
+#include "qemu/error-report.h"
 #include "qemu/main-loop.h"
 #include "migration/migration.h"
-#include "monitor/monitor.h"
 #include "migration/qemu-file.h"
 #include "sysemu/sysemu.h"
 #include "block/block.h"
+#include "qapi/qmp/qerror.h"
 #include "qemu/sockets.h"
+#include "qemu/rcu.h"
 #include "migration/block.h"
 #include "qemu/thread.h"
 #include "qmp-commands.h"
 #include "trace.h"
+#include "qapi/util.h"
+#include "qapi-event.h"
 
 #define MAX_THROTTLE  (32 << 20)      /* Migration speed throttling */
 
 #define BUFFER_DELAY     100
 #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
 
+/* Default compression thread count */
+#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+/* Default decompression thread count; decompression is usually at
+ * least 4 times as fast as compression. */
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
+/* 0: no compression, 1: best speed, ... 9: best compression ratio */
+#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
+
 /* Migration XBZRLE default cache size */
 #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)
 
@@ -45,6 +57,7 @@ static bool deferred_incoming;
    migrations at once.  For now we don't need to add
    dynamic creation of migration */
 
+/* For outgoing */
 MigrationState *migrate_get_current(void)
 {
     static MigrationState current_migration = {
@@ -52,11 +65,168 @@ MigrationState *migrate_get_current(void)
         .bandwidth_limit = MAX_THROTTLE,
         .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
         .mbps = -1,
+        .parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] =
+                DEFAULT_MIGRATE_COMPRESS_LEVEL,
+        .parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
+                DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+        .parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
+                DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
     };
 
     return &current_migration;
 }
 
+/* For incoming */
+static MigrationIncomingState *mis_current;
+
+MigrationIncomingState *migration_incoming_get_current(void)
+{
+    return mis_current;
+}
+
+MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
+{
+    mis_current = g_malloc0(sizeof(MigrationIncomingState));
+    mis_current->file = f;
+    QLIST_INIT(&mis_current->loadvm_handlers);
+
+    return mis_current;
+}
+
+void migration_incoming_state_destroy(void)
+{
+    loadvm_free_handlers(mis_current);
+    g_free(mis_current);
+    mis_current = NULL;
+}
+
+
+typedef struct {
+    bool optional;
+    uint32_t size;
+    uint8_t runstate[100];
+    RunState state;
+    bool received;
+} GlobalState;
+
+static GlobalState global_state;
+
+int global_state_store(void)
+{
+    if (!runstate_store((char *)global_state.runstate,
+                        sizeof(global_state.runstate))) {
+        error_report("runstate name too big: %s", global_state.runstate);
+        trace_migrate_state_too_big();
+        return -EINVAL;
+    }
+    return 0;
+}
+
+void global_state_store_running(void)
+{
+    const char *state = RunState_lookup[RUN_STATE_RUNNING];
+    strncpy((char *)global_state.runstate,
+           state, sizeof(global_state.runstate));
+}
+
+static bool global_state_received(void)
+{
+    return global_state.received;
+}
+
+static RunState global_state_get_runstate(void)
+{
+    return global_state.state;
+}
+
+void global_state_set_optional(void)
+{
+    global_state.optional = true;
+}
+
+static bool global_state_needed(void *opaque)
+{
+    GlobalState *s = opaque;
+    char *runstate = (char *)s->runstate;
+
+    /* If it is not optional, it is mandatory */
+
+    if (s->optional == false) {
+        return true;
+    }
+
+    /* If state is running or paused, it is not needed */
+
+    if (strcmp(runstate, "running") == 0 ||
+        strcmp(runstate, "paused") == 0) {
+        return false;
+    }
+
+    /* for any other state it is needed */
+    return true;
+}
+
+static int global_state_post_load(void *opaque, int version_id)
+{
+    GlobalState *s = opaque;
+    Error *local_err = NULL;
+    int r;
+    char *runstate = (char *)s->runstate;
+
+    s->received = true;
+    trace_migrate_global_state_post_load(runstate);
+
+    r = qapi_enum_parse(RunState_lookup, runstate, RUN_STATE_MAX,
+                                -1, &local_err);
+
+    if (r == -1) {
+        if (local_err) {
+            error_report_err(local_err);
+        }
+        return -EINVAL;
+    }
+    s->state = r;
+
+    return 0;
+}
+
+static void global_state_pre_save(void *opaque)
+{
+    GlobalState *s = opaque;
+
+    trace_migrate_global_state_pre_save((char *)s->runstate);
+    s->size = strlen((char *)s->runstate) + 1;
+}
+
+static const VMStateDescription vmstate_globalstate = {
+    .name = "globalstate",
+    .version_id = 1,
+    .minimum_version_id = 1,
+    .post_load = global_state_post_load,
+    .pre_save = global_state_pre_save,
+    .needed = global_state_needed,
+    .fields = (VMStateField[]) {
+        VMSTATE_UINT32(size, GlobalState),
+        VMSTATE_BUFFER(runstate, GlobalState),
+        VMSTATE_END_OF_LIST()
+    },
+};
+
+void register_global_state(void)
+{
+    /* We use this state regardless of whether it is ever received */
+    strcpy((char *)&global_state.runstate, "");
+    global_state.received = false;
+    vmstate_register(NULL, 0, &vmstate_globalstate, &global_state);
+}
+
+static void migrate_generate_event(int new_state)
+{
+    if (migrate_use_events()) {
+        qapi_event_send_migration(new_state, &error_abort);
+    }
+}
+
 /*
  * Called on -incoming with a defer: uri.
  * The migration can be started later after any parameters have been
@@ -74,6 +244,7 @@ void qemu_start_incoming_migration(const char *uri, Error **errp)
 {
     const char *p;
 
+    qapi_event_send_migration(MIGRATION_STATUS_SETUP, &error_abort);
     if (!strcmp(uri, "defer")) {
         deferred_incoming_migration(errp);
     } else if (strstart(uri, "tcp:", &p)) {
@@ -101,27 +272,46 @@ static void process_incoming_migration_co(void *opaque)
     Error *local_err = NULL;
     int ret;
 
+    migration_incoming_state_new(f);
+    migrate_generate_event(MIGRATION_STATUS_ACTIVE);
     ret = qemu_loadvm_state(f);
+
     qemu_fclose(f);
     free_xbzrle_decoded_buf();
+    migration_incoming_state_destroy();
+
     if (ret < 0) {
+        migrate_generate_event(MIGRATION_STATUS_FAILED);
         error_report("load of migration failed: %s", strerror(-ret));
+        migrate_decompress_threads_join();
         exit(EXIT_FAILURE);
     }
+    migrate_generate_event(MIGRATION_STATUS_COMPLETED);
     qemu_announce_self();
 
     /* Make sure all file formats flush their mutable metadata */
     bdrv_invalidate_cache_all(&local_err);
     if (local_err) {
         error_report_err(local_err);
+        migrate_decompress_threads_join();
         exit(EXIT_FAILURE);
     }
 
-    if (autostart) {
-        vm_start();
+    /* If the global state section was not received, or the received
+       state is running, we need to obey autostart.  Any other received
+       state is applied with runstate_set(). */
+
+    if (!global_state_received() ||
+        global_state_get_runstate() == RUN_STATE_RUNNING) {
+        if (autostart) {
+            vm_start();
+        } else {
+            runstate_set(RUN_STATE_PAUSED);
+        }
     } else {
-        runstate_set(RUN_STATE_PAUSED);
+        runstate_set(global_state_get_runstate());
     }
+    migrate_decompress_threads_join();
 }
 
 void process_incoming_migration(QEMUFile *f)
@@ -130,6 +320,7 @@ void process_incoming_migration(QEMUFile *f)
     int fd = qemu_get_fd(f);
 
     assert(fd != -1);
+    migrate_decompress_threads_create();
     qemu_set_nonblock(fd);
     qemu_coroutine_enter(co, f);
 }
@@ -170,6 +361,21 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
     return head;
 }
 
+MigrationParameters *qmp_query_migrate_parameters(Error **errp)
+{
+    MigrationParameters *params;
+    MigrationState *s = migrate_get_current();
+
+    params = g_malloc0(sizeof(*params));
+    params->compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
+    params->compress_threads =
+            s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
+    params->decompress_threads =
+            s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
+
+    return params;
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
     if (migrate_use_xbzrle()) {
@@ -274,7 +480,7 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
 
     if (s->state == MIGRATION_STATUS_ACTIVE ||
         s->state == MIGRATION_STATUS_SETUP) {
-        error_set(errp, QERR_MIGRATION_ACTIVE);
+        error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
 
@@ -283,12 +489,54 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
     }
 }
 
+/* Validate all supplied compression parameters, then store them. */
+void qmp_migrate_set_parameters(bool has_compress_level,
+                                int64_t compress_level,
+                                bool has_compress_threads,
+                                int64_t compress_threads,
+                                bool has_decompress_threads,
+                                int64_t decompress_threads, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+
+    /* QERR_INVALID_PARAMETER_VALUE reads "Parameter '%s' expects %s" */
+    if (has_compress_level && (compress_level < 0 || compress_level > 9)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
+                   "a value between 0 and 9");
+        return;
+    }
+    if (has_compress_threads &&
+            (compress_threads < 1 || compress_threads > 255)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_threads",
+                   "a value between 1 and 255");
+        return;
+    }
+    if (has_decompress_threads &&
+            (decompress_threads < 1 || decompress_threads > 255)) {
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "decompress_threads",
+                   "a value between 1 and 255");
+        return;
+    }
+
+    if (has_compress_level) {
+        s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
+    }
+    if (has_compress_threads) {
+        s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] = compress_threads;
+    }
+    if (has_decompress_threads) {
+        s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
+                                                    decompress_threads;
+    }
+}
+
 /* shared migration helpers */
 
 static void migrate_set_state(MigrationState *s, int old_state, int new_state)
 {
-    if (atomic_cmpxchg(&s->state, old_state, new_state) == new_state) {
+    if (atomic_cmpxchg(&s->state, old_state, new_state) == old_state) {
         trace_migrate_set_state(new_state);
+        migrate_generate_event(new_state);
     }
 }
 
@@ -305,6 +553,7 @@ static void migrate_fd_cleanup(void *opaque)
         qemu_thread_join(&s->thread);
         qemu_mutex_lock_iothread();
 
+        migrate_compress_threads_join();
         qemu_fclose(s->file);
         s->file = NULL;
     }
@@ -326,8 +575,7 @@ void migrate_fd_error(MigrationState *s)
 {
     trace_migrate_fd_error();
     assert(s->file == NULL);
-    s->state = MIGRATION_STATUS_FAILED;
-    trace_migrate_set_state(MIGRATION_STATUS_FAILED);
+    migrate_set_state(s, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED);
     notifier_list_notify(&migration_state_notifiers, s);
 }
 
@@ -390,6 +638,11 @@ static MigrationState *migrate_init(const MigrationParams *params)
     int64_t bandwidth_limit = s->bandwidth_limit;
     bool enabled_capabilities[MIGRATION_CAPABILITY_MAX];
     int64_t xbzrle_cache_size = s->xbzrle_cache_size;
+    int compress_level = s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
+    int compress_thread_count =
+            s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
+    int decompress_thread_count =
+            s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
 
     memcpy(enabled_capabilities, s->enabled_capabilities,
            sizeof(enabled_capabilities));
@@ -400,9 +653,13 @@ static MigrationState *migrate_init(const MigrationParams *params)
            sizeof(enabled_capabilities));
     s->xbzrle_cache_size = xbzrle_cache_size;
 
+    s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL] = compress_level;
+    s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS] =
+               compress_thread_count;
+    s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS] =
+               decompress_thread_count;
     s->bandwidth_limit = bandwidth_limit;
-    s->state = MIGRATION_STATUS_SETUP;
-    trace_migrate_set_state(MIGRATION_STATUS_SETUP);
+    migrate_set_state(s, MIGRATION_STATUS_NONE, MIGRATION_STATUS_SETUP);
 
     s->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
     return s;
@@ -458,10 +715,9 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
     if (s->state == MIGRATION_STATUS_ACTIVE ||
         s->state == MIGRATION_STATUS_SETUP ||
         s->state == MIGRATION_STATUS_CANCELLING) {
-        error_set(errp, QERR_MIGRATION_ACTIVE);
+        error_setg(errp, QERR_MIGRATION_ACTIVE);
         return;
     }
-
     if (runstate_check(RUN_STATE_INMIGRATE)) {
         error_setg(errp, "Guest is waiting for an incoming migration");
         return;
@@ -476,6 +732,12 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
         return;
     }
 
+    /* We are starting a new migration, so we want to start in a clean
+       state.  This change is only needed if previous migration
+       failed/was cancelled.  We don't use migrate_set_state() because
+       we are setting the initial state, not changing it. */
+    s->state = MIGRATION_STATUS_NONE;
+
     s = migrate_init(&params);
 
     if (strstart(uri, "tcp:", &p)) {
@@ -493,8 +755,9 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk,
         fd_start_outgoing_migration(s, p, &local_err);
 #endif
     } else {
-        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "uri", "a valid migration protocol");
-        s->state = MIGRATION_STATUS_FAILED;
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "uri",
+                   "a valid migration protocol");
+        migrate_set_state(s, MIGRATION_STATUS_SETUP, MIGRATION_STATUS_FAILED);
         return;
     }
 
@@ -517,22 +780,22 @@ void qmp_migrate_set_cache_size(int64_t value, Error **errp)
 
     /* Check for truncation */
     if (value != (size_t)value) {
-        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
-                  "exceeding address space");
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
+                   "exceeding address space");
         return;
     }
 
     /* Cache should not be larger than guest ram size */
     if (value > ram_bytes_total()) {
-        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
-                  "exceeds guest ram size ");
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
+                   "exceeds guest ram size ");
         return;
     }
 
     new_size = xbzrle_cache_resize(value);
     if (new_size < 0) {
-        error_set(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
-                  "is smaller than page size");
+        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
+                   "is smaller than page size");
         return;
     }
 
@@ -587,6 +850,51 @@ bool migrate_zero_blocks(void)
     return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
 }
 
+bool migrate_use_compression(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
+}
+
+int migrate_compress_level(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters[MIGRATION_PARAMETER_COMPRESS_LEVEL];
+}
+
+int migrate_compress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters[MIGRATION_PARAMETER_COMPRESS_THREADS];
+}
+
+int migrate_decompress_threads(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->parameters[MIGRATION_PARAMETER_DECOMPRESS_THREADS];
+}
+
+bool migrate_use_events(void)
+{
+    MigrationState *s;
+
+    s = migrate_get_current();
+
+    return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
+}
+
 int migrate_use_xbzrle(void)
 {
     MigrationState *s;
@@ -605,6 +913,50 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
+/**
+ * migration_completion: Used by migration_thread when there's not much left.
+ *   The caller 'breaks' the loop when this returns.
+ *
+ * @s: Current migration state
+ * @*old_vm_running: Pointer to old_vm_running flag
+ * @*start_time: Pointer to time to update
+ */
+static void migration_completion(MigrationState *s, bool *old_vm_running,
+                                 int64_t *start_time)
+{
+    int ret;
+
+    qemu_mutex_lock_iothread();
+    *start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
+    qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+    *old_vm_running = runstate_is_running();
+
+    ret = global_state_store();
+    if (!ret) {
+        ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
+        if (ret >= 0) {
+            qemu_file_set_rate_limit(s->file, INT64_MAX);
+            qemu_savevm_state_complete(s->file);
+        }
+    }
+    qemu_mutex_unlock_iothread();
+
+    if (ret < 0) {
+        goto fail;
+    }
+
+    if (qemu_file_get_error(s->file)) {
+        trace_migration_completion_file_err();
+        goto fail;
+    }
+
+    migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_COMPLETED);
+    return;
+
+fail:
+    migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED);
+}
+
 /* migration thread support */
 
 static void *migration_thread(void *opaque)
@@ -617,6 +969,9 @@ static void *migration_thread(void *opaque)
     int64_t start_time = initial_time;
     bool old_vm_running = false;
 
+    rcu_register_thread();
+
+    qemu_savevm_state_header(s->file);
     qemu_savevm_state_begin(s->file, &s->params);
 
     s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
@@ -632,31 +987,9 @@ static void *migration_thread(void *opaque)
             if (pending_size && pending_size >= max_size) {
                 qemu_savevm_state_iterate(s->file);
             } else {
-                int ret;
-
-                qemu_mutex_lock_iothread();
-                start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
-                qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
-                old_vm_running = runstate_is_running();
-
-                ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
-                if (ret >= 0) {
-                    qemu_file_set_rate_limit(s->file, INT64_MAX);
-                    qemu_savevm_state_complete(s->file);
-                }
-                qemu_mutex_unlock_iothread();
-
-                if (ret < 0) {
-                    migrate_set_state(s, MIGRATION_STATUS_ACTIVE,
-                                      MIGRATION_STATUS_FAILED);
-                    break;
-                }
-
-                if (!qemu_file_get_error(s->file)) {
-                    migrate_set_state(s, MIGRATION_STATUS_ACTIVE,
-                                      MIGRATION_STATUS_COMPLETED);
-                    break;
-                }
+                trace_migration_thread_low_pending(pending_size);
+                migration_completion(s, &old_vm_running, &start_time);
+                break;
             }
         }
 
@@ -712,14 +1045,12 @@ static void *migration_thread(void *opaque)
     qemu_bh_schedule(s->cleanup_bh);
     qemu_mutex_unlock_iothread();
 
+    rcu_unregister_thread();
     return NULL;
 }
 
 void migrate_fd_connect(MigrationState *s)
 {
-    s->state = MIGRATION_STATUS_SETUP;
-    trace_migrate_set_state(MIGRATION_STATUS_SETUP);
-
     /* This is a best 1st approximation. ns to ms */
     s->expected_downtime = max_downtime/1000000;
     s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
@@ -730,6 +1061,7 @@ void migrate_fd_connect(MigrationState *s)
     /* Notify before starting migration thread */
     notifier_list_notify(&migration_state_notifiers, s);
 
+    migrate_compress_threads_create();
     qemu_thread_create(&s->thread, "migration", migration_thread, s,
                        QEMU_THREAD_JOINABLE);
 }
This page took 0.040559 seconds and 4 git commands to generate.