fd_start_incoming_migration(p, errp);
#endif
else {
- error_setg(errp, "unknown migration protocol: %s\n", uri);
+ error_setg(errp, "unknown migration protocol: %s", uri);
}
}
int ret;
ret = qemu_loadvm_state(f);
- qemu_set_fd_handler(qemu_get_fd(f), NULL, NULL, NULL);
qemu_fclose(f);
if (ret < 0) {
fprintf(stderr, "load of migration failed\n");
}
}
-static void enter_migration_coroutine(void *opaque)
-{
- Coroutine *co = opaque;
- qemu_coroutine_enter(co, NULL);
-}
-
/*
 * Begin processing an incoming migration stream carried by @f.
 *
 * Creates a coroutine that runs process_incoming_migration_co, asserts
 * that the descriptor `fd` is valid (not -1), switches it to non-blocking
 * mode, and then enters the coroutine with @f as its argument.
 *
 * With the fd-handler registration removed, nothing here arranges for the
 * coroutine to be re-entered from an fd callback.
 */
void process_incoming_migration(QEMUFile *f)
{
Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);
assert(fd != -1);
socket_set_nonblock(fd);
- qemu_set_fd_handler(fd, enter_migration_coroutine, NULL, co);
qemu_coroutine_enter(co, f);
}
notifier_list_notify(&migration_state_notifiers, s);
}
-ssize_t migrate_fd_put_buffer(MigrationState *s, const void *data,
- size_t size)
+static ssize_t migrate_fd_put_buffer(MigrationState *s, const void *data,
+ size_t size)
{
ssize_t ret;
s->state = MIG_STATE_CANCELLED;
notifier_list_notify(&migration_state_notifiers, s);
- qemu_savevm_state_cancel(s->file);
+ qemu_savevm_state_cancel();
migrate_fd_cleanup(s);
}
/* migration thread support */
-typedef struct QEMUFileBuffered {
- MigrationState *migration_state;
- QEMUFile *file;
- size_t bytes_xfer;
- size_t xfer_limit;
- uint8_t *buffer;
- size_t buffer_size;
- size_t buffer_capacity;
- QemuThread thread;
-} QEMUFileBuffered;
-static ssize_t buffered_flush(QEMUFileBuffered *s)
+static ssize_t buffered_flush(MigrationState *s)
{
size_t offset = 0;
ssize_t ret = 0;
while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
- ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
- to_send);
+ ret = migrate_fd_put_buffer(s, s->buffer + offset, to_send);
if (ret <= 0) {
DPRINTF("error flushing data, %zd\n", ret);
break;
static int buffered_put_buffer(void *opaque, const uint8_t *buf,
int64_t pos, int size)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
ssize_t error;
DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
static int buffered_close(void *opaque)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
ssize_t ret = 0;
int ret2;
}
}
- ret2 = migrate_fd_close(s->migration_state);
+ ret2 = migrate_fd_close(s);
if (ret >= 0) {
ret = ret2;
}
- ret = migrate_fd_close(s->migration_state);
- s->migration_state->complete = true;
+ s->complete = true;
return ret;
}
/*
 * Return the file descriptor stored in the migration state.
 *
 * @opaque is the MigrationState itself (migrate_fd_connect passes it to
 * qemu_fopen_ops), no longer a QEMUFileBuffered wrapper that points at it.
 * Presumably installed as a callback in buffered_file_ops -- confirm
 * against the ops table.
 */
static int buffered_get_fd(void *opaque)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
- return s->migration_state->fd;
+ return s->fd;
}
/*
*/
static int buffered_rate_limit(void *opaque)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
int ret;
ret = qemu_file_get_error(s->file);
return ret;
}
- if (s->bytes_xfer > s->xfer_limit) {
+ if (s->bytes_xfer >= s->xfer_limit) {
return 1;
}
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
if (qemu_file_get_error(s->file)) {
goto out;
}
new_rate = SIZE_MAX;
}
- s->xfer_limit = new_rate / 10;
+ s->xfer_limit = new_rate / XFER_LIMIT_RATIO;
out:
return s->xfer_limit;
/*
 * Return the current transfer limit, s->xfer_limit.
 *
 * This is the value as stored by buffered_set_rate_limit (the requested
 * rate divided by XFER_LIMIT_RATIO), not the rate the caller asked for.
 * @opaque is the MigrationState.
 */
static int64_t buffered_get_rate_limit(void *opaque)
{
- QEMUFileBuffered *s = opaque;
+ MigrationState *s = opaque;
return s->xfer_limit;
}
-static bool migrate_fd_put_ready(MigrationState *s, uint64_t max_size)
+/*
+ * Body of the outgoing-migration thread started by migrate_fd_connect();
+ * @opaque is the MigrationState. The logic of the former
+ * migrate_fd_put_ready() is folded into this function.
+ *
+ * Under the iothread lock it first calls qemu_savevm_state_begin(). Then,
+ * on each loop pass, again under the lock, it:
+ *   - stops if the state is no longer MIG_STATE_ACTIVE or s->complete is set;
+ *   - if bytes_xfer is still below xfer_limit, asks how much data is pending
+ *     and either runs another qemu_savevm_state_iterate() round (pending is
+ *     non-zero and >= max_size) or stops the VM and finishes with
+ *     qemu_savevm_state_complete(), recording total_time and downtime.
+ * After dropping the lock it pushes buffered data out with buffered_flush().
+ *
+ * On exit a negative ret is reported through migrate_fd_error(), s->buffer
+ * is freed and NULL is returned. The MigrationState itself is not freed.
+ */
+static void *buffered_file_thread(void *opaque)
{
- int ret;
- uint64_t pending_size;
+ MigrationState *s = opaque;
+ int64_t initial_time = qemu_get_clock_ms(rt_clock);
+ int64_t max_size = 0;
bool last_round = false;
+ int ret;
qemu_mutex_lock_iothread();
- if (s->state != MIG_STATE_ACTIVE) {
- DPRINTF("put_ready returning because of non-active state\n");
+ DPRINTF("beginning savevm\n");
+ ret = qemu_savevm_state_begin(s->file, &s->params);
+ if (ret < 0) {
+ DPRINTF("failed, %d\n", ret);
qemu_mutex_unlock_iothread();
- return false;
- }
- if (s->first_time) {
- s->first_time = false;
- DPRINTF("beginning savevm\n");
- ret = qemu_savevm_state_begin(s->file, &s->params);
- if (ret < 0) {
- DPRINTF("failed, %d\n", ret);
- migrate_fd_error(s);
- qemu_mutex_unlock_iothread();
- return false;
- }
- }
-
- DPRINTF("iterate\n");
- pending_size = qemu_savevm_state_pending(s->file, max_size);
- DPRINTF("pending size %lu max %lu\n", pending_size, max_size);
- if (pending_size >= max_size) {
- ret = qemu_savevm_state_iterate(s->file);
- if (ret < 0) {
- migrate_fd_error(s);
- }
- } else {
- int old_vm_running = runstate_is_running();
- int64_t start_time, end_time;
-
- DPRINTF("done iterating\n");
- start_time = qemu_get_clock_ms(rt_clock);
- qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
- if (old_vm_running) {
- vm_stop(RUN_STATE_FINISH_MIGRATE);
- } else {
- vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
- }
-
- if (qemu_savevm_state_complete(s->file) < 0) {
- migrate_fd_error(s);
- } else {
- migrate_fd_completed(s);
- }
- end_time = qemu_get_clock_ms(rt_clock);
- s->total_time = end_time - s->total_time;
- s->downtime = end_time - start_time;
- if (s->state != MIG_STATE_COMPLETED) {
- if (old_vm_running) {
- vm_start();
- }
- }
- last_round = true;
+ goto out;
}
qemu_mutex_unlock_iothread();
- return last_round;
-}
-
-static void *buffered_file_thread(void *opaque)
-{
- QEMUFileBuffered *s = opaque;
- int64_t initial_time = qemu_get_clock_ms(rt_clock);
- int64_t max_size = 0;
- bool last_round = false;
-
while (true) {
int64_t current_time = qemu_get_clock_ms(rt_clock);
+ uint64_t pending_size;
- if (s->migration_state->complete) {
+ qemu_mutex_lock_iothread();
+ if (s->state != MIG_STATE_ACTIVE) {
+ DPRINTF("put_ready returning because of non-active state\n");
+ qemu_mutex_unlock_iothread();
break;
}
+ if (s->complete) {
+ qemu_mutex_unlock_iothread();
+ break;
+ }
+ if (s->bytes_xfer < s->xfer_limit) {
+ DPRINTF("iterate\n");
+ pending_size = qemu_savevm_state_pending(s->file, max_size);
+ /*
+ * pending_size is uint64_t and max_size is int64_t: print them with
+ * the <inttypes.h> macros, since %lu is only right where long is
+ * 64 bits wide.
+ */
+ DPRINTF("pending size %" PRIu64 " max %" PRId64 "\n", pending_size, max_size);
+ if (pending_size && pending_size >= max_size) {
+ ret = qemu_savevm_state_iterate(s->file);
+ if (ret < 0) {
+ qemu_mutex_unlock_iothread();
+ break;
+ }
+ } else {
+ int old_vm_running = runstate_is_running();
+ int64_t start_time, end_time;
+
+ DPRINTF("done iterating\n");
+ start_time = qemu_get_clock_ms(rt_clock);
+ qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+ if (old_vm_running) {
+ vm_stop(RUN_STATE_FINISH_MIGRATE);
+ } else {
+ vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
+ }
+ ret = qemu_savevm_state_complete(s->file);
+ if (ret < 0) {
+ qemu_mutex_unlock_iothread();
+ break;
+ } else {
+ migrate_fd_completed(s);
+ }
+ end_time = qemu_get_clock_ms(rt_clock);
+ s->total_time = end_time - s->total_time;
+ s->downtime = end_time - start_time;
+ if (s->state != MIG_STATE_COMPLETED) {
+ if (old_vm_running) {
+ vm_start();
+ }
+ }
+ last_round = true;
+ }
+ }
+ qemu_mutex_unlock_iothread();
if (current_time >= initial_time + BUFFER_DELAY) {
uint64_t transferred_bytes = s->bytes_xfer;
uint64_t time_spent = current_time - initial_time;
/* usleep expects microseconds */
g_usleep((initial_time + BUFFER_DELAY - current_time)*1000);
}
- if (buffered_flush(s) < 0) {
+ ret = buffered_flush(s);
+ if (ret < 0) {
break;
}
-
- DPRINTF("file is ready\n");
- if (s->bytes_xfer < s->xfer_limit) {
- DPRINTF("notifying client\n");
- last_round = migrate_fd_put_ready(s->migration_state, max_size);
- }
}
+out:
+ /*
+ * NOTE(review): every path shown here drops the iothread lock before
+ * reaching this point, whereas the removed migrate_fd_put_ready() called
+ * migrate_fd_error() with the lock held -- confirm that is intended.
+ */
+ if (ret < 0) {
+ migrate_fd_error(s);
+ }
g_free(s->buffer);
- g_free(s);
return NULL;
}
.set_rate_limit = buffered_set_rate_limit,
};
-void migrate_fd_connect(MigrationState *migration_state)
+void migrate_fd_connect(MigrationState *s)
{
- QEMUFileBuffered *s;
-
- migration_state->state = MIG_STATE_ACTIVE;
- migration_state->first_time = true;
- s = g_malloc0(sizeof(*s));
+ s->state = MIG_STATE_ACTIVE;
+ s->bytes_xfer = 0;
+ s->buffer = NULL;
+ s->buffer_size = 0;
+ s->buffer_capacity = 0;
- s->migration_state = migration_state;
- s->xfer_limit = s->migration_state->bandwidth_limit / XFER_LIMIT_RATIO;
- s->migration_state->complete = false;
+ s->xfer_limit = s->bandwidth_limit / XFER_LIMIT_RATIO;
+ s->complete = false;
s->file = qemu_fopen_ops(s, &buffered_file_ops);
- migration_state->file = s->file;
-
qemu_thread_create(&s->thread, buffered_file_thread, s,
QEMU_THREAD_DETACHED);
notifier_list_notify(&migration_state_notifiers, s);