#include <stdint.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <zlib.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/mman.h>
#define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
static struct defconfig_file {
const char *filename;
return acct_info.xbzrle_overflows;
}
-static size_t save_block_hdr(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
- int cont, int flag)
+/* This is the last block that we have visited searching for dirty pages
+ */
+static RAMBlock *last_seen_block;
+/* This is the last block from where we have sent data */
+static RAMBlock *last_sent_block;
+static ram_addr_t last_offset;
+static unsigned long *migration_bitmap;
+static uint64_t migration_dirty_pages;
+static uint32_t last_version;
+static bool ram_bulk_stage;
+
+/* Work item and synchronisation state owned by one compression worker
+ * thread (see do_data_compress()).
+ */
+typedef struct CompressParam {
+    bool start;        /* set by start_compression(); the worker waits on it */
+    bool done;         /* set by the worker, under comp_done_lock, when idle */
+    QEMUFile *file;    /* dummy QEMUFile that buffers the compressed output */
+    QemuMutex mutex;   /* protects start; paired with cond */
+    QemuCond cond;     /* signalled to wake the worker */
+    RAMBlock *block;   /* block that contains the page to compress */
+    ram_addr_t offset; /* page offset inside block; low bits may hold flags */
+} CompressParam;
+
+/* Work item and synchronisation state owned by one decompression worker
+ * thread (see do_data_decompress()).
+ */
+struct DecompressParam {
+    bool start;        /* set by start_decompression(); cleared by the worker */
+    QemuMutex mutex;   /* protects start; paired with cond */
+    QemuCond cond;     /* signalled to wake the worker */
+    void *des;         /* destination the page is decompressed into */
+    uint8_t *compbuf;  /* private copy of the compressed data */
+    int len;           /* number of valid bytes in compbuf */
+};
+typedef struct DecompressParam DecompressParam;
+
+/* Per-thread state and thread handles of the compression workers; both
+ * arrays are allocated in migrate_compress_threads_create().
+ */
+static CompressParam *comp_param;
+static QemuThread *compress_threads;
+/* comp_done_cond is used to wake up the migration thread when
+ * one of the compression threads has finished the compression.
+ * comp_done_lock is used to co-work with comp_done_cond.
+ */
+static QemuMutex *comp_done_lock;
+static QemuCond *comp_done_cond;
+/* The empty QEMUFileOps will be used by file in CompressParam */
+static const QEMUFileOps empty_ops = { };
+
+/* Set to true when the compression threads are created; pages are only
+ * sent through the compression path while it is true.
+ */
+static bool compression_switch;
+/* Set by terminate_compression_threads() to make the compression workers
+ * leave their loop.
+ */
+static bool quit_comp_thread;
+/* Set by migrate_decompress_threads_join() to make the decompression
+ * workers leave their loop.
+ */
+static bool quit_decomp_thread;
+/* Per-thread state and thread handles of the decompression workers; both
+ * arrays are allocated in migrate_decompress_threads_create().
+ */
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+/* Staging buffer of compressBound(TARGET_PAGE_SIZE) bytes that holds one
+ * compressed page before it is handed to a decompression worker.
+ */
+static uint8_t *compressed_data_buf;
+
+static int do_compress_ram_page(CompressParam *param);
+
+/**
+ * do_data_compress: body of one compression worker thread
+ *
+ * Sleeps on param->cond until start_compression() hands over a page or
+ * termination is requested, compresses the page into param->file and
+ * then reports completion through param->done and comp_done_cond.
+ *
+ * Returns: always NULL
+ *
+ * @opaque: the CompressParam that belongs to this worker
+ */
+static void *do_data_compress(void *opaque)
+{
+    CompressParam *param = opaque;
+
+    while (!quit_comp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Re-check quit_comp_thread here: terminate_compression_threads()
+         * may have run after the outer loop test but before the mutex was
+         * taken, and the worker must not go to sleep in that case.
+         */
+        while (!param->start && !quit_comp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (!quit_comp_thread) {
+            do_compress_ram_page(param);
+        }
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
+
+        /* done is published under comp_done_lock so that a thread waiting
+         * on comp_done_cond cannot miss the update.
+         */
+        qemu_mutex_lock(comp_done_lock);
+        param->done = true;
+        qemu_cond_signal(comp_done_cond);
+        qemu_mutex_unlock(comp_done_lock);
+    }
+
+    return NULL;
+}
+
+/* Ask every compression worker to exit and wake up the ones sleeping on
+ * their condition variable.  This does not wait for the threads to end.
+ */
+static inline void terminate_compression_threads(void)
+{
+    int nr_threads = migrate_compress_threads();
+    int i;
+
+    quit_comp_thread = true;
+    for (i = 0; i < nr_threads; i++) {
+        CompressParam *worker = comp_param + i;
+
+        /* Signal while holding the worker's mutex, the same mutex the
+         * worker holds when it tests quit_comp_thread before waiting.
+         */
+        qemu_mutex_lock(&worker->mutex);
+        qemu_cond_signal(&worker->cond);
+        qemu_mutex_unlock(&worker->mutex);
+    }
+}
+
+/**
+ * migrate_compress_threads_join: stop and release the compression threads
+ *
+ * Does nothing unless migrate_use_compression() is true.  Otherwise it
+ * asks the workers to quit, joins each of them, closes their buffer
+ * files and frees everything set up by migrate_compress_threads_create().
+ */
+void migrate_compress_threads_join(void)
+{
+    int nr_threads, idx;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+
+    terminate_compression_threads();
+    nr_threads = migrate_compress_threads();
+    for (idx = 0; idx < nr_threads; idx++) {
+        CompressParam *worker = &comp_param[idx];
+
+        /* The worker must be gone before its resources are torn down */
+        qemu_thread_join(&compress_threads[idx]);
+        qemu_fclose(worker->file);
+        qemu_mutex_destroy(&worker->mutex);
+        qemu_cond_destroy(&worker->cond);
+    }
+
+    qemu_mutex_destroy(comp_done_lock);
+    qemu_cond_destroy(comp_done_cond);
+
+    g_free(compress_threads);
+    compress_threads = NULL;
+    g_free(comp_param);
+    comp_param = NULL;
+    g_free(comp_done_cond);
+    comp_done_cond = NULL;
+    g_free(comp_done_lock);
+    comp_done_lock = NULL;
+}
+
+/**
+ * migrate_compress_threads_create: start the compression worker threads
+ *
+ * Does nothing unless migrate_use_compression() is true.  Otherwise it
+ * clears the quit flag, turns the compression switch on, allocates the
+ * per-thread state and spawns migrate_compress_threads() joinable
+ * workers running do_data_compress().
+ */
+void migrate_compress_threads_create(void)
+{
+    int nr_threads, idx;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+
+    quit_comp_thread = false;
+    compression_switch = true;
+
+    nr_threads = migrate_compress_threads();
+    compress_threads = g_new0(QemuThread, nr_threads);
+    comp_param = g_new0(CompressParam, nr_threads);
+    comp_done_cond = g_new0(QemuCond, 1);
+    comp_done_lock = g_new0(QemuMutex, 1);
+    qemu_cond_init(comp_done_cond);
+    qemu_mutex_init(comp_done_lock);
+
+    for (idx = 0; idx < nr_threads; idx++) {
+        CompressParam *worker = &comp_param[idx];
+
+        /* worker->file only serves as a dummy buffer that collects the
+         * compressed output, so its ops are left empty.
+         */
+        worker->file = qemu_fopen_ops(NULL, &empty_ops);
+        /* A fresh worker starts out as done, which is the state
+         * compress_page_with_multi_thread() looks for when picking one.
+         */
+        worker->done = true;
+        qemu_mutex_init(&worker->mutex);
+        qemu_cond_init(&worker->cond);
+        qemu_thread_create(&compress_threads[idx], "compress",
+                           do_data_compress, worker,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+/**
+ * save_page_header: Write page header to wire
+ *
+ * If this is the 1st block, it also writes the block identification
+ *
+ * Returns: Number of bytes written
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * in the lower bits, it contains flags
+ */
+static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
size_t size;
- qemu_put_be64(f, offset | cont | flag);
+ qemu_put_be64(f, offset);
size = 8;
- if (!cont) {
+ if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
qemu_put_byte(f, strlen(block->idstr));
qemu_put_buffer(f, (uint8_t *)block->idstr,
strlen(block->idstr));
return size;
}
-/* This is the last block that we have visited serching for dirty pages
- */
-static RAMBlock *last_seen_block;
-/* This is the last block from where we have sent data */
-static RAMBlock *last_sent_block;
-static ram_addr_t last_offset;
-static unsigned long *migration_bitmap;
-static uint64_t migration_dirty_pages;
-static uint32_t last_version;
-static bool ram_bulk_stage;
-
/* Update the xbzrle cache to reflect a page that's been sent as all 0.
* The important thing is that a stale (not-yet-0'd) page be replaced
* by the new data.
#define ENCODING_FLAG_XBZRLE 0x1
+/**
+ * save_xbzrle_page: compress and send current page
+ *
+ * Returns: 1 means that we wrote the page
+ * 0 means that page is identical to the one already sent
+ * -1 means that xbzrle would be longer than normal
+ *
+ * @f: QEMUFile where to send the data
+ * @current_data:
+ * @current_addr:
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @last_stage: if we are at the completion stage
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
ram_addr_t current_addr, RAMBlock *block,
- ram_addr_t offset, int cont, bool last_stage)
+ ram_addr_t offset, bool last_stage,
+ uint64_t *bytes_transferred)
{
- int encoded_len = 0, bytes_sent = -1;
+ int encoded_len = 0, bytes_xbzrle;
uint8_t *prev_cached_page;
if (!cache_is_cached(XBZRLE.cache, current_addr, bitmap_sync_count)) {
}
/* Send XBZRLE based compressed page */
- bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_XBZRLE);
+ bytes_xbzrle = save_page_header(f, block, offset | RAM_SAVE_FLAG_XBZRLE);
qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
qemu_put_be16(f, encoded_len);
qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
- bytes_sent += encoded_len + 1 + 2;
+ bytes_xbzrle += encoded_len + 1 + 2;
acct_info.xbzrle_pages++;
- acct_info.xbzrle_bytes += bytes_sent;
+ acct_info.xbzrle_bytes += bytes_xbzrle;
+ *bytes_transferred += bytes_xbzrle;
- return bytes_sent;
+ return 1;
}
static inline
static int64_t start_time;
static int64_t bytes_xfer_prev;
static int64_t num_dirty_pages_period;
+static uint64_t xbzrle_cache_miss_prev;
+static uint64_t iterations_prev;
static void migration_bitmap_sync_init(void)
{
start_time = 0;
bytes_xfer_prev = 0;
num_dirty_pages_period = 0;
+ xbzrle_cache_miss_prev = 0;
+ iterations_prev = 0;
}
/* Called with iothread lock held, to protect ram_list.dirty_memory[] */
MigrationState *s = migrate_get_current();
int64_t end_time;
int64_t bytes_xfer_now;
- static uint64_t xbzrle_cache_miss_prev;
- static uint64_t iterations_prev;
bitmap_sync_count++;
mig_throttle_on = false;
}
if (migrate_use_xbzrle()) {
- if (iterations_prev != 0) {
+ if (iterations_prev != acct_info.iterations) {
acct_info.xbzrle_cache_miss_rate =
(double)(acct_info.xbzrle_cache_miss -
xbzrle_cache_miss_prev) /
s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
start_time = end_time;
num_dirty_pages_period = 0;
- s->dirty_sync_count = bitmap_sync_count;
}
+ s->dirty_sync_count = bitmap_sync_count;
}
-/*
+/**
+ * save_zero_page: Send the page to the stream if it contains only zeros
+ *
+ * Returns: 1 if the page is a zero page and has been written,
+ *          -1 if the page is not a zero page (nothing is written)
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @p: pointer to the page
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
+static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
+                          uint8_t *p, uint64_t *bytes_transferred)
+{
+    if (!is_zero_range(p, TARGET_PAGE_SIZE)) {
+        return -1;
+    }
+
+    acct_info.dup_pages++;
+    *bytes_transferred += save_page_header(f, block,
+                                           offset | RAM_SAVE_FLAG_COMPRESS);
+    /* The page content is represented by a single zero byte */
+    qemu_put_byte(f, 0);
+    *bytes_transferred += 1;
+
+    return 1;
+}
+
+/**
* ram_save_page: Send the given page to the stream
*
- * Returns: Number of bytes written.
+ * Returns: Number of pages written.
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @last_stage: if we are at the completion stage
+ * @bytes_transferred: increase it with the number of transferred bytes
*/
static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
- bool last_stage)
+ bool last_stage, uint64_t *bytes_transferred)
{
- int bytes_sent;
+ int pages = -1;
uint64_t bytes_xmit;
- int cont;
ram_addr_t current_addr;
MemoryRegion *mr = block->mr;
uint8_t *p;
int ret;
bool send_async = true;
- cont = (block == last_sent_block) ? RAM_SAVE_FLAG_CONTINUE : 0;
-
p = memory_region_get_ram_ptr(mr) + offset;
/* In doubt sent page as normal */
- bytes_sent = -1;
bytes_xmit = 0;
ret = ram_control_save_page(f, block->offset,
offset, TARGET_PAGE_SIZE, &bytes_xmit);
if (bytes_xmit) {
- bytes_sent = bytes_xmit;
+ *bytes_transferred += bytes_xmit;
+ pages = 1;
}
XBZRLE_cache_lock();
current_addr = block->offset + offset;
+
+ if (block == last_sent_block) {
+ offset |= RAM_SAVE_FLAG_CONTINUE;
+ }
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (bytes_xmit > 0) {
acct_info.dup_pages++;
}
}
- } else if (is_zero_range(p, TARGET_PAGE_SIZE)) {
- acct_info.dup_pages++;
- bytes_sent = save_block_hdr(f, block, offset, cont,
- RAM_SAVE_FLAG_COMPRESS);
- qemu_put_byte(f, 0);
- bytes_sent++;
- /* Must let xbzrle know, otherwise a previous (now 0'd) cached
- * page would be stale
- */
- xbzrle_cache_zero_page(current_addr);
- } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
- bytes_sent = save_xbzrle_page(f, &p, current_addr, block,
- offset, cont, last_stage);
- if (!last_stage) {
- /* Can't send this cached data async, since the cache page
- * might get updated before it gets to the wire
+ } else {
+ pages = save_zero_page(f, block, offset, p, bytes_transferred);
+ if (pages > 0) {
+ /* Must let xbzrle know, otherwise a previous (now 0'd) cached
+ * page would be stale
*/
- send_async = false;
+ xbzrle_cache_zero_page(current_addr);
+ } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
+ pages = save_xbzrle_page(f, &p, current_addr, block,
+ offset, last_stage, bytes_transferred);
+ if (!last_stage) {
+ /* Can't send this cached data async, since the cache page
+ * might get updated before it gets to the wire
+ */
+ send_async = false;
+ }
}
}
/* XBZRLE overflow or normal page */
- if (bytes_sent == -1) {
- bytes_sent = save_block_hdr(f, block, offset, cont, RAM_SAVE_FLAG_PAGE);
+ if (pages == -1) {
+ *bytes_transferred += save_page_header(f, block,
+ offset | RAM_SAVE_FLAG_PAGE);
if (send_async) {
qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
} else {
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
}
- bytes_sent += TARGET_PAGE_SIZE;
+ *bytes_transferred += TARGET_PAGE_SIZE;
+ pages = 1;
acct_info.norm_pages++;
}
XBZRLE_cache_unlock();
+ return pages;
+}
+
+/**
+ * do_compress_ram_page: compress one page into the worker's buffer file
+ *
+ * Writes a page header carrying RAM_SAVE_FLAG_COMPRESS_PAGE followed by
+ * the compressed page data into param->file.
+ *
+ * Returns: size of the page header plus the value returned by
+ *          qemu_put_compression_data()
+ *
+ * @param: supplies the block, the offset (possibly with flag bits set)
+ *         and the QEMUFile that receives the output
+ */
+static int do_compress_ram_page(CompressParam *param)
+{
+ int bytes_sent, blen;
+ uint8_t *p;
+ RAMBlock *block = param->block;
+ ram_addr_t offset = param->offset;
+
+ /* offset may carry flags in its low bits; mask them off to get the
+ * address of the page itself.
+ */
+ p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+ bytes_sent = save_page_header(param->file, block, offset |
+ RAM_SAVE_FLAG_COMPRESS_PAGE);
+ blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+ migrate_compress_level());
+ bytes_sent += blen;
+
return bytes_sent;
}
+/* Hand the page already stored in @param to its worker thread.
+ *
+ * done is cleared before the worker is woken; the visible caller,
+ * compress_page_with_multi_thread(), holds comp_done_lock at this point.
+ * start is set under param->mutex, the mutex the worker waits with.
+ */
+static inline void start_compression(CompressParam *param)
+{
+    param->done = false;
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+/* Wake the decompression worker that owns @param.  start is set under
+ * param->mutex, the mutex the worker waits with.
+ */
+static inline void start_decompression(DecompressParam *param)
+{
+    qemu_mutex_lock(&param->mutex);
+    param->start = true;
+    qemu_cond_signal(&param->cond);
+    qemu_mutex_unlock(&param->mutex);
+}
+
+static uint64_t bytes_transferred;
+
+/* Wait for every compression worker to finish its current page and move
+ * the buffered output of each worker into @f, adding the moved bytes to
+ * the file-scope bytes_transferred counter.  Does nothing when
+ * compression is not in use; once quit_comp_thread is set nothing more
+ * is copied.
+ */
+static void flush_compressed_data(QEMUFile *f)
+{
+    int nr_threads, i;
+
+    if (!migrate_use_compression()) {
+        return;
+    }
+
+    nr_threads = migrate_compress_threads();
+    for (i = 0; i < nr_threads; i++) {
+        CompressParam *worker = &comp_param[i];
+        int len;
+
+        if (!worker->done) {
+            qemu_mutex_lock(comp_done_lock);
+            while (!worker->done && !quit_comp_thread) {
+                qemu_cond_wait(comp_done_cond, comp_done_lock);
+            }
+            qemu_mutex_unlock(comp_done_lock);
+        }
+        if (quit_comp_thread) {
+            continue;
+        }
+        len = qemu_put_qemu_file(f, worker->file);
+        bytes_transferred += len;
+    }
+}
+
+/* Record in @param which page (@block, @offset) the worker has to
+ * compress next.  No locking is done here.
+ */
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+                                       ram_addr_t offset)
+{
+    param->offset = offset;
+    param->block = block;
+}
+
+/**
+ * compress_page_with_multi_thread: hand a page to an idle worker
+ *
+ * Looks for a worker whose done flag is set, moves that worker's
+ * buffered output into @f, and starts it on the new page.  While no
+ * worker is idle it sleeps on comp_done_cond.
+ *
+ * Returns: Number of pages handed over (always 1)
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+                                           ram_addr_t offset,
+                                           uint64_t *bytes_transferred)
+{
+    int nr_threads = migrate_compress_threads();
+    int pages = -1;
+
+    qemu_mutex_lock(comp_done_lock);
+    do {
+        int i;
+
+        for (i = 0; i < nr_threads; i++) {
+            CompressParam *worker = &comp_param[i];
+            int flushed;
+
+            if (!worker->done) {
+                continue;
+            }
+            /* Drain the previous result before reusing the worker */
+            flushed = qemu_put_qemu_file(f, worker->file);
+            set_compress_params(worker, block, offset);
+            start_compression(worker);
+            pages = 1;
+            acct_info.norm_pages++;
+            *bytes_transferred += flushed;
+            break;
+        }
+        if (pages <= 0) {
+            qemu_cond_wait(comp_done_cond, comp_done_lock);
+        }
+    } while (pages <= 0);
+    qemu_mutex_unlock(comp_done_lock);
+
+    return pages;
+}
+
+/**
+ * ram_save_compressed_page: compress the given page and send it to the stream
+ *
+ * The page is first offered to ram_control_save_page().  Only when that
+ * reports RAM_SAVE_CONTROL_NOT_SUPP is the page sent here, either as a
+ * zero page or as compressed data.
+ *
+ * Returns: Number of pages written.
+ *
+ * @f: QEMUFile where to send the data
+ * @block: block that contains the page we want to send
+ * @offset: offset inside the block for the page
+ * @last_stage: if we are at the completion stage
+ * @bytes_transferred: increase it with the number of transferred bytes
+ */
+static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
+                                    ram_addr_t offset, bool last_stage,
+                                    uint64_t *bytes_transferred)
+{
+    uint8_t *p = memory_region_get_ram_ptr(block->mr) + offset;
+    uint64_t bytes_xmit = 0;
+    bool same_block;
+    int pages = -1;
+    int ret;
+
+    ret = ram_control_save_page(f, block->offset,
+                                offset, TARGET_PAGE_SIZE, &bytes_xmit);
+    if (bytes_xmit) {
+        *bytes_transferred += bytes_xmit;
+        pages = 1;
+    }
+
+    same_block = (block == last_sent_block);
+    if (same_block) {
+        offset |= RAM_SAVE_FLAG_CONTINUE;
+    }
+
+    if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+        if (ret != RAM_SAVE_CONTROL_DELAYED) {
+            if (bytes_xmit > 0) {
+                acct_info.norm_pages++;
+            } else {
+                acct_info.dup_pages++;
+            }
+        }
+        return pages;
+    }
+
+    /* When starting the process of a new block, the first page of the
+     * block must be sent out before other pages of the same block, and
+     * all pages of the previous block must already have been sent out.
+     * Keeping this order is important because the 'cont' flag is used
+     * to avoid resending the block name.
+     */
+    if (!same_block) {
+        flush_compressed_data(f);
+    }
+
+    pages = save_zero_page(f, block, offset, p, bytes_transferred);
+    if (pages != -1) {
+        return pages;
+    }
+
+    if (same_block) {
+        return compress_page_with_multi_thread(f, block, offset,
+                                               bytes_transferred);
+    }
+
+    /* First page of a new block: compress it in the calling thread so
+     * that it reaches the stream before any other page of the block.
+     */
+    set_compress_params(&comp_param[0], block, offset);
+    bytes_xmit = do_compress_ram_page(&comp_param[0]);
+    acct_info.norm_pages++;
+    qemu_put_qemu_file(f, comp_param[0].file);
+    *bytes_transferred += bytes_xmit;
+
+    return 1;
+}
+
/**
* ram_find_and_save_block: Finds a dirty page and sends it to f
*
RAMBlock *block = last_seen_block;
ram_addr_t offset = last_offset;
bool complete_round = false;
- int bytes_sent = 0;
+ int pages = 0;
MemoryRegion *mr;
if (!block)
block = QLIST_FIRST_RCU(&ram_list.blocks);
complete_round = true;
ram_bulk_stage = false;
+ if (migrate_use_xbzrle()) {
+ /* If xbzrle is on, stop using the data compression at this
+ * point. In theory, xbzrle can do better than compression.
+ */
+ flush_compressed_data(f);
+ compression_switch = false;
+ }
}
} else {
- bytes_sent = ram_save_page(f, block, offset, last_stage);
+ if (compression_switch && migrate_use_compression()) {
+ pages = ram_save_compressed_page(f, block, offset, last_stage,
+ bytes_transferred);
+ } else {
+ pages = ram_save_page(f, block, offset, last_stage,
+ bytes_transferred);
+ }
/* if page is unmodified, continue to the next */
- if (bytes_sent > 0) {
+ if (pages > 0) {
last_sent_block = block;
break;
}
last_seen_block = block;
last_offset = offset;
- *bytes_transferred += bytes_sent;
-
- return (bytes_sent != 0);
+ return pages;
}
-static uint64_t bytes_transferred;
-
void acct_update_position(QEMUFile *f, size_t size, bool zero)
{
uint64_t pages = size / TARGET_PAGE_SIZE;
* Count the total number of pages used by ram blocks not including any
* gaps due to alignment or unplugs.
*/
- migration_dirty_pages = 0;
- QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
- uint64_t block_pages;
-
- block_pages = block->used_length >> TARGET_PAGE_BITS;
- migration_dirty_pages += block_pages;
- }
+ migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
memory_global_dirty_log_start();
migration_bitmap_sync();
}
i++;
}
+ flush_compressed_data(f);
rcu_read_unlock();
/*
}
}
+ flush_compressed_data(f);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
migration_end();
}
}
+/**
+ * do_data_decompress: body of one decompression worker thread
+ *
+ * Sleeps on param->cond until start_decompression() posts a request or
+ * termination is requested, then decompresses param->compbuf
+ * (param->len bytes) into param->des and clears param->start to mark
+ * the worker idle again.
+ *
+ * Returns: always NULL
+ *
+ * @opaque: the DecompressParam that belongs to this worker
+ */
+static void *do_data_decompress(void *opaque)
+{
+    DecompressParam *param = opaque;
+    unsigned long pagesize;
+
+    while (!quit_decomp_thread) {
+        qemu_mutex_lock(&param->mutex);
+        /* Only wait inside the loop.  The work is done after it, so a
+         * request posted while this thread was not waiting is still
+         * handled and a spurious wakeup does not decompress stale data.
+         */
+        while (!param->start && !quit_decomp_thread) {
+            qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (!quit_decomp_thread) {
+            pagesize = TARGET_PAGE_SIZE;
+            /* uncompress() will fail in some cases, especially when the
+             * page was dirtied while it was being compressed.  That is
+             * not a problem because the dirty page will be retransferred
+             * and uncompress() won't break the data in other pages.
+             */
+            uncompress((Bytef *)param->des, &pagesize,
+                       (const Bytef *)param->compbuf, param->len);
+        }
+        param->start = false;
+        qemu_mutex_unlock(&param->mutex);
+    }
+
+    return NULL;
+}
+
+/**
+ * migrate_decompress_threads_create: start the decompression workers
+ *
+ * Allocates the per-thread state, the shared staging buffer and one
+ * private compressed-data buffer per worker, then spawns
+ * migrate_decompress_threads() joinable threads running
+ * do_data_decompress().
+ */
+void migrate_decompress_threads_create(void)
+{
+    int nr_threads = migrate_decompress_threads();
+    int idx;
+
+    decompress_threads = g_new0(QemuThread, nr_threads);
+    decomp_param = g_new0(DecompressParam, nr_threads);
+    compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+    quit_decomp_thread = false;
+
+    for (idx = 0; idx < nr_threads; idx++) {
+        DecompressParam *worker = &decomp_param[idx];
+
+        qemu_mutex_init(&worker->mutex);
+        qemu_cond_init(&worker->cond);
+        /* Sized with compressBound() for one target page */
+        worker->compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        qemu_thread_create(&decompress_threads[idx], "decompress",
+                           do_data_decompress, worker,
+                           QEMU_THREAD_JOINABLE);
+    }
+}
+
+/**
+ * migrate_decompress_threads_join: stop and release the decompression
+ * workers
+ *
+ * Sets the quit flag, wakes every worker, joins the threads and frees
+ * everything allocated by migrate_decompress_threads_create().
+ */
+void migrate_decompress_threads_join(void)
+{
+    int nr_threads, idx;
+
+    quit_decomp_thread = true;
+    nr_threads = migrate_decompress_threads();
+
+    /* First pass: wake every worker so it can notice the quit flag */
+    for (idx = 0; idx < nr_threads; idx++) {
+        DecompressParam *worker = &decomp_param[idx];
+
+        qemu_mutex_lock(&worker->mutex);
+        qemu_cond_signal(&worker->cond);
+        qemu_mutex_unlock(&worker->mutex);
+    }
+
+    /* Second pass: reap the threads and release their resources */
+    for (idx = 0; idx < nr_threads; idx++) {
+        DecompressParam *worker = &decomp_param[idx];
+
+        qemu_thread_join(&decompress_threads[idx]);
+        qemu_mutex_destroy(&worker->mutex);
+        qemu_cond_destroy(&worker->cond);
+        g_free(worker->compbuf);
+    }
+
+    g_free(decompress_threads);
+    decompress_threads = NULL;
+    g_free(decomp_param);
+    decomp_param = NULL;
+    g_free(compressed_data_buf);
+    compressed_data_buf = NULL;
+}
+
+/* Hand @len bytes of compressed data in @compbuf to the first worker
+ * whose start flag is clear, to be decompressed into @host.  The data
+ * is copied into the worker's own buffer, so @compbuf can be reused as
+ * soon as this returns.  Keeps rescanning the workers until one is free.
+ */
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+                                               void *host, int len)
+{
+    int nr_threads = migrate_decompress_threads();
+
+    for (;;) {
+        int i;
+
+        for (i = 0; i < nr_threads; i++) {
+            DecompressParam *worker = &decomp_param[i];
+
+            if (worker->start) {
+                continue;
+            }
+            memcpy(worker->compbuf, compbuf, len);
+            worker->des = host;
+            worker->len = len;
+            start_decompression(worker);
+            return;
+        }
+    }
+}
+
static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
int flags = 0, ret = 0;
static uint64_t seq_iter;
+ int len = 0;
seq_iter++;
}
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
+ case RAM_SAVE_FLAG_COMPRESS_PAGE:
+ host = host_from_stream_offset(f, addr, flags);
+ if (!host) {
+ error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
+ ret = -EINVAL;
+ break;
+ }
+
+ len = qemu_get_be32(f);
+ if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+ error_report("Invalid compressed data length: %d", len);
+ ret = -EINVAL;
+ break;
+ }
+ qemu_get_buffer(f, compressed_data_buf, len);
+ decompress_data_with_multi_threads(compressed_data_buf, host, len);
+ break;
case RAM_SAVE_FLAG_XBZRLE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {