]> Git Repo - qemu.git/blob - migration/ram.c
migration: add speed limit for multifd migration
[qemu.git] / migration / ram.c
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <[email protected]>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28
29 #include "qemu/osdep.h"
30 #include "cpu.h"
31 #include <zlib.h>
32 #include "qemu/cutils.h"
33 #include "qemu/bitops.h"
34 #include "qemu/bitmap.h"
35 #include "qemu/main-loop.h"
36 #include "qemu/pmem.h"
37 #include "xbzrle.h"
38 #include "ram.h"
39 #include "migration.h"
40 #include "socket.h"
41 #include "migration/register.h"
42 #include "migration/misc.h"
43 #include "qemu-file.h"
44 #include "postcopy-ram.h"
45 #include "page_cache.h"
46 #include "qemu/error-report.h"
47 #include "qapi/error.h"
48 #include "qapi/qapi-events-migration.h"
49 #include "qapi/qmp/qerror.h"
50 #include "trace.h"
51 #include "exec/ram_addr.h"
52 #include "exec/target_page.h"
53 #include "qemu/rcu_queue.h"
54 #include "migration/colo.h"
55 #include "block.h"
56 #include "sysemu/sysemu.h"
57 #include "qemu/uuid.h"
58 #include "savevm.h"
59 #include "qemu/iov.h"
60
61 /***********************************************************/
62 /* ram save/restore */
63
64 /* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
65  * worked for pages that where filled with the same char.  We switched
66  * it to only search for the zero value.  And to avoid confusion with
67  * RAM_SSAVE_FLAG_COMPRESS_PAGE just rename it.
68  */
69
70 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
71 #define RAM_SAVE_FLAG_ZERO     0x02
72 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
73 #define RAM_SAVE_FLAG_PAGE     0x08
74 #define RAM_SAVE_FLAG_EOS      0x10
75 #define RAM_SAVE_FLAG_CONTINUE 0x20
76 #define RAM_SAVE_FLAG_XBZRLE   0x40
77 /* 0x80 is reserved in migration.h start with 0x100 next */
78 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
79
80 static inline bool is_zero_range(uint8_t *p, uint64_t size)
81 {
82     return buffer_is_zero(p, size);
83 }
84
85 XBZRLECacheStats xbzrle_counters;
86
87 /* struct contains XBZRLE cache and a static page
88    used by the compression */
89 static struct {
90     /* buffer used for XBZRLE encoding */
91     uint8_t *encoded_buf;
92     /* buffer for storing page content */
93     uint8_t *current_buf;
94     /* Cache for XBZRLE, Protected by lock. */
95     PageCache *cache;
96     QemuMutex lock;
97     /* it will store a page full of zeros */
98     uint8_t *zero_target_page;
99     /* buffer used for XBZRLE decoding */
100     uint8_t *decoded_buf;
101 } XBZRLE;
102
103 static void XBZRLE_cache_lock(void)
104 {
105     if (migrate_use_xbzrle())
106         qemu_mutex_lock(&XBZRLE.lock);
107 }
108
109 static void XBZRLE_cache_unlock(void)
110 {
111     if (migrate_use_xbzrle())
112         qemu_mutex_unlock(&XBZRLE.lock);
113 }
114
115 /**
116  * xbzrle_cache_resize: resize the xbzrle cache
117  *
118  * This function is called from qmp_migrate_set_cache_size in main
119  * thread, possibly while a migration is in progress.  A running
120  * migration may be using the cache and might finish during this call,
121  * hence changes to the cache are protected by XBZRLE.lock().
122  *
123  * Returns 0 for success or -1 for error
124  *
125  * @new_size: new cache size
126  * @errp: set *errp if the check failed, with reason
127  */
128 int xbzrle_cache_resize(int64_t new_size, Error **errp)
129 {
130     PageCache *new_cache;
131     int64_t ret = 0;
132
133     /* Check for truncation */
134     if (new_size != (size_t)new_size) {
135         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
136                    "exceeding address space");
137         return -1;
138     }
139
140     if (new_size == migrate_xbzrle_cache_size()) {
141         /* nothing to do */
142         return 0;
143     }
144
145     XBZRLE_cache_lock();
146
147     if (XBZRLE.cache != NULL) {
148         new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
149         if (!new_cache) {
150             ret = -1;
151             goto out;
152         }
153
154         cache_fini(XBZRLE.cache);
155         XBZRLE.cache = new_cache;
156     }
157 out:
158     XBZRLE_cache_unlock();
159     return ret;
160 }
161
162 static bool ramblock_is_ignored(RAMBlock *block)
163 {
164     return !qemu_ram_is_migratable(block) ||
165            (migrate_ignore_shared() && qemu_ram_is_shared(block));
166 }
167
168 /* Should be holding either ram_list.mutex, or the RCU lock. */
169 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
170     INTERNAL_RAMBLOCK_FOREACH(block)                   \
171         if (ramblock_is_ignored(block)) {} else
172
173 #define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
174     INTERNAL_RAMBLOCK_FOREACH(block)                   \
175         if (!qemu_ram_is_migratable(block)) {} else
176
177 #undef RAMBLOCK_FOREACH
178
179 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
180 {
181     RAMBlock *block;
182     int ret = 0;
183
184     rcu_read_lock();
185     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
186         ret = func(block, opaque);
187         if (ret) {
188             break;
189         }
190     }
191     rcu_read_unlock();
192     return ret;
193 }
194
195 static void ramblock_recv_map_init(void)
196 {
197     RAMBlock *rb;
198
199     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
200         assert(!rb->receivedmap);
201         rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
202     }
203 }
204
205 int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
206 {
207     return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
208                     rb->receivedmap);
209 }
210
211 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
212 {
213     return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
214 }
215
216 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
217 {
218     set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
219 }
220
221 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
222                                     size_t nr)
223 {
224     bitmap_set_atomic(rb->receivedmap,
225                       ramblock_recv_bitmap_offset(host_addr, rb),
226                       nr);
227 }
228
229 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
230
231 /*
232  * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
233  *
234  * Returns >0 if success with sent bytes, or <0 if error.
235  */
236 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
237                                   const char *block_name)
238 {
239     RAMBlock *block = qemu_ram_block_by_name(block_name);
240     unsigned long *le_bitmap, nbits;
241     uint64_t size;
242
243     if (!block) {
244         error_report("%s: invalid block name: %s", __func__, block_name);
245         return -1;
246     }
247
248     nbits = block->used_length >> TARGET_PAGE_BITS;
249
250     /*
251      * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
252      * machines we may need 4 more bytes for padding (see below
253      * comment). So extend it a bit before hand.
254      */
255     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
256
257     /*
258      * Always use little endian when sending the bitmap. This is
259      * required that when source and destination VMs are not using the
260      * same endianess. (Note: big endian won't work.)
261      */
262     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
263
264     /* Size of the bitmap, in bytes */
265     size = DIV_ROUND_UP(nbits, 8);
266
267     /*
268      * size is always aligned to 8 bytes for 64bit machines, but it
269      * may not be true for 32bit machines. We need this padding to
270      * make sure the migration can survive even between 32bit and
271      * 64bit machines.
272      */
273     size = ROUND_UP(size, 8);
274
275     qemu_put_be64(file, size);
276     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
277     /*
278      * Mark as an end, in case the middle part is screwed up due to
279      * some "misterious" reason.
280      */
281     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
282     qemu_fflush(file);
283
284     g_free(le_bitmap);
285
286     if (qemu_file_get_error(file)) {
287         return qemu_file_get_error(file);
288     }
289
290     return size + sizeof(size);
291 }
292
293 /*
294  * An outstanding page request, on the source, having been received
295  * and queued
296  */
297 struct RAMSrcPageRequest {
298     RAMBlock *rb;
299     hwaddr    offset;
300     hwaddr    len;
301
302     QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
303 };
304
305 /* State of RAM for migration */
306 struct RAMState {
307     /* QEMUFile used for this migration */
308     QEMUFile *f;
309     /* Last block that we have visited searching for dirty pages */
310     RAMBlock *last_seen_block;
311     /* Last block from where we have sent data */
312     RAMBlock *last_sent_block;
313     /* Last dirty target page we have sent */
314     ram_addr_t last_page;
315     /* last ram version we have seen */
316     uint32_t last_version;
317     /* We are in the first round */
318     bool ram_bulk_stage;
319     /* The free page optimization is enabled */
320     bool fpo_enabled;
321     /* How many times we have dirty too many pages */
322     int dirty_rate_high_cnt;
323     /* these variables are used for bitmap sync */
324     /* last time we did a full bitmap_sync */
325     int64_t time_last_bitmap_sync;
326     /* bytes transferred at start_time */
327     uint64_t bytes_xfer_prev;
328     /* number of dirty pages since start_time */
329     uint64_t num_dirty_pages_period;
330     /* xbzrle misses since the beginning of the period */
331     uint64_t xbzrle_cache_miss_prev;
332
333     /* compression statistics since the beginning of the period */
334     /* amount of count that no free thread to compress data */
335     uint64_t compress_thread_busy_prev;
336     /* amount bytes after compression */
337     uint64_t compressed_size_prev;
338     /* amount of compressed pages */
339     uint64_t compress_pages_prev;
340
341     /* total handled target pages at the beginning of period */
342     uint64_t target_page_count_prev;
343     /* total handled target pages since start */
344     uint64_t target_page_count;
345     /* number of dirty bits in the bitmap */
346     uint64_t migration_dirty_pages;
347     /* Protects modification of the bitmap and migration dirty pages */
348     QemuMutex bitmap_mutex;
349     /* The RAMBlock used in the last src_page_requests */
350     RAMBlock *last_req_rb;
351     /* Queue of outstanding page requests from the destination */
352     QemuMutex src_page_req_mutex;
353     QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
354 };
355 typedef struct RAMState RAMState;
356
357 static RAMState *ram_state;
358
359 static NotifierWithReturnList precopy_notifier_list;
360
361 void precopy_infrastructure_init(void)
362 {
363     notifier_with_return_list_init(&precopy_notifier_list);
364 }
365
366 void precopy_add_notifier(NotifierWithReturn *n)
367 {
368     notifier_with_return_list_add(&precopy_notifier_list, n);
369 }
370
371 void precopy_remove_notifier(NotifierWithReturn *n)
372 {
373     notifier_with_return_remove(n);
374 }
375
376 int precopy_notify(PrecopyNotifyReason reason, Error **errp)
377 {
378     PrecopyNotifyData pnd;
379     pnd.reason = reason;
380     pnd.errp = errp;
381
382     return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
383 }
384
385 void precopy_enable_free_page_optimization(void)
386 {
387     if (!ram_state) {
388         return;
389     }
390
391     ram_state->fpo_enabled = true;
392 }
393
394 uint64_t ram_bytes_remaining(void)
395 {
396     return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
397                        0;
398 }
399
400 MigrationStats ram_counters;
401
402 /* used by the search for pages to send */
403 struct PageSearchStatus {
404     /* Current block being searched */
405     RAMBlock    *block;
406     /* Current page to search from */
407     unsigned long page;
408     /* Set once we wrap around */
409     bool         complete_round;
410 };
411 typedef struct PageSearchStatus PageSearchStatus;
412
413 CompressionStats compression_counters;
414
415 struct CompressParam {
416     bool done;
417     bool quit;
418     bool zero_page;
419     QEMUFile *file;
420     QemuMutex mutex;
421     QemuCond cond;
422     RAMBlock *block;
423     ram_addr_t offset;
424
425     /* internally used fields */
426     z_stream stream;
427     uint8_t *originbuf;
428 };
429 typedef struct CompressParam CompressParam;
430
431 struct DecompressParam {
432     bool done;
433     bool quit;
434     QemuMutex mutex;
435     QemuCond cond;
436     void *des;
437     uint8_t *compbuf;
438     int len;
439     z_stream stream;
440 };
441 typedef struct DecompressParam DecompressParam;
442
443 static CompressParam *comp_param;
444 static QemuThread *compress_threads;
445 /* comp_done_cond is used to wake up the migration thread when
446  * one of the compression threads has finished the compression.
447  * comp_done_lock is used to co-work with comp_done_cond.
448  */
449 static QemuMutex comp_done_lock;
450 static QemuCond comp_done_cond;
451 /* The empty QEMUFileOps will be used by file in CompressParam */
452 static const QEMUFileOps empty_ops = { };
453
454 static QEMUFile *decomp_file;
455 static DecompressParam *decomp_param;
456 static QemuThread *decompress_threads;
457 static QemuMutex decomp_done_lock;
458 static QemuCond decomp_done_cond;
459
460 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
461                                  ram_addr_t offset, uint8_t *source_buf);
462
463 static void *do_data_compress(void *opaque)
464 {
465     CompressParam *param = opaque;
466     RAMBlock *block;
467     ram_addr_t offset;
468     bool zero_page;
469
470     qemu_mutex_lock(&param->mutex);
471     while (!param->quit) {
472         if (param->block) {
473             block = param->block;
474             offset = param->offset;
475             param->block = NULL;
476             qemu_mutex_unlock(&param->mutex);
477
478             zero_page = do_compress_ram_page(param->file, &param->stream,
479                                              block, offset, param->originbuf);
480
481             qemu_mutex_lock(&comp_done_lock);
482             param->done = true;
483             param->zero_page = zero_page;
484             qemu_cond_signal(&comp_done_cond);
485             qemu_mutex_unlock(&comp_done_lock);
486
487             qemu_mutex_lock(&param->mutex);
488         } else {
489             qemu_cond_wait(&param->cond, &param->mutex);
490         }
491     }
492     qemu_mutex_unlock(&param->mutex);
493
494     return NULL;
495 }
496
497 static void compress_threads_save_cleanup(void)
498 {
499     int i, thread_count;
500
501     if (!migrate_use_compression() || !comp_param) {
502         return;
503     }
504
505     thread_count = migrate_compress_threads();
506     for (i = 0; i < thread_count; i++) {
507         /*
508          * we use it as a indicator which shows if the thread is
509          * properly init'd or not
510          */
511         if (!comp_param[i].file) {
512             break;
513         }
514
515         qemu_mutex_lock(&comp_param[i].mutex);
516         comp_param[i].quit = true;
517         qemu_cond_signal(&comp_param[i].cond);
518         qemu_mutex_unlock(&comp_param[i].mutex);
519
520         qemu_thread_join(compress_threads + i);
521         qemu_mutex_destroy(&comp_param[i].mutex);
522         qemu_cond_destroy(&comp_param[i].cond);
523         deflateEnd(&comp_param[i].stream);
524         g_free(comp_param[i].originbuf);
525         qemu_fclose(comp_param[i].file);
526         comp_param[i].file = NULL;
527     }
528     qemu_mutex_destroy(&comp_done_lock);
529     qemu_cond_destroy(&comp_done_cond);
530     g_free(compress_threads);
531     g_free(comp_param);
532     compress_threads = NULL;
533     comp_param = NULL;
534 }
535
536 static int compress_threads_save_setup(void)
537 {
538     int i, thread_count;
539
540     if (!migrate_use_compression()) {
541         return 0;
542     }
543     thread_count = migrate_compress_threads();
544     compress_threads = g_new0(QemuThread, thread_count);
545     comp_param = g_new0(CompressParam, thread_count);
546     qemu_cond_init(&comp_done_cond);
547     qemu_mutex_init(&comp_done_lock);
548     for (i = 0; i < thread_count; i++) {
549         comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
550         if (!comp_param[i].originbuf) {
551             goto exit;
552         }
553
554         if (deflateInit(&comp_param[i].stream,
555                         migrate_compress_level()) != Z_OK) {
556             g_free(comp_param[i].originbuf);
557             goto exit;
558         }
559
560         /* comp_param[i].file is just used as a dummy buffer to save data,
561          * set its ops to empty.
562          */
563         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
564         comp_param[i].done = true;
565         comp_param[i].quit = false;
566         qemu_mutex_init(&comp_param[i].mutex);
567         qemu_cond_init(&comp_param[i].cond);
568         qemu_thread_create(compress_threads + i, "compress",
569                            do_data_compress, comp_param + i,
570                            QEMU_THREAD_JOINABLE);
571     }
572     return 0;
573
574 exit:
575     compress_threads_save_cleanup();
576     return -1;
577 }
578
579 /* Multiple fd's */
580
581 #define MULTIFD_MAGIC 0x11223344U
582 #define MULTIFD_VERSION 1
583
584 #define MULTIFD_FLAG_SYNC (1 << 0)
585
586 /* This value needs to be a multiple of qemu_target_page_size() */
587 #define MULTIFD_PACKET_SIZE (512 * 1024)
588
589 typedef struct {
590     uint32_t magic;
591     uint32_t version;
592     unsigned char uuid[16]; /* QemuUUID */
593     uint8_t id;
594     uint8_t unused1[7];     /* Reserved for future use */
595     uint64_t unused2[4];    /* Reserved for future use */
596 } __attribute__((packed)) MultiFDInit_t;
597
598 typedef struct {
599     uint32_t magic;
600     uint32_t version;
601     uint32_t flags;
602     /* maximum number of allocated pages */
603     uint32_t pages_alloc;
604     uint32_t pages_used;
605     /* size of the next packet that contains pages */
606     uint32_t next_packet_size;
607     uint64_t packet_num;
608     uint64_t unused[4];    /* Reserved for future use */
609     char ramblock[256];
610     uint64_t offset[];
611 } __attribute__((packed)) MultiFDPacket_t;
612
613 typedef struct {
614     /* number of used pages */
615     uint32_t used;
616     /* number of allocated pages */
617     uint32_t allocated;
618     /* global number of generated multifd packets */
619     uint64_t packet_num;
620     /* offset of each page */
621     ram_addr_t *offset;
622     /* pointer to each page */
623     struct iovec *iov;
624     RAMBlock *block;
625 } MultiFDPages_t;
626
627 typedef struct {
628     /* this fields are not changed once the thread is created */
629     /* channel number */
630     uint8_t id;
631     /* channel thread name */
632     char *name;
633     /* channel thread id */
634     QemuThread thread;
635     /* communication channel */
636     QIOChannel *c;
637     /* sem where to wait for more work */
638     QemuSemaphore sem;
639     /* this mutex protects the following parameters */
640     QemuMutex mutex;
641     /* is this channel thread running */
642     bool running;
643     /* should this thread finish */
644     bool quit;
645     /* thread has work to do */
646     int pending_job;
647     /* array of pages to sent */
648     MultiFDPages_t *pages;
649     /* packet allocated len */
650     uint32_t packet_len;
651     /* pointer to the packet */
652     MultiFDPacket_t *packet;
653     /* multifd flags for each packet */
654     uint32_t flags;
655     /* size of the next packet that contains pages */
656     uint32_t next_packet_size;
657     /* global number of generated multifd packets */
658     uint64_t packet_num;
659     /* thread local variables */
660     /* packets sent through this channel */
661     uint64_t num_packets;
662     /* pages sent through this channel */
663     uint64_t num_pages;
664 }  MultiFDSendParams;
665
666 typedef struct {
667     /* this fields are not changed once the thread is created */
668     /* channel number */
669     uint8_t id;
670     /* channel thread name */
671     char *name;
672     /* channel thread id */
673     QemuThread thread;
674     /* communication channel */
675     QIOChannel *c;
676     /* this mutex protects the following parameters */
677     QemuMutex mutex;
678     /* is this channel thread running */
679     bool running;
680     /* should this thread finish */
681     bool quit;
682     /* array of pages to receive */
683     MultiFDPages_t *pages;
684     /* packet allocated len */
685     uint32_t packet_len;
686     /* pointer to the packet */
687     MultiFDPacket_t *packet;
688     /* multifd flags for each packet */
689     uint32_t flags;
690     /* global number of generated multifd packets */
691     uint64_t packet_num;
692     /* thread local variables */
693     /* size of the next packet that contains pages */
694     uint32_t next_packet_size;
695     /* packets sent through this channel */
696     uint64_t num_packets;
697     /* pages sent through this channel */
698     uint64_t num_pages;
699     /* syncs main thread and channels */
700     QemuSemaphore sem_sync;
701 } MultiFDRecvParams;
702
703 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
704 {
705     MultiFDInit_t msg;
706     int ret;
707
708     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
709     msg.version = cpu_to_be32(MULTIFD_VERSION);
710     msg.id = p->id;
711     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
712
713     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
714     if (ret != 0) {
715         return -1;
716     }
717     return 0;
718 }
719
720 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
721 {
722     MultiFDInit_t msg;
723     int ret;
724
725     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
726     if (ret != 0) {
727         return -1;
728     }
729
730     msg.magic = be32_to_cpu(msg.magic);
731     msg.version = be32_to_cpu(msg.version);
732
733     if (msg.magic != MULTIFD_MAGIC) {
734         error_setg(errp, "multifd: received packet magic %x "
735                    "expected %x", msg.magic, MULTIFD_MAGIC);
736         return -1;
737     }
738
739     if (msg.version != MULTIFD_VERSION) {
740         error_setg(errp, "multifd: received packet version %d "
741                    "expected %d", msg.version, MULTIFD_VERSION);
742         return -1;
743     }
744
745     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
746         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
747         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
748
749         error_setg(errp, "multifd: received uuid '%s' and expected "
750                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
751         g_free(uuid);
752         g_free(msg_uuid);
753         return -1;
754     }
755
756     if (msg.id > migrate_multifd_channels()) {
757         error_setg(errp, "multifd: received channel version %d "
758                    "expected %d", msg.version, MULTIFD_VERSION);
759         return -1;
760     }
761
762     return msg.id;
763 }
764
765 static MultiFDPages_t *multifd_pages_init(size_t size)
766 {
767     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
768
769     pages->allocated = size;
770     pages->iov = g_new0(struct iovec, size);
771     pages->offset = g_new0(ram_addr_t, size);
772
773     return pages;
774 }
775
776 static void multifd_pages_clear(MultiFDPages_t *pages)
777 {
778     pages->used = 0;
779     pages->allocated = 0;
780     pages->packet_num = 0;
781     pages->block = NULL;
782     g_free(pages->iov);
783     pages->iov = NULL;
784     g_free(pages->offset);
785     pages->offset = NULL;
786     g_free(pages);
787 }
788
789 static void multifd_send_fill_packet(MultiFDSendParams *p)
790 {
791     MultiFDPacket_t *packet = p->packet;
792     uint32_t page_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
793     int i;
794
795     packet->magic = cpu_to_be32(MULTIFD_MAGIC);
796     packet->version = cpu_to_be32(MULTIFD_VERSION);
797     packet->flags = cpu_to_be32(p->flags);
798     packet->pages_alloc = cpu_to_be32(page_max);
799     packet->pages_used = cpu_to_be32(p->pages->used);
800     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
801     packet->packet_num = cpu_to_be64(p->packet_num);
802
803     if (p->pages->block) {
804         strncpy(packet->ramblock, p->pages->block->idstr, 256);
805     }
806
807     for (i = 0; i < p->pages->used; i++) {
808         packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
809     }
810 }
811
812 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
813 {
814     MultiFDPacket_t *packet = p->packet;
815     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
816     RAMBlock *block;
817     int i;
818
819     packet->magic = be32_to_cpu(packet->magic);
820     if (packet->magic != MULTIFD_MAGIC) {
821         error_setg(errp, "multifd: received packet "
822                    "magic %x and expected magic %x",
823                    packet->magic, MULTIFD_MAGIC);
824         return -1;
825     }
826
827     packet->version = be32_to_cpu(packet->version);
828     if (packet->version != MULTIFD_VERSION) {
829         error_setg(errp, "multifd: received packet "
830                    "version %d and expected version %d",
831                    packet->version, MULTIFD_VERSION);
832         return -1;
833     }
834
835     p->flags = be32_to_cpu(packet->flags);
836
837     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
838     /*
839      * If we recevied a packet that is 100 times bigger than expected
840      * just stop migration.  It is a magic number.
841      */
842     if (packet->pages_alloc > pages_max * 100) {
843         error_setg(errp, "multifd: received packet "
844                    "with size %d and expected a maximum size of %d",
845                    packet->pages_alloc, pages_max * 100) ;
846         return -1;
847     }
848     /*
849      * We received a packet that is bigger than expected but inside
850      * reasonable limits (see previous comment).  Just reallocate.
851      */
852     if (packet->pages_alloc > p->pages->allocated) {
853         multifd_pages_clear(p->pages);
854         p->pages = multifd_pages_init(packet->pages_alloc);
855     }
856
857     p->pages->used = be32_to_cpu(packet->pages_used);
858     if (p->pages->used > packet->pages_alloc) {
859         error_setg(errp, "multifd: received packet "
860                    "with %d pages and expected maximum pages are %d",
861                    p->pages->used, packet->pages_alloc) ;
862         return -1;
863     }
864
865     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
866     p->packet_num = be64_to_cpu(packet->packet_num);
867
868     if (p->pages->used) {
869         /* make sure that ramblock is 0 terminated */
870         packet->ramblock[255] = 0;
871         block = qemu_ram_block_by_name(packet->ramblock);
872         if (!block) {
873             error_setg(errp, "multifd: unknown ram block %s",
874                        packet->ramblock);
875             return -1;
876         }
877     }
878
879     for (i = 0; i < p->pages->used; i++) {
880         ram_addr_t offset = be64_to_cpu(packet->offset[i]);
881
882         if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
883             error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
884                        " (max " RAM_ADDR_FMT ")",
885                        offset, block->max_length);
886             return -1;
887         }
888         p->pages->iov[i].iov_base = block->host + offset;
889         p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
890     }
891
892     return 0;
893 }
894
895 struct {
896     MultiFDSendParams *params;
897     /* array of pages to sent */
898     MultiFDPages_t *pages;
899     /* syncs main thread and channels */
900     QemuSemaphore sem_sync;
901     /* global number of generated multifd packets */
902     uint64_t packet_num;
903     /* send channels ready */
904     QemuSemaphore channels_ready;
905 } *multifd_send_state;
906
907 /*
908  * How we use multifd_send_state->pages and channel->pages?
909  *
910  * We create a pages for each channel, and a main one.  Each time that
911  * we need to send a batch of pages we interchange the ones between
912  * multifd_send_state and the channel that is sending it.  There are
913  * two reasons for that:
914  *    - to not have to do so many mallocs during migration
915  *    - to make easier to know what to free at the end of migration
916  *
917  * This way we always know who is the owner of each "pages" struct,
918  * and we don't need any locking.  It belongs to the migration thread
919  * or to the channel thread.  Switching is safe because the migration
920  * thread is using the channel mutex when changing it, and the channel
921  * have to had finish with its own, otherwise pending_job can't be
922  * false.
923  */
924
925 static int multifd_send_pages(RAMState *rs)
926 {
927     int i;
928     static int next_channel;
929     MultiFDSendParams *p = NULL; /* make happy gcc */
930     MultiFDPages_t *pages = multifd_send_state->pages;
931     uint64_t transferred;
932
933     qemu_sem_wait(&multifd_send_state->channels_ready);
934     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
935         p = &multifd_send_state->params[i];
936
937         qemu_mutex_lock(&p->mutex);
938         if (p->quit) {
939             error_report("%s: channel %d has already quit!", __func__, i);
940             qemu_mutex_unlock(&p->mutex);
941             return -1;
942         }
943         if (!p->pending_job) {
944             p->pending_job++;
945             next_channel = (i + 1) % migrate_multifd_channels();
946             break;
947         }
948         qemu_mutex_unlock(&p->mutex);
949     }
950     p->pages->used = 0;
951
952     p->packet_num = multifd_send_state->packet_num++;
953     p->pages->block = NULL;
954     multifd_send_state->pages = p->pages;
955     p->pages = pages;
956     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
957     qemu_file_update_transfer(rs->f, transferred);
958     ram_counters.multifd_bytes += transferred;
959     ram_counters.transferred += transferred;;
960     qemu_mutex_unlock(&p->mutex);
961     qemu_sem_post(&p->sem);
962
963     return 1;
964 }
965
966 static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
967 {
968     MultiFDPages_t *pages = multifd_send_state->pages;
969
970     if (!pages->block) {
971         pages->block = block;
972     }
973
974     if (pages->block == block) {
975         pages->offset[pages->used] = offset;
976         pages->iov[pages->used].iov_base = block->host + offset;
977         pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
978         pages->used++;
979
980         if (pages->used < pages->allocated) {
981             return 1;
982         }
983     }
984
985     if (multifd_send_pages(rs) < 0) {
986         return -1;
987     }
988
989     if (pages->block != block) {
990         return  multifd_queue_page(rs, block, offset);
991     }
992
993     return 1;
994 }
995
996 static void multifd_send_terminate_threads(Error *err)
997 {
998     int i;
999
1000     if (err) {
1001         MigrationState *s = migrate_get_current();
1002         migrate_set_error(s, err);
1003         if (s->state == MIGRATION_STATUS_SETUP ||
1004             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
1005             s->state == MIGRATION_STATUS_DEVICE ||
1006             s->state == MIGRATION_STATUS_ACTIVE) {
1007             migrate_set_state(&s->state, s->state,
1008                               MIGRATION_STATUS_FAILED);
1009         }
1010     }
1011
1012     for (i = 0; i < migrate_multifd_channels(); i++) {
1013         MultiFDSendParams *p = &multifd_send_state->params[i];
1014
1015         qemu_mutex_lock(&p->mutex);
1016         p->quit = true;
1017         qemu_sem_post(&p->sem);
1018         qemu_mutex_unlock(&p->mutex);
1019     }
1020 }
1021
1022 void multifd_save_cleanup(void)
1023 {
1024     int i;
1025
1026     if (!migrate_use_multifd()) {
1027         return;
1028     }
1029     multifd_send_terminate_threads(NULL);
1030     for (i = 0; i < migrate_multifd_channels(); i++) {
1031         MultiFDSendParams *p = &multifd_send_state->params[i];
1032
1033         if (p->running) {
1034             qemu_thread_join(&p->thread);
1035         }
1036         socket_send_channel_destroy(p->c);
1037         p->c = NULL;
1038         qemu_mutex_destroy(&p->mutex);
1039         qemu_sem_destroy(&p->sem);
1040         g_free(p->name);
1041         p->name = NULL;
1042         multifd_pages_clear(p->pages);
1043         p->pages = NULL;
1044         p->packet_len = 0;
1045         g_free(p->packet);
1046         p->packet = NULL;
1047     }
1048     qemu_sem_destroy(&multifd_send_state->channels_ready);
1049     qemu_sem_destroy(&multifd_send_state->sem_sync);
1050     g_free(multifd_send_state->params);
1051     multifd_send_state->params = NULL;
1052     multifd_pages_clear(multifd_send_state->pages);
1053     multifd_send_state->pages = NULL;
1054     g_free(multifd_send_state);
1055     multifd_send_state = NULL;
1056 }
1057
1058 static void multifd_send_sync_main(RAMState *rs)
1059 {
1060     int i;
1061
1062     if (!migrate_use_multifd()) {
1063         return;
1064     }
1065     if (multifd_send_state->pages->used) {
1066         if (multifd_send_pages(rs) < 0) {
1067             error_report("%s: multifd_send_pages fail", __func__);
1068             return;
1069         }
1070     }
1071     for (i = 0; i < migrate_multifd_channels(); i++) {
1072         MultiFDSendParams *p = &multifd_send_state->params[i];
1073
1074         trace_multifd_send_sync_main_signal(p->id);
1075
1076         qemu_mutex_lock(&p->mutex);
1077
1078         if (p->quit) {
1079             error_report("%s: channel %d has already quit", __func__, i);
1080             qemu_mutex_unlock(&p->mutex);
1081             return;
1082         }
1083
1084         p->packet_num = multifd_send_state->packet_num++;
1085         p->flags |= MULTIFD_FLAG_SYNC;
1086         p->pending_job++;
1087         qemu_file_update_transfer(rs->f, p->packet_len);
1088         qemu_mutex_unlock(&p->mutex);
1089         qemu_sem_post(&p->sem);
1090     }
1091     for (i = 0; i < migrate_multifd_channels(); i++) {
1092         MultiFDSendParams *p = &multifd_send_state->params[i];
1093
1094         trace_multifd_send_sync_main_wait(p->id);
1095         qemu_sem_wait(&multifd_send_state->sem_sync);
1096     }
1097     trace_multifd_send_sync_main(multifd_send_state->packet_num);
1098 }
1099
1100 static void *multifd_send_thread(void *opaque)
1101 {
1102     MultiFDSendParams *p = opaque;
1103     Error *local_err = NULL;
1104     int ret = 0;
1105     uint32_t flags = 0;
1106
1107     trace_multifd_send_thread_start(p->id);
1108     rcu_register_thread();
1109
1110     if (multifd_send_initial_packet(p, &local_err) < 0) {
1111         goto out;
1112     }
1113     /* initial packet */
1114     p->num_packets = 1;
1115
1116     while (true) {
1117         qemu_sem_wait(&p->sem);
1118         qemu_mutex_lock(&p->mutex);
1119
1120         if (p->pending_job) {
1121             uint32_t used = p->pages->used;
1122             uint64_t packet_num = p->packet_num;
1123             flags = p->flags;
1124
1125             p->next_packet_size = used * qemu_target_page_size();
1126             multifd_send_fill_packet(p);
1127             p->flags = 0;
1128             p->num_packets++;
1129             p->num_pages += used;
1130             p->pages->used = 0;
1131             qemu_mutex_unlock(&p->mutex);
1132
1133             trace_multifd_send(p->id, packet_num, used, flags,
1134                                p->next_packet_size);
1135
1136             ret = qio_channel_write_all(p->c, (void *)p->packet,
1137                                         p->packet_len, &local_err);
1138             if (ret != 0) {
1139                 break;
1140             }
1141
1142             if (used) {
1143                 ret = qio_channel_writev_all(p->c, p->pages->iov,
1144                                              used, &local_err);
1145                 if (ret != 0) {
1146                     break;
1147                 }
1148             }
1149
1150             qemu_mutex_lock(&p->mutex);
1151             p->pending_job--;
1152             qemu_mutex_unlock(&p->mutex);
1153
1154             if (flags & MULTIFD_FLAG_SYNC) {
1155                 qemu_sem_post(&multifd_send_state->sem_sync);
1156             }
1157             qemu_sem_post(&multifd_send_state->channels_ready);
1158         } else if (p->quit) {
1159             qemu_mutex_unlock(&p->mutex);
1160             break;
1161         } else {
1162             qemu_mutex_unlock(&p->mutex);
1163             /* sometimes there are spurious wakeups */
1164         }
1165     }
1166
1167 out:
1168     if (local_err) {
1169         multifd_send_terminate_threads(local_err);
1170     }
1171
1172     /*
1173      * Error happen, I will exit, but I can't just leave, tell
1174      * who pay attention to me.
1175      */
1176     if (ret != 0) {
1177         if (flags & MULTIFD_FLAG_SYNC) {
1178             qemu_sem_post(&multifd_send_state->sem_sync);
1179         }
1180         qemu_sem_post(&multifd_send_state->channels_ready);
1181     }
1182
1183     qemu_mutex_lock(&p->mutex);
1184     p->running = false;
1185     qemu_mutex_unlock(&p->mutex);
1186
1187     rcu_unregister_thread();
1188     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1189
1190     return NULL;
1191 }
1192
1193 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1194 {
1195     MultiFDSendParams *p = opaque;
1196     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1197     Error *local_err = NULL;
1198
1199     if (qio_task_propagate_error(task, &local_err)) {
1200         migrate_set_error(migrate_get_current(), local_err);
1201         multifd_save_cleanup();
1202     } else {
1203         p->c = QIO_CHANNEL(sioc);
1204         qio_channel_set_delay(p->c, false);
1205         p->running = true;
1206         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1207                            QEMU_THREAD_JOINABLE);
1208     }
1209 }
1210
1211 int multifd_save_setup(void)
1212 {
1213     int thread_count;
1214     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1215     uint8_t i;
1216
1217     if (!migrate_use_multifd()) {
1218         return 0;
1219     }
1220     thread_count = migrate_multifd_channels();
1221     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1222     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1223     multifd_send_state->pages = multifd_pages_init(page_count);
1224     qemu_sem_init(&multifd_send_state->sem_sync, 0);
1225     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1226
1227     for (i = 0; i < thread_count; i++) {
1228         MultiFDSendParams *p = &multifd_send_state->params[i];
1229
1230         qemu_mutex_init(&p->mutex);
1231         qemu_sem_init(&p->sem, 0);
1232         p->quit = false;
1233         p->pending_job = 0;
1234         p->id = i;
1235         p->pages = multifd_pages_init(page_count);
1236         p->packet_len = sizeof(MultiFDPacket_t)
1237                       + sizeof(ram_addr_t) * page_count;
1238         p->packet = g_malloc0(p->packet_len);
1239         p->name = g_strdup_printf("multifdsend_%d", i);
1240         socket_send_channel_create(multifd_new_send_channel_async, p);
1241     }
1242     return 0;
1243 }
1244
1245 struct {
1246     MultiFDRecvParams *params;
1247     /* number of created threads */
1248     int count;
1249     /* syncs main thread and channels */
1250     QemuSemaphore sem_sync;
1251     /* global number of generated multifd packets */
1252     uint64_t packet_num;
1253 } *multifd_recv_state;
1254
1255 static void multifd_recv_terminate_threads(Error *err)
1256 {
1257     int i;
1258
1259     if (err) {
1260         MigrationState *s = migrate_get_current();
1261         migrate_set_error(s, err);
1262         if (s->state == MIGRATION_STATUS_SETUP ||
1263             s->state == MIGRATION_STATUS_ACTIVE) {
1264             migrate_set_state(&s->state, s->state,
1265                               MIGRATION_STATUS_FAILED);
1266         }
1267     }
1268
1269     for (i = 0; i < migrate_multifd_channels(); i++) {
1270         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1271
1272         qemu_mutex_lock(&p->mutex);
1273         p->quit = true;
1274         /* We could arrive here for two reasons:
1275            - normal quit, i.e. everything went fine, just finished
1276            - error quit: We close the channels so the channel threads
1277              finish the qio_channel_read_all_eof() */
1278         qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1279         qemu_mutex_unlock(&p->mutex);
1280     }
1281 }
1282
1283 int multifd_load_cleanup(Error **errp)
1284 {
1285     int i;
1286     int ret = 0;
1287
1288     if (!migrate_use_multifd()) {
1289         return 0;
1290     }
1291     multifd_recv_terminate_threads(NULL);
1292     for (i = 0; i < migrate_multifd_channels(); i++) {
1293         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1294
1295         if (p->running) {
1296             p->quit = true;
1297             /*
1298              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1299              * however try to wakeup it without harm in cleanup phase.
1300              */
1301             qemu_sem_post(&p->sem_sync);
1302             qemu_thread_join(&p->thread);
1303         }
1304         object_unref(OBJECT(p->c));
1305         p->c = NULL;
1306         qemu_mutex_destroy(&p->mutex);
1307         qemu_sem_destroy(&p->sem_sync);
1308         g_free(p->name);
1309         p->name = NULL;
1310         multifd_pages_clear(p->pages);
1311         p->pages = NULL;
1312         p->packet_len = 0;
1313         g_free(p->packet);
1314         p->packet = NULL;
1315     }
1316     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1317     g_free(multifd_recv_state->params);
1318     multifd_recv_state->params = NULL;
1319     g_free(multifd_recv_state);
1320     multifd_recv_state = NULL;
1321
1322     return ret;
1323 }
1324
1325 static void multifd_recv_sync_main(void)
1326 {
1327     int i;
1328
1329     if (!migrate_use_multifd()) {
1330         return;
1331     }
1332     for (i = 0; i < migrate_multifd_channels(); i++) {
1333         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1334
1335         trace_multifd_recv_sync_main_wait(p->id);
1336         qemu_sem_wait(&multifd_recv_state->sem_sync);
1337     }
1338     for (i = 0; i < migrate_multifd_channels(); i++) {
1339         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1340
1341         qemu_mutex_lock(&p->mutex);
1342         if (multifd_recv_state->packet_num < p->packet_num) {
1343             multifd_recv_state->packet_num = p->packet_num;
1344         }
1345         qemu_mutex_unlock(&p->mutex);
1346         trace_multifd_recv_sync_main_signal(p->id);
1347         qemu_sem_post(&p->sem_sync);
1348     }
1349     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1350 }
1351
1352 static void *multifd_recv_thread(void *opaque)
1353 {
1354     MultiFDRecvParams *p = opaque;
1355     Error *local_err = NULL;
1356     int ret;
1357
1358     trace_multifd_recv_thread_start(p->id);
1359     rcu_register_thread();
1360
1361     while (true) {
1362         uint32_t used;
1363         uint32_t flags;
1364
1365         if (p->quit) {
1366             break;
1367         }
1368
1369         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1370                                        p->packet_len, &local_err);
1371         if (ret == 0) {   /* EOF */
1372             break;
1373         }
1374         if (ret == -1) {   /* Error */
1375             break;
1376         }
1377
1378         qemu_mutex_lock(&p->mutex);
1379         ret = multifd_recv_unfill_packet(p, &local_err);
1380         if (ret) {
1381             qemu_mutex_unlock(&p->mutex);
1382             break;
1383         }
1384
1385         used = p->pages->used;
1386         flags = p->flags;
1387         trace_multifd_recv(p->id, p->packet_num, used, flags,
1388                            p->next_packet_size);
1389         p->num_packets++;
1390         p->num_pages += used;
1391         qemu_mutex_unlock(&p->mutex);
1392
1393         if (used) {
1394             ret = qio_channel_readv_all(p->c, p->pages->iov,
1395                                         used, &local_err);
1396             if (ret != 0) {
1397                 break;
1398             }
1399         }
1400
1401         if (flags & MULTIFD_FLAG_SYNC) {
1402             qemu_sem_post(&multifd_recv_state->sem_sync);
1403             qemu_sem_wait(&p->sem_sync);
1404         }
1405     }
1406
1407     if (local_err) {
1408         multifd_recv_terminate_threads(local_err);
1409     }
1410     qemu_mutex_lock(&p->mutex);
1411     p->running = false;
1412     qemu_mutex_unlock(&p->mutex);
1413
1414     rcu_unregister_thread();
1415     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1416
1417     return NULL;
1418 }
1419
1420 int multifd_load_setup(void)
1421 {
1422     int thread_count;
1423     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1424     uint8_t i;
1425
1426     if (!migrate_use_multifd()) {
1427         return 0;
1428     }
1429     thread_count = migrate_multifd_channels();
1430     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1431     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1432     atomic_set(&multifd_recv_state->count, 0);
1433     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1434
1435     for (i = 0; i < thread_count; i++) {
1436         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1437
1438         qemu_mutex_init(&p->mutex);
1439         qemu_sem_init(&p->sem_sync, 0);
1440         p->quit = false;
1441         p->id = i;
1442         p->pages = multifd_pages_init(page_count);
1443         p->packet_len = sizeof(MultiFDPacket_t)
1444                       + sizeof(ram_addr_t) * page_count;
1445         p->packet = g_malloc0(p->packet_len);
1446         p->name = g_strdup_printf("multifdrecv_%d", i);
1447     }
1448     return 0;
1449 }
1450
1451 bool multifd_recv_all_channels_created(void)
1452 {
1453     int thread_count = migrate_multifd_channels();
1454
1455     if (!migrate_use_multifd()) {
1456         return true;
1457     }
1458
1459     return thread_count == atomic_read(&multifd_recv_state->count);
1460 }
1461
1462 /*
1463  * Try to receive all multifd channels to get ready for the migration.
1464  * - Return true and do not set @errp when correctly receving all channels;
1465  * - Return false and do not set @errp when correctly receiving the current one;
1466  * - Return false and set @errp when failing to receive the current channel.
1467  */
1468 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1469 {
1470     MultiFDRecvParams *p;
1471     Error *local_err = NULL;
1472     int id;
1473
1474     id = multifd_recv_initial_packet(ioc, &local_err);
1475     if (id < 0) {
1476         multifd_recv_terminate_threads(local_err);
1477         error_propagate_prepend(errp, local_err,
1478                                 "failed to receive packet"
1479                                 " via multifd channel %d: ",
1480                                 atomic_read(&multifd_recv_state->count));
1481         return false;
1482     }
1483
1484     p = &multifd_recv_state->params[id];
1485     if (p->c != NULL) {
1486         error_setg(&local_err, "multifd: received id '%d' already setup'",
1487                    id);
1488         multifd_recv_terminate_threads(local_err);
1489         error_propagate(errp, local_err);
1490         return false;
1491     }
1492     p->c = ioc;
1493     object_ref(OBJECT(ioc));
1494     /* initial packet */
1495     p->num_packets = 1;
1496
1497     p->running = true;
1498     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1499                        QEMU_THREAD_JOINABLE);
1500     atomic_inc(&multifd_recv_state->count);
1501     return atomic_read(&multifd_recv_state->count) ==
1502            migrate_multifd_channels();
1503 }
1504
1505 /**
1506  * save_page_header: write page header to wire
1507  *
1508  * If this is the 1st block, it also writes the block identification
1509  *
1510  * Returns the number of bytes written
1511  *
1512  * @f: QEMUFile where to send the data
1513  * @block: block that contains the page we want to send
1514  * @offset: offset inside the block for the page
1515  *          in the lower bits, it contains flags
1516  */
1517 static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1518                                ram_addr_t offset)
1519 {
1520     size_t size, len;
1521
1522     if (block == rs->last_sent_block) {
1523         offset |= RAM_SAVE_FLAG_CONTINUE;
1524     }
1525     qemu_put_be64(f, offset);
1526     size = 8;
1527
1528     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1529         len = strlen(block->idstr);
1530         qemu_put_byte(f, len);
1531         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1532         size += 1 + len;
1533         rs->last_sent_block = block;
1534     }
1535     return size;
1536 }
1537
1538 /**
1539  * mig_throttle_guest_down: throotle down the guest
1540  *
1541  * Reduce amount of guest cpu execution to hopefully slow down memory
1542  * writes. If guest dirty memory rate is reduced below the rate at
1543  * which we can transfer pages to the destination then we should be
1544  * able to complete migration. Some workloads dirty memory way too
1545  * fast and will not effectively converge, even with auto-converge.
1546  */
1547 static void mig_throttle_guest_down(void)
1548 {
1549     MigrationState *s = migrate_get_current();
1550     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1551     uint64_t pct_icrement = s->parameters.cpu_throttle_increment;
1552     int pct_max = s->parameters.max_cpu_throttle;
1553
1554     /* We have not started throttling yet. Let's start it. */
1555     if (!cpu_throttle_active()) {
1556         cpu_throttle_set(pct_initial);
1557     } else {
1558         /* Throttling already on, just increase the rate */
1559         cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_icrement,
1560                          pct_max));
1561     }
1562 }
1563
1564 /**
1565  * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1566  *
1567  * @rs: current RAM state
1568  * @current_addr: address for the zero page
1569  *
1570  * Update the xbzrle cache to reflect a page that's been sent as all 0.
1571  * The important thing is that a stale (not-yet-0'd) page be replaced
1572  * by the new data.
1573  * As a bonus, if the page wasn't in the cache it gets added so that
1574  * when a small write is made into the 0'd page it gets XBZRLE sent.
1575  */
1576 static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1577 {
1578     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1579         return;
1580     }
1581
1582     /* We don't care if this fails to allocate a new cache page
1583      * as long as it updated an old one */
1584     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1585                  ram_counters.dirty_sync_count);
1586 }
1587
1588 #define ENCODING_FLAG_XBZRLE 0x1
1589
1590 /**
1591  * save_xbzrle_page: compress and send current page
1592  *
1593  * Returns: 1 means that we wrote the page
1594  *          0 means that page is identical to the one already sent
1595  *          -1 means that xbzrle would be longer than normal
1596  *
1597  * @rs: current RAM state
1598  * @current_data: pointer to the address of the page contents
1599  * @current_addr: addr of the page
1600  * @block: block that contains the page we want to send
1601  * @offset: offset inside the block for the page
1602  * @last_stage: if we are at the completion stage
1603  */
1604 static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1605                             ram_addr_t current_addr, RAMBlock *block,
1606                             ram_addr_t offset, bool last_stage)
1607 {
1608     int encoded_len = 0, bytes_xbzrle;
1609     uint8_t *prev_cached_page;
1610
1611     if (!cache_is_cached(XBZRLE.cache, current_addr,
1612                          ram_counters.dirty_sync_count)) {
1613         xbzrle_counters.cache_miss++;
1614         if (!last_stage) {
1615             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1616                              ram_counters.dirty_sync_count) == -1) {
1617                 return -1;
1618             } else {
1619                 /* update *current_data when the page has been
1620                    inserted into cache */
1621                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
1622             }
1623         }
1624         return -1;
1625     }
1626
1627     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1628
1629     /* save current buffer into memory */
1630     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1631
1632     /* XBZRLE encoding (if there is no overflow) */
1633     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1634                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1635                                        TARGET_PAGE_SIZE);
1636
1637     /*
1638      * Update the cache contents, so that it corresponds to the data
1639      * sent, in all cases except where we skip the page.
1640      */
1641     if (!last_stage && encoded_len != 0) {
1642         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1643         /*
1644          * In the case where we couldn't compress, ensure that the caller
1645          * sends the data from the cache, since the guest might have
1646          * changed the RAM since we copied it.
1647          */
1648         *current_data = prev_cached_page;
1649     }
1650
1651     if (encoded_len == 0) {
1652         trace_save_xbzrle_page_skipping();
1653         return 0;
1654     } else if (encoded_len == -1) {
1655         trace_save_xbzrle_page_overflow();
1656         xbzrle_counters.overflow++;
1657         return -1;
1658     }
1659
1660     /* Send XBZRLE based compressed page */
1661     bytes_xbzrle = save_page_header(rs, rs->f, block,
1662                                     offset | RAM_SAVE_FLAG_XBZRLE);
1663     qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1664     qemu_put_be16(rs->f, encoded_len);
1665     qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1666     bytes_xbzrle += encoded_len + 1 + 2;
1667     xbzrle_counters.pages++;
1668     xbzrle_counters.bytes += bytes_xbzrle;
1669     ram_counters.transferred += bytes_xbzrle;
1670
1671     return 1;
1672 }
1673
1674 /**
1675  * migration_bitmap_find_dirty: find the next dirty page from start
1676  *
1677  * Returns the page offset within memory region of the start of a dirty page
1678  *
1679  * @rs: current RAM state
1680  * @rb: RAMBlock where to search for dirty pages
1681  * @start: page where we start the search
1682  */
1683 static inline
1684 unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1685                                           unsigned long start)
1686 {
1687     unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1688     unsigned long *bitmap = rb->bmap;
1689     unsigned long next;
1690
1691     if (ramblock_is_ignored(rb)) {
1692         return size;
1693     }
1694
1695     /*
1696      * When the free page optimization is enabled, we need to check the bitmap
1697      * to send the non-free pages rather than all the pages in the bulk stage.
1698      */
1699     if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1700         next = start + 1;
1701     } else {
1702         next = find_next_bit(bitmap, size, start);
1703     }
1704
1705     return next;
1706 }
1707
1708 static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1709                                                 RAMBlock *rb,
1710                                                 unsigned long page)
1711 {
1712     bool ret;
1713
1714     qemu_mutex_lock(&rs->bitmap_mutex);
1715
1716     /*
1717      * Clear dirty bitmap if needed.  This _must_ be called before we
1718      * send any of the page in the chunk because we need to make sure
1719      * we can capture further page content changes when we sync dirty
1720      * log the next time.  So as long as we are going to send any of
1721      * the page in the chunk we clear the remote dirty bitmap for all.
1722      * Clearing it earlier won't be a problem, but too late will.
1723      */
1724     if (rb->clear_bmap && clear_bmap_test_and_clear(rb, page)) {
1725         uint8_t shift = rb->clear_bmap_shift;
1726         hwaddr size = 1ULL << (TARGET_PAGE_BITS + shift);
1727         hwaddr start = (page << TARGET_PAGE_BITS) & (-size);
1728
1729         /*
1730          * CLEAR_BITMAP_SHIFT_MIN should always guarantee this... this
1731          * can make things easier sometimes since then start address
1732          * of the small chunk will always be 64 pages aligned so the
1733          * bitmap will always be aligned to unsigned long.  We should
1734          * even be able to remove this restriction but I'm simply
1735          * keeping it.
1736          */
1737         assert(shift >= 6);
1738         trace_migration_bitmap_clear_dirty(rb->idstr, start, size, page);
1739         memory_region_clear_dirty_bitmap(rb->mr, start, size);
1740     }
1741
1742     ret = test_and_clear_bit(page, rb->bmap);
1743
1744     if (ret) {
1745         rs->migration_dirty_pages--;
1746     }
1747     qemu_mutex_unlock(&rs->bitmap_mutex);
1748
1749     return ret;
1750 }
1751
1752 /* Called with RCU critical section */
1753 static void migration_bitmap_sync_range(RAMState *rs, RAMBlock *rb)
1754 {
1755     rs->migration_dirty_pages +=
1756         cpu_physical_memory_sync_dirty_bitmap(rb, 0, rb->used_length,
1757                                               &rs->num_dirty_pages_period);
1758 }
1759
1760 /**
1761  * ram_pagesize_summary: calculate all the pagesizes of a VM
1762  *
1763  * Returns a summary bitmap of the page sizes of all RAMBlocks
1764  *
1765  * For VMs with just normal pages this is equivalent to the host page
1766  * size. If it's got some huge pages then it's the OR of all the
1767  * different page sizes.
1768  */
1769 uint64_t ram_pagesize_summary(void)
1770 {
1771     RAMBlock *block;
1772     uint64_t summary = 0;
1773
1774     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1775         summary |= block->page_size;
1776     }
1777
1778     return summary;
1779 }
1780
1781 uint64_t ram_get_total_transferred_pages(void)
1782 {
1783     return  ram_counters.normal + ram_counters.duplicate +
1784                 compression_counters.pages + xbzrle_counters.pages;
1785 }
1786
1787 static void migration_update_rates(RAMState *rs, int64_t end_time)
1788 {
1789     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1790     double compressed_size;
1791
1792     /* calculate period counters */
1793     ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1794                 / (end_time - rs->time_last_bitmap_sync);
1795
1796     if (!page_count) {
1797         return;
1798     }
1799
1800     if (migrate_use_xbzrle()) {
1801         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1802             rs->xbzrle_cache_miss_prev) / page_count;
1803         rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1804     }
1805
1806     if (migrate_use_compression()) {
1807         compression_counters.busy_rate = (double)(compression_counters.busy -
1808             rs->compress_thread_busy_prev) / page_count;
1809         rs->compress_thread_busy_prev = compression_counters.busy;
1810
1811         compressed_size = compression_counters.compressed_size -
1812                           rs->compressed_size_prev;
1813         if (compressed_size) {
1814             double uncompressed_size = (compression_counters.pages -
1815                                     rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1816
1817             /* Compression-Ratio = Uncompressed-size / Compressed-size */
1818             compression_counters.compression_rate =
1819                                         uncompressed_size / compressed_size;
1820
1821             rs->compress_pages_prev = compression_counters.pages;
1822             rs->compressed_size_prev = compression_counters.compressed_size;
1823         }
1824     }
1825 }
1826
1827 static void migration_bitmap_sync(RAMState *rs)
1828 {
1829     RAMBlock *block;
1830     int64_t end_time;
1831     uint64_t bytes_xfer_now;
1832
1833     ram_counters.dirty_sync_count++;
1834
1835     if (!rs->time_last_bitmap_sync) {
1836         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1837     }
1838
1839     trace_migration_bitmap_sync_start();
1840     memory_global_dirty_log_sync();
1841
1842     qemu_mutex_lock(&rs->bitmap_mutex);
1843     rcu_read_lock();
1844     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1845         migration_bitmap_sync_range(rs, block);
1846     }
1847     ram_counters.remaining = ram_bytes_remaining();
1848     rcu_read_unlock();
1849     qemu_mutex_unlock(&rs->bitmap_mutex);
1850
1851     trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1852
1853     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1854
1855     /* more than 1 second = 1000 millisecons */
1856     if (end_time > rs->time_last_bitmap_sync + 1000) {
1857         bytes_xfer_now = ram_counters.transferred;
1858
1859         /* During block migration the auto-converge logic incorrectly detects
1860          * that ram migration makes no progress. Avoid this by disabling the
1861          * throttling logic during the bulk phase of block migration. */
1862         if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1863             /* The following detection logic can be refined later. For now:
1864                Check to see if the dirtied bytes is 50% more than the approx.
1865                amount of bytes that just got transferred since the last time we
1866                were in this routine. If that happens twice, start or increase
1867                throttling */
1868
1869             if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1870                    (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1871                 (++rs->dirty_rate_high_cnt >= 2)) {
1872                     trace_migration_throttle();
1873                     rs->dirty_rate_high_cnt = 0;
1874                     mig_throttle_guest_down();
1875             }
1876         }
1877
1878         migration_update_rates(rs, end_time);
1879
1880         rs->target_page_count_prev = rs->target_page_count;
1881
1882         /* reset period counters */
1883         rs->time_last_bitmap_sync = end_time;
1884         rs->num_dirty_pages_period = 0;
1885         rs->bytes_xfer_prev = bytes_xfer_now;
1886     }
1887     if (migrate_use_events()) {
1888         qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1889     }
1890 }
1891
1892 static void migration_bitmap_sync_precopy(RAMState *rs)
1893 {
1894     Error *local_err = NULL;
1895
1896     /*
1897      * The current notifier usage is just an optimization to migration, so we
1898      * don't stop the normal migration process in the error case.
1899      */
1900     if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1901         error_report_err(local_err);
1902     }
1903
1904     migration_bitmap_sync(rs);
1905
1906     if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1907         error_report_err(local_err);
1908     }
1909 }
1910
1911 /**
1912  * save_zero_page_to_file: send the zero page to the file
1913  *
1914  * Returns the size of data written to the file, 0 means the page is not
1915  * a zero page
1916  *
1917  * @rs: current RAM state
1918  * @file: the file where the data is saved
1919  * @block: block that contains the page we want to send
1920  * @offset: offset inside the block for the page
1921  */
1922 static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1923                                   RAMBlock *block, ram_addr_t offset)
1924 {
1925     uint8_t *p = block->host + offset;
1926     int len = 0;
1927
1928     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1929         len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1930         qemu_put_byte(file, 0);
1931         len += 1;
1932     }
1933     return len;
1934 }
1935
1936 /**
1937  * save_zero_page: send the zero page to the stream
1938  *
1939  * Returns the number of pages written.
1940  *
1941  * @rs: current RAM state
1942  * @block: block that contains the page we want to send
1943  * @offset: offset inside the block for the page
1944  */
1945 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1946 {
1947     int len = save_zero_page_to_file(rs, rs->f, block, offset);
1948
1949     if (len) {
1950         ram_counters.duplicate++;
1951         ram_counters.transferred += len;
1952         return 1;
1953     }
1954     return -1;
1955 }
1956
1957 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1958 {
1959     if (!migrate_release_ram() || !migration_in_postcopy()) {
1960         return;
1961     }
1962
1963     ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1964 }
1965
1966 /*
1967  * @pages: the number of pages written by the control path,
1968  *        < 0 - error
1969  *        > 0 - number of pages written
1970  *
1971  * Return true if the pages has been saved, otherwise false is returned.
1972  */
1973 static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1974                               int *pages)
1975 {
1976     uint64_t bytes_xmit = 0;
1977     int ret;
1978
1979     *pages = -1;
1980     ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1981                                 &bytes_xmit);
1982     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1983         return false;
1984     }
1985
1986     if (bytes_xmit) {
1987         ram_counters.transferred += bytes_xmit;
1988         *pages = 1;
1989     }
1990
1991     if (ret == RAM_SAVE_CONTROL_DELAYED) {
1992         return true;
1993     }
1994
1995     if (bytes_xmit > 0) {
1996         ram_counters.normal++;
1997     } else if (bytes_xmit == 0) {
1998         ram_counters.duplicate++;
1999     }
2000
2001     return true;
2002 }
2003
2004 /*
2005  * directly send the page to the stream
2006  *
2007  * Returns the number of pages written.
2008  *
2009  * @rs: current RAM state
2010  * @block: block that contains the page we want to send
2011  * @offset: offset inside the block for the page
2012  * @buf: the page to be sent
2013  * @async: send to page asyncly
2014  */
2015 static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2016                             uint8_t *buf, bool async)
2017 {
2018     ram_counters.transferred += save_page_header(rs, rs->f, block,
2019                                                  offset | RAM_SAVE_FLAG_PAGE);
2020     if (async) {
2021         qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
2022                               migrate_release_ram() &
2023                               migration_in_postcopy());
2024     } else {
2025         qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
2026     }
2027     ram_counters.transferred += TARGET_PAGE_SIZE;
2028     ram_counters.normal++;
2029     return 1;
2030 }
2031
2032 /**
2033  * ram_save_page: send the given page to the stream
2034  *
2035  * Returns the number of pages written.
2036  *          < 0 - error
2037  *          >=0 - Number of pages written - this might legally be 0
2038  *                if xbzrle noticed the page was the same.
2039  *
2040  * @rs: current RAM state
2041  * @block: block that contains the page we want to send
2042  * @offset: offset inside the block for the page
2043  * @last_stage: if we are at the completion stage
2044  */
2045 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
2046 {
2047     int pages = -1;
2048     uint8_t *p;
2049     bool send_async = true;
2050     RAMBlock *block = pss->block;
2051     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2052     ram_addr_t current_addr = block->offset + offset;
2053
2054     p = block->host + offset;
2055     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
2056
2057     XBZRLE_cache_lock();
2058     if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
2059         migrate_use_xbzrle()) {
2060         pages = save_xbzrle_page(rs, &p, current_addr, block,
2061                                  offset, last_stage);
2062         if (!last_stage) {
2063             /* Can't send this cached data async, since the cache page
2064              * might get updated before it gets to the wire
2065              */
2066             send_async = false;
2067         }
2068     }
2069
2070     /* XBZRLE overflow or normal page */
2071     if (pages == -1) {
2072         pages = save_normal_page(rs, block, offset, p, send_async);
2073     }
2074
2075     XBZRLE_cache_unlock();
2076
2077     return pages;
2078 }
2079
2080 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2081                                  ram_addr_t offset)
2082 {
2083     if (multifd_queue_page(rs, block, offset) < 0) {
2084         return -1;
2085     }
2086     ram_counters.normal++;
2087
2088     return 1;
2089 }
2090
2091 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2092                                  ram_addr_t offset, uint8_t *source_buf)
2093 {
2094     RAMState *rs = ram_state;
2095     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2096     bool zero_page = false;
2097     int ret;
2098
2099     if (save_zero_page_to_file(rs, f, block, offset)) {
2100         zero_page = true;
2101         goto exit;
2102     }
2103
2104     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2105
2106     /*
2107      * copy it to a internal buffer to avoid it being modified by VM
2108      * so that we can catch up the error during compression and
2109      * decompression
2110      */
2111     memcpy(source_buf, p, TARGET_PAGE_SIZE);
2112     ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2113     if (ret < 0) {
2114         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2115         error_report("compressed data failed!");
2116         return false;
2117     }
2118
2119 exit:
2120     ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2121     return zero_page;
2122 }
2123
2124 static void
2125 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2126 {
2127     ram_counters.transferred += bytes_xmit;
2128
2129     if (param->zero_page) {
2130         ram_counters.duplicate++;
2131         return;
2132     }
2133
2134     /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
2135     compression_counters.compressed_size += bytes_xmit - 8;
2136     compression_counters.pages++;
2137 }
2138
2139 static bool save_page_use_compression(RAMState *rs);
2140
2141 static void flush_compressed_data(RAMState *rs)
2142 {
2143     int idx, len, thread_count;
2144
2145     if (!save_page_use_compression(rs)) {
2146         return;
2147     }
2148     thread_count = migrate_compress_threads();
2149
2150     qemu_mutex_lock(&comp_done_lock);
2151     for (idx = 0; idx < thread_count; idx++) {
2152         while (!comp_param[idx].done) {
2153             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2154         }
2155     }
2156     qemu_mutex_unlock(&comp_done_lock);
2157
2158     for (idx = 0; idx < thread_count; idx++) {
2159         qemu_mutex_lock(&comp_param[idx].mutex);
2160         if (!comp_param[idx].quit) {
2161             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2162             /*
2163              * it's safe to fetch zero_page without holding comp_done_lock
2164              * as there is no further request submitted to the thread,
2165              * i.e, the thread should be waiting for a request at this point.
2166              */
2167             update_compress_thread_counts(&comp_param[idx], len);
2168         }
2169         qemu_mutex_unlock(&comp_param[idx].mutex);
2170     }
2171 }
2172
2173 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2174                                        ram_addr_t offset)
2175 {
2176     param->block = block;
2177     param->offset = offset;
2178 }
2179
2180 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2181                                            ram_addr_t offset)
2182 {
2183     int idx, thread_count, bytes_xmit = -1, pages = -1;
2184     bool wait = migrate_compress_wait_thread();
2185
2186     thread_count = migrate_compress_threads();
2187     qemu_mutex_lock(&comp_done_lock);
2188 retry:
2189     for (idx = 0; idx < thread_count; idx++) {
2190         if (comp_param[idx].done) {
2191             comp_param[idx].done = false;
2192             bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2193             qemu_mutex_lock(&comp_param[idx].mutex);
2194             set_compress_params(&comp_param[idx], block, offset);
2195             qemu_cond_signal(&comp_param[idx].cond);
2196             qemu_mutex_unlock(&comp_param[idx].mutex);
2197             pages = 1;
2198             update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2199             break;
2200         }
2201     }
2202
2203     /*
2204      * wait for the free thread if the user specifies 'compress-wait-thread',
2205      * otherwise we will post the page out in the main thread as normal page.
2206      */
2207     if (pages < 0 && wait) {
2208         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2209         goto retry;
2210     }
2211     qemu_mutex_unlock(&comp_done_lock);
2212
2213     return pages;
2214 }
2215
2216 /**
2217  * find_dirty_block: find the next dirty page and update any state
2218  * associated with the search process.
2219  *
2220  * Returns true if a page is found
2221  *
2222  * @rs: current RAM state
2223  * @pss: data about the state of the current dirty page scan
2224  * @again: set to false if the search has scanned the whole of RAM
2225  */
2226 static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2227 {
2228     pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2229     if (pss->complete_round && pss->block == rs->last_seen_block &&
2230         pss->page >= rs->last_page) {
2231         /*
2232          * We've been once around the RAM and haven't found anything.
2233          * Give up.
2234          */
2235         *again = false;
2236         return false;
2237     }
2238     if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2239         /* Didn't find anything in this RAM Block */
2240         pss->page = 0;
2241         pss->block = QLIST_NEXT_RCU(pss->block, next);
2242         if (!pss->block) {
2243             /*
2244              * If memory migration starts over, we will meet a dirtied page
2245              * which may still exists in compression threads's ring, so we
2246              * should flush the compressed data to make sure the new page
2247              * is not overwritten by the old one in the destination.
2248              *
2249              * Also If xbzrle is on, stop using the data compression at this
2250              * point. In theory, xbzrle can do better than compression.
2251              */
2252             flush_compressed_data(rs);
2253
2254             /* Hit the end of the list */
2255             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2256             /* Flag that we've looped */
2257             pss->complete_round = true;
2258             rs->ram_bulk_stage = false;
2259         }
2260         /* Didn't find anything this time, but try again on the new block */
2261         *again = true;
2262         return false;
2263     } else {
2264         /* Can go around again, but... */
2265         *again = true;
2266         /* We've found something so probably don't need to */
2267         return true;
2268     }
2269 }
2270
2271 /**
2272  * unqueue_page: gets a page of the queue
2273  *
2274  * Helper for 'get_queued_page' - gets a page off the queue
2275  *
2276  * Returns the block of the page (or NULL if none available)
2277  *
2278  * @rs: current RAM state
2279  * @offset: used to return the offset within the RAMBlock
2280  */
2281 static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2282 {
2283     RAMBlock *block = NULL;
2284
2285     if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2286         return NULL;
2287     }
2288
2289     qemu_mutex_lock(&rs->src_page_req_mutex);
2290     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2291         struct RAMSrcPageRequest *entry =
2292                                 QSIMPLEQ_FIRST(&rs->src_page_requests);
2293         block = entry->rb;
2294         *offset = entry->offset;
2295
2296         if (entry->len > TARGET_PAGE_SIZE) {
2297             entry->len -= TARGET_PAGE_SIZE;
2298             entry->offset += TARGET_PAGE_SIZE;
2299         } else {
2300             memory_region_unref(block->mr);
2301             QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2302             g_free(entry);
2303             migration_consume_urgent_request();
2304         }
2305     }
2306     qemu_mutex_unlock(&rs->src_page_req_mutex);
2307
2308     return block;
2309 }
2310
2311 /**
2312  * get_queued_page: unqueue a page from the postcopy requests
2313  *
2314  * Skips pages that are already sent (!dirty)
2315  *
2316  * Returns true if a queued page is found
2317  *
2318  * @rs: current RAM state
2319  * @pss: data about the state of the current dirty page scan
2320  */
2321 static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2322 {
2323     RAMBlock  *block;
2324     ram_addr_t offset;
2325     bool dirty;
2326
2327     do {
2328         block = unqueue_page(rs, &offset);
2329         /*
2330          * We're sending this page, and since it's postcopy nothing else
2331          * will dirty it, and we must make sure it doesn't get sent again
2332          * even if this queue request was received after the background
2333          * search already sent it.
2334          */
2335         if (block) {
2336             unsigned long page;
2337
2338             page = offset >> TARGET_PAGE_BITS;
2339             dirty = test_bit(page, block->bmap);
2340             if (!dirty) {
2341                 trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2342                        page, test_bit(page, block->unsentmap));
2343             } else {
2344                 trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2345             }
2346         }
2347
2348     } while (block && !dirty);
2349
2350     if (block) {
2351         /*
2352          * As soon as we start servicing pages out of order, then we have
2353          * to kill the bulk stage, since the bulk stage assumes
2354          * in (migration_bitmap_find_and_reset_dirty) that every page is
2355          * dirty, that's no longer true.
2356          */
2357         rs->ram_bulk_stage = false;
2358
2359         /*
2360          * We want the background search to continue from the queued page
2361          * since the guest is likely to want other pages near to the page
2362          * it just requested.
2363          */
2364         pss->block = block;
2365         pss->page = offset >> TARGET_PAGE_BITS;
2366
2367         /*
2368          * This unqueued page would break the "one round" check, even is
2369          * really rare.
2370          */
2371         pss->complete_round = false;
2372     }
2373
2374     return !!block;
2375 }
2376
2377 /**
2378  * migration_page_queue_free: drop any remaining pages in the ram
2379  * request queue
2380  *
2381  * It should be empty at the end anyway, but in error cases there may
2382  * be some left.  in case that there is any page left, we drop it.
2383  *
2384  */
2385 static void migration_page_queue_free(RAMState *rs)
2386 {
2387     struct RAMSrcPageRequest *mspr, *next_mspr;
2388     /* This queue generally should be empty - but in the case of a failed
2389      * migration might have some droppings in.
2390      */
2391     rcu_read_lock();
2392     QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2393         memory_region_unref(mspr->rb->mr);
2394         QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2395         g_free(mspr);
2396     }
2397     rcu_read_unlock();
2398 }
2399
2400 /**
2401  * ram_save_queue_pages: queue the page for transmission
2402  *
2403  * A request from postcopy destination for example.
2404  *
2405  * Returns zero on success or negative on error
2406  *
2407  * @rbname: Name of the RAMBLock of the request. NULL means the
2408  *          same that last one.
2409  * @start: starting address from the start of the RAMBlock
2410  * @len: length (in bytes) to send
2411  */
2412 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2413 {
2414     RAMBlock *ramblock;
2415     RAMState *rs = ram_state;
2416
2417     ram_counters.postcopy_requests++;
2418     rcu_read_lock();
2419     if (!rbname) {
2420         /* Reuse last RAMBlock */
2421         ramblock = rs->last_req_rb;
2422
2423         if (!ramblock) {
2424             /*
2425              * Shouldn't happen, we can't reuse the last RAMBlock if
2426              * it's the 1st request.
2427              */
2428             error_report("ram_save_queue_pages no previous block");
2429             goto err;
2430         }
2431     } else {
2432         ramblock = qemu_ram_block_by_name(rbname);
2433
2434         if (!ramblock) {
2435             /* We shouldn't be asked for a non-existent RAMBlock */
2436             error_report("ram_save_queue_pages no block '%s'", rbname);
2437             goto err;
2438         }
2439         rs->last_req_rb = ramblock;
2440     }
2441     trace_ram_save_queue_pages(ramblock->idstr, start, len);
2442     if (start+len > ramblock->used_length) {
2443         error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2444                      RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2445                      __func__, start, len, ramblock->used_length);
2446         goto err;
2447     }
2448
2449     struct RAMSrcPageRequest *new_entry =
2450         g_malloc0(sizeof(struct RAMSrcPageRequest));
2451     new_entry->rb = ramblock;
2452     new_entry->offset = start;
2453     new_entry->len = len;
2454
2455     memory_region_ref(ramblock->mr);
2456     qemu_mutex_lock(&rs->src_page_req_mutex);
2457     QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2458     migration_make_urgent_request();
2459     qemu_mutex_unlock(&rs->src_page_req_mutex);
2460     rcu_read_unlock();
2461
2462     return 0;
2463
2464 err:
2465     rcu_read_unlock();
2466     return -1;
2467 }
2468
2469 static bool save_page_use_compression(RAMState *rs)
2470 {
2471     if (!migrate_use_compression()) {
2472         return false;
2473     }
2474
2475     /*
2476      * If xbzrle is on, stop using the data compression after first
2477      * round of migration even if compression is enabled. In theory,
2478      * xbzrle can do better than compression.
2479      */
2480     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2481         return true;
2482     }
2483
2484     return false;
2485 }
2486
2487 /*
2488  * try to compress the page before posting it out, return true if the page
2489  * has been properly handled by compression, otherwise needs other
2490  * paths to handle it
2491  */
2492 static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2493 {
2494     if (!save_page_use_compression(rs)) {
2495         return false;
2496     }
2497
2498     /*
2499      * When starting the process of a new block, the first page of
2500      * the block should be sent out before other pages in the same
2501      * block, and all the pages in last block should have been sent
2502      * out, keeping this order is important, because the 'cont' flag
2503      * is used to avoid resending the block name.
2504      *
2505      * We post the fist page as normal page as compression will take
2506      * much CPU resource.
2507      */
2508     if (block != rs->last_sent_block) {
2509         flush_compressed_data(rs);
2510         return false;
2511     }
2512
2513     if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2514         return true;
2515     }
2516
2517     compression_counters.busy++;
2518     return false;
2519 }
2520
2521 /**
2522  * ram_save_target_page: save one target page
2523  *
2524  * Returns the number of pages written
2525  *
2526  * @rs: current RAM state
2527  * @pss: data about the page we want to send
2528  * @last_stage: if we are at the completion stage
2529  */
2530 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2531                                 bool last_stage)
2532 {
2533     RAMBlock *block = pss->block;
2534     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2535     int res;
2536
2537     if (control_save_page(rs, block, offset, &res)) {
2538         return res;
2539     }
2540
2541     if (save_compress_page(rs, block, offset)) {
2542         return 1;
2543     }
2544
2545     res = save_zero_page(rs, block, offset);
2546     if (res > 0) {
2547         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2548          * page would be stale
2549          */
2550         if (!save_page_use_compression(rs)) {
2551             XBZRLE_cache_lock();
2552             xbzrle_cache_zero_page(rs, block->offset + offset);
2553             XBZRLE_cache_unlock();
2554         }
2555         ram_release_pages(block->idstr, offset, res);
2556         return res;
2557     }
2558
2559     /*
2560      * do not use multifd for compression as the first page in the new
2561      * block should be posted out before sending the compressed page
2562      */
2563     if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2564         return ram_save_multifd_page(rs, block, offset);
2565     }
2566
2567     return ram_save_page(rs, pss, last_stage);
2568 }
2569
2570 /**
2571  * ram_save_host_page: save a whole host page
2572  *
2573  * Starting at *offset send pages up to the end of the current host
2574  * page. It's valid for the initial offset to point into the middle of
2575  * a host page in which case the remainder of the hostpage is sent.
2576  * Only dirty target pages are sent. Note that the host page size may
2577  * be a huge page for this block.
2578  * The saving stops at the boundary of the used_length of the block
2579  * if the RAMBlock isn't a multiple of the host page size.
2580  *
2581  * Returns the number of pages written or negative on error
2582  *
2583  * @rs: current RAM state
2584  * @ms: current migration state
2585  * @pss: data about the page we want to send
2586  * @last_stage: if we are at the completion stage
2587  */
2588 static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2589                               bool last_stage)
2590 {
2591     int tmppages, pages = 0;
2592     size_t pagesize_bits =
2593         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2594
2595     if (ramblock_is_ignored(pss->block)) {
2596         error_report("block %s should not be migrated !", pss->block->idstr);
2597         return 0;
2598     }
2599
2600     do {
2601         /* Check the pages is dirty and if it is send it */
2602         if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2603             pss->page++;
2604             continue;
2605         }
2606
2607         tmppages = ram_save_target_page(rs, pss, last_stage);
2608         if (tmppages < 0) {
2609             return tmppages;
2610         }
2611
2612         pages += tmppages;
2613         if (pss->block->unsentmap) {
2614             clear_bit(pss->page, pss->block->unsentmap);
2615         }
2616
2617         pss->page++;
2618     } while ((pss->page & (pagesize_bits - 1)) &&
2619              offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
2620
2621     /* The offset we leave with is the last one we looked at */
2622     pss->page--;
2623     return pages;
2624 }
2625
2626 /**
2627  * ram_find_and_save_block: finds a dirty page and sends it to f
2628  *
2629  * Called within an RCU critical section.
2630  *
2631  * Returns the number of pages written where zero means no dirty pages,
2632  * or negative on error
2633  *
2634  * @rs: current RAM state
2635  * @last_stage: if we are at the completion stage
2636  *
2637  * On systems where host-page-size > target-page-size it will send all the
2638  * pages in a host page that are dirty.
2639  */
2640
2641 static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2642 {
2643     PageSearchStatus pss;
2644     int pages = 0;
2645     bool again, found;
2646
2647     /* No dirty page as there is zero RAM */
2648     if (!ram_bytes_total()) {
2649         return pages;
2650     }
2651
2652     pss.block = rs->last_seen_block;
2653     pss.page = rs->last_page;
2654     pss.complete_round = false;
2655
2656     if (!pss.block) {
2657         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2658     }
2659
2660     do {
2661         again = true;
2662         found = get_queued_page(rs, &pss);
2663
2664         if (!found) {
2665             /* priority queue empty, so just search for something dirty */
2666             found = find_dirty_block(rs, &pss, &again);
2667         }
2668
2669         if (found) {
2670             pages = ram_save_host_page(rs, &pss, last_stage);
2671         }
2672     } while (!pages && again);
2673
2674     rs->last_seen_block = pss.block;
2675     rs->last_page = pss.page;
2676
2677     return pages;
2678 }
2679
2680 void acct_update_position(QEMUFile *f, size_t size, bool zero)
2681 {
2682     uint64_t pages = size / TARGET_PAGE_SIZE;
2683
2684     if (zero) {
2685         ram_counters.duplicate += pages;
2686     } else {
2687         ram_counters.normal += pages;
2688         ram_counters.transferred += size;
2689         qemu_update_position(f, size);
2690     }
2691 }
2692
2693 static uint64_t ram_bytes_total_common(bool count_ignored)
2694 {
2695     RAMBlock *block;
2696     uint64_t total = 0;
2697
2698     rcu_read_lock();
2699     if (count_ignored) {
2700         RAMBLOCK_FOREACH_MIGRATABLE(block) {
2701             total += block->used_length;
2702         }
2703     } else {
2704         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2705             total += block->used_length;
2706         }
2707     }
2708     rcu_read_unlock();
2709     return total;
2710 }
2711
2712 uint64_t ram_bytes_total(void)
2713 {
2714     return ram_bytes_total_common(false);
2715 }
2716
2717 static void xbzrle_load_setup(void)
2718 {
2719     XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2720 }
2721
2722 static void xbzrle_load_cleanup(void)
2723 {
2724     g_free(XBZRLE.decoded_buf);
2725     XBZRLE.decoded_buf = NULL;
2726 }
2727
2728 static void ram_state_cleanup(RAMState **rsp)
2729 {
2730     if (*rsp) {
2731         migration_page_queue_free(*rsp);
2732         qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2733         qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2734         g_free(*rsp);
2735         *rsp = NULL;
2736     }
2737 }
2738
2739 static void xbzrle_cleanup(void)
2740 {
2741     XBZRLE_cache_lock();
2742     if (XBZRLE.cache) {
2743         cache_fini(XBZRLE.cache);
2744         g_free(XBZRLE.encoded_buf);
2745         g_free(XBZRLE.current_buf);
2746         g_free(XBZRLE.zero_target_page);
2747         XBZRLE.cache = NULL;
2748         XBZRLE.encoded_buf = NULL;
2749         XBZRLE.current_buf = NULL;
2750         XBZRLE.zero_target_page = NULL;
2751     }
2752     XBZRLE_cache_unlock();
2753 }
2754
2755 static void ram_save_cleanup(void *opaque)
2756 {
2757     RAMState **rsp = opaque;
2758     RAMBlock *block;
2759
2760     /* caller have hold iothread lock or is in a bh, so there is
2761      * no writing race against the migration bitmap
2762      */
2763     memory_global_dirty_log_stop();
2764
2765     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2766         g_free(block->clear_bmap);
2767         block->clear_bmap = NULL;
2768         g_free(block->bmap);
2769         block->bmap = NULL;
2770         g_free(block->unsentmap);
2771         block->unsentmap = NULL;
2772     }
2773
2774     xbzrle_cleanup();
2775     compress_threads_save_cleanup();
2776     ram_state_cleanup(rsp);
2777 }
2778
2779 static void ram_state_reset(RAMState *rs)
2780 {
2781     rs->last_seen_block = NULL;
2782     rs->last_sent_block = NULL;
2783     rs->last_page = 0;
2784     rs->last_version = ram_list.version;
2785     rs->ram_bulk_stage = true;
2786     rs->fpo_enabled = false;
2787 }
2788
2789 #define MAX_WAIT 50 /* ms, half buffered_file limit */
2790
2791 /*
2792  * 'expected' is the value you expect the bitmap mostly to be full
2793  * of; it won't bother printing lines that are all this value.
2794  * If 'todump' is null the migration bitmap is dumped.
2795  */
2796 void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2797                            unsigned long pages)
2798 {
2799     int64_t cur;
2800     int64_t linelen = 128;
2801     char linebuf[129];
2802
2803     for (cur = 0; cur < pages; cur += linelen) {
2804         int64_t curb;
2805         bool found = false;
2806         /*
2807          * Last line; catch the case where the line length
2808          * is longer than remaining ram
2809          */
2810         if (cur + linelen > pages) {
2811             linelen = pages - cur;
2812         }
2813         for (curb = 0; curb < linelen; curb++) {
2814             bool thisbit = test_bit(cur + curb, todump);
2815             linebuf[curb] = thisbit ? '1' : '.';
2816             found = found || (thisbit != expected);
2817         }
2818         if (found) {
2819             linebuf[curb] = '\0';
2820             fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2821         }
2822     }
2823 }
2824
2825 /* **** functions for postcopy ***** */
2826
2827 void ram_postcopy_migrated_memory_release(MigrationState *ms)
2828 {
2829     struct RAMBlock *block;
2830
2831     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2832         unsigned long *bitmap = block->bmap;
2833         unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2834         unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2835
2836         while (run_start < range) {
2837             unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2838             ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2839                               (run_end - run_start) << TARGET_PAGE_BITS);
2840             run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2841         }
2842     }
2843 }
2844
2845 /**
2846  * postcopy_send_discard_bm_ram: discard a RAMBlock
2847  *
2848  * Returns zero on success
2849  *
2850  * Callback from postcopy_each_ram_send_discard for each RAMBlock
2851  * Note: At this point the 'unsentmap' is the processed bitmap combined
2852  *       with the dirtymap; so a '1' means it's either dirty or unsent.
2853  *
2854  * @ms: current migration state
2855  * @block: RAMBlock to discard
2856  */
2857 static int postcopy_send_discard_bm_ram(MigrationState *ms, RAMBlock *block)
2858 {
2859     unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2860     unsigned long current;
2861     unsigned long *unsentmap = block->unsentmap;
2862
2863     for (current = 0; current < end; ) {
2864         unsigned long one = find_next_bit(unsentmap, end, current);
2865         unsigned long zero, discard_length;
2866
2867         if (one >= end) {
2868             break;
2869         }
2870
2871         zero = find_next_zero_bit(unsentmap, end, one + 1);
2872
2873         if (zero >= end) {
2874             discard_length = end - one;
2875         } else {
2876             discard_length = zero - one;
2877         }
2878         postcopy_discard_send_range(ms, one, discard_length);
2879         current = one + discard_length;
2880     }
2881
2882     return 0;
2883 }
2884
2885 /**
2886  * postcopy_each_ram_send_discard: discard all RAMBlocks
2887  *
2888  * Returns 0 for success or negative for error
2889  *
2890  * Utility for the outgoing postcopy code.
2891  *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2892  *   passing it bitmap indexes and name.
2893  * (qemu_ram_foreach_block ends up passing unscaled lengths
2894  *  which would mean postcopy code would have to deal with target page)
2895  *
2896  * @ms: current migration state
2897  */
2898 static int postcopy_each_ram_send_discard(MigrationState *ms)
2899 {
2900     struct RAMBlock *block;
2901     int ret;
2902
2903     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2904         postcopy_discard_send_init(ms, block->idstr);
2905
2906         /*
2907          * Postcopy sends chunks of bitmap over the wire, but it
2908          * just needs indexes at this point, avoids it having
2909          * target page specific code.
2910          */
2911         ret = postcopy_send_discard_bm_ram(ms, block);
2912         postcopy_discard_send_finish(ms);
2913         if (ret) {
2914             return ret;
2915         }
2916     }
2917
2918     return 0;
2919 }
2920
2921 /**
2922  * postcopy_chunk_hostpages_pass: canocalize bitmap in hostpages
2923  *
2924  * Helper for postcopy_chunk_hostpages; it's called twice to
2925  * canonicalize the two bitmaps, that are similar, but one is
2926  * inverted.
2927  *
2928  * Postcopy requires that all target pages in a hostpage are dirty or
2929  * clean, not a mix.  This function canonicalizes the bitmaps.
2930  *
2931  * @ms: current migration state
2932  * @unsent_pass: if true we need to canonicalize partially unsent host pages
2933  *               otherwise we need to canonicalize partially dirty host pages
2934  * @block: block that contains the page we want to canonicalize
2935  */
2936 static void postcopy_chunk_hostpages_pass(MigrationState *ms, bool unsent_pass,
2937                                           RAMBlock *block)
2938 {
2939     RAMState *rs = ram_state;
2940     unsigned long *bitmap = block->bmap;
2941     unsigned long *unsentmap = block->unsentmap;
2942     unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2943     unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2944     unsigned long run_start;
2945
2946     if (block->page_size == TARGET_PAGE_SIZE) {
2947         /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2948         return;
2949     }
2950
2951     if (unsent_pass) {
2952         /* Find a sent page */
2953         run_start = find_next_zero_bit(unsentmap, pages, 0);
2954     } else {
2955         /* Find a dirty page */
2956         run_start = find_next_bit(bitmap, pages, 0);
2957     }
2958
2959     while (run_start < pages) {
2960
2961         /*
2962          * If the start of this run of pages is in the middle of a host
2963          * page, then we need to fixup this host page.
2964          */
2965         if (QEMU_IS_ALIGNED(run_start, host_ratio)) {
2966             /* Find the end of this run */
2967             if (unsent_pass) {
2968                 run_start = find_next_bit(unsentmap, pages, run_start + 1);
2969             } else {
2970                 run_start = find_next_zero_bit(bitmap, pages, run_start + 1);
2971             }
2972             /*
2973              * If the end isn't at the start of a host page, then the
2974              * run doesn't finish at the end of a host page
2975              * and we need to discard.
2976              */
2977         }
2978
2979         if (!QEMU_IS_ALIGNED(run_start, host_ratio)) {
2980             unsigned long page;
2981             unsigned long fixup_start_addr = QEMU_ALIGN_DOWN(run_start,
2982                                                              host_ratio);
2983             run_start = QEMU_ALIGN_UP(run_start, host_ratio);
2984
2985             /* Tell the destination to discard this page */
2986             if (unsent_pass || !test_bit(fixup_start_addr, unsentmap)) {
2987                 /* For the unsent_pass we:
2988                  *     discard partially sent pages
2989                  * For the !unsent_pass (dirty) we:
2990                  *     discard partially dirty pages that were sent
2991                  *     (any partially sent pages were already discarded
2992                  *     by the previous unsent_pass)
2993                  */
2994                 postcopy_discard_send_range(ms, fixup_start_addr, host_ratio);
2995             }
2996
2997             /* Clean up the bitmap */
2998             for (page = fixup_start_addr;
2999                  page < fixup_start_addr + host_ratio; page++) {
3000                 /* All pages in this host page are now not sent */
3001                 set_bit(page, unsentmap);
3002
3003                 /*
3004                  * Remark them as dirty, updating the count for any pages
3005                  * that weren't previously dirty.
3006                  */
3007                 rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
3008             }
3009         }
3010
3011         if (unsent_pass) {
3012             /* Find the next sent page for the next iteration */
3013             run_start = find_next_zero_bit(unsentmap, pages, run_start);
3014         } else {
3015             /* Find the next dirty page for the next iteration */
3016             run_start = find_next_bit(bitmap, pages, run_start);
3017         }
3018     }
3019 }
3020
3021 /**
3022  * postcopy_chunk_hostpages: discard any partially sent host page
3023  *
3024  * Utility for the outgoing postcopy code.
3025  *
3026  * Discard any partially sent host-page size chunks, mark any partially
3027  * dirty host-page size chunks as all dirty.  In this case the host-page
3028  * is the host-page for the particular RAMBlock, i.e. it might be a huge page
3029  *
3030  * Returns zero on success
3031  *
3032  * @ms: current migration state
3033  * @block: block we want to work with
3034  */
3035 static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
3036 {
3037     postcopy_discard_send_init(ms, block->idstr);
3038
3039     /* First pass: Discard all partially sent host pages */
3040     postcopy_chunk_hostpages_pass(ms, true, block);
3041     /*
3042      * Second pass: Ensure that all partially dirty host pages are made
3043      * fully dirty.
3044      */
3045     postcopy_chunk_hostpages_pass(ms, false, block);
3046
3047     postcopy_discard_send_finish(ms);
3048     return 0;
3049 }
3050
3051 /**
3052  * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3053  *
3054  * Returns zero on success
3055  *
3056  * Transmit the set of pages to be discarded after precopy to the target
3057  * these are pages that:
3058  *     a) Have been previously transmitted but are now dirty again
3059  *     b) Pages that have never been transmitted, this ensures that
3060  *        any pages on the destination that have been mapped by background
3061  *        tasks get discarded (transparent huge pages is the specific concern)
3062  * Hopefully this is pretty sparse
3063  *
3064  * @ms: current migration state
3065  */
3066 int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3067 {
3068     RAMState *rs = ram_state;
3069     RAMBlock *block;
3070     int ret;
3071
3072     rcu_read_lock();
3073
3074     /* This should be our last sync, the src is now paused */
3075     migration_bitmap_sync(rs);
3076
3077     /* Easiest way to make sure we don't resume in the middle of a host-page */
3078     rs->last_seen_block = NULL;
3079     rs->last_sent_block = NULL;
3080     rs->last_page = 0;
3081
3082     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3083         unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
3084         unsigned long *bitmap = block->bmap;
3085         unsigned long *unsentmap = block->unsentmap;
3086
3087         if (!unsentmap) {
3088             /* We don't have a safe way to resize the sentmap, so
3089              * if the bitmap was resized it will be NULL at this
3090              * point.
3091              */
3092             error_report("migration ram resized during precopy phase");
3093             rcu_read_unlock();
3094             return -EINVAL;
3095         }
3096         /* Deal with TPS != HPS and huge pages */
3097         ret = postcopy_chunk_hostpages(ms, block);
3098         if (ret) {
3099             rcu_read_unlock();
3100             return ret;
3101         }
3102
3103         /*
3104          * Update the unsentmap to be unsentmap = unsentmap | dirty
3105          */
3106         bitmap_or(unsentmap, unsentmap, bitmap, pages);
3107 #ifdef DEBUG_POSTCOPY
3108         ram_debug_dump_bitmap(unsentmap, true, pages);
3109 #endif
3110     }
3111     trace_ram_postcopy_send_discard_bitmap();
3112
3113     ret = postcopy_each_ram_send_discard(ms);
3114     rcu_read_unlock();
3115
3116     return ret;
3117 }
3118
3119 /**
3120  * ram_discard_range: discard dirtied pages at the beginning of postcopy
3121  *
3122  * Returns zero on success
3123  *
3124  * @rbname: name of the RAMBlock of the request. NULL means the
3125  *          same that last one.
3126  * @start: RAMBlock starting page
3127  * @length: RAMBlock size
3128  */
3129 int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3130 {
3131     int ret = -1;
3132
3133     trace_ram_discard_range(rbname, start, length);
3134
3135     rcu_read_lock();
3136     RAMBlock *rb = qemu_ram_block_by_name(rbname);
3137
3138     if (!rb) {
3139         error_report("ram_discard_range: Failed to find block '%s'", rbname);
3140         goto err;
3141     }
3142
3143     /*
3144      * On source VM, we don't need to update the received bitmap since
3145      * we don't even have one.
3146      */
3147     if (rb->receivedmap) {
3148         bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3149                      length >> qemu_target_page_bits());
3150     }
3151
3152     ret = ram_block_discard_range(rb, start, length);
3153
3154 err:
3155     rcu_read_unlock();
3156
3157     return ret;
3158 }
3159
3160 /*
3161  * For every allocation, we will try not to crash the VM if the
3162  * allocation failed.
3163  */
3164 static int xbzrle_init(void)
3165 {
3166     Error *local_err = NULL;
3167
3168     if (!migrate_use_xbzrle()) {
3169         return 0;
3170     }
3171
3172     XBZRLE_cache_lock();
3173
3174     XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3175     if (!XBZRLE.zero_target_page) {
3176         error_report("%s: Error allocating zero page", __func__);
3177         goto err_out;
3178     }
3179
3180     XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3181                               TARGET_PAGE_SIZE, &local_err);
3182     if (!XBZRLE.cache) {
3183         error_report_err(local_err);
3184         goto free_zero_page;
3185     }
3186
3187     XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3188     if (!XBZRLE.encoded_buf) {
3189         error_report("%s: Error allocating encoded_buf", __func__);
3190         goto free_cache;
3191     }
3192
3193     XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3194     if (!XBZRLE.current_buf) {
3195         error_report("%s: Error allocating current_buf", __func__);
3196         goto free_encoded_buf;
3197     }
3198
3199     /* We are all good */
3200     XBZRLE_cache_unlock();
3201     return 0;
3202
3203 free_encoded_buf:
3204     g_free(XBZRLE.encoded_buf);
3205     XBZRLE.encoded_buf = NULL;
3206 free_cache:
3207     cache_fini(XBZRLE.cache);
3208     XBZRLE.cache = NULL;
3209 free_zero_page:
3210     g_free(XBZRLE.zero_target_page);
3211     XBZRLE.zero_target_page = NULL;
3212 err_out:
3213     XBZRLE_cache_unlock();
3214     return -ENOMEM;
3215 }
3216
3217 static int ram_state_init(RAMState **rsp)
3218 {
3219     *rsp = g_try_new0(RAMState, 1);
3220
3221     if (!*rsp) {
3222         error_report("%s: Init ramstate fail", __func__);
3223         return -1;
3224     }
3225
3226     qemu_mutex_init(&(*rsp)->bitmap_mutex);
3227     qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3228     QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3229
3230     /*
3231      * Count the total number of pages used by ram blocks not including any
3232      * gaps due to alignment or unplugs.
3233      * This must match with the initial values of dirty bitmap.
3234      */
3235     (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3236     ram_state_reset(*rsp);
3237
3238     return 0;
3239 }
3240
3241 static void ram_list_init_bitmaps(void)
3242 {
3243     MigrationState *ms = migrate_get_current();
3244     RAMBlock *block;
3245     unsigned long pages;
3246     uint8_t shift;
3247
3248     /* Skip setting bitmap if there is no RAM */
3249     if (ram_bytes_total()) {
3250         shift = ms->clear_bitmap_shift;
3251         if (shift > CLEAR_BITMAP_SHIFT_MAX) {
3252             error_report("clear_bitmap_shift (%u) too big, using "
3253                          "max value (%u)", shift, CLEAR_BITMAP_SHIFT_MAX);
3254             shift = CLEAR_BITMAP_SHIFT_MAX;
3255         } else if (shift < CLEAR_BITMAP_SHIFT_MIN) {
3256             error_report("clear_bitmap_shift (%u) too small, using "
3257                          "min value (%u)", shift, CLEAR_BITMAP_SHIFT_MIN);
3258             shift = CLEAR_BITMAP_SHIFT_MIN;
3259         }
3260
3261         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3262             pages = block->max_length >> TARGET_PAGE_BITS;
3263             /*
3264              * The initial dirty bitmap for migration must be set with all
3265              * ones to make sure we'll migrate every guest RAM page to
3266              * destination.
3267              * Here we set RAMBlock.bmap all to 1 because when rebegin a
3268              * new migration after a failed migration, ram_list.
3269              * dirty_memory[DIRTY_MEMORY_MIGRATION] don't include the whole
3270              * guest memory.
3271              */
3272             block->bmap = bitmap_new(pages);
3273             bitmap_set(block->bmap, 0, pages);
3274             block->clear_bmap_shift = shift;
3275             block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift));
3276             if (migrate_postcopy_ram()) {
3277                 block->unsentmap = bitmap_new(pages);
3278                 bitmap_set(block->unsentmap, 0, pages);
3279             }
3280         }
3281     }
3282 }
3283
3284 static void ram_init_bitmaps(RAMState *rs)
3285 {
3286     /* For memory_global_dirty_log_start below.  */
3287     qemu_mutex_lock_iothread();
3288     qemu_mutex_lock_ramlist();
3289     rcu_read_lock();
3290
3291     ram_list_init_bitmaps();
3292     memory_global_dirty_log_start();
3293     migration_bitmap_sync_precopy(rs);
3294
3295     rcu_read_unlock();
3296     qemu_mutex_unlock_ramlist();
3297     qemu_mutex_unlock_iothread();
3298 }
3299
3300 static int ram_init_all(RAMState **rsp)
3301 {
3302     if (ram_state_init(rsp)) {
3303         return -1;
3304     }
3305
3306     if (xbzrle_init()) {
3307         ram_state_cleanup(rsp);
3308         return -1;
3309     }
3310
3311     ram_init_bitmaps(*rsp);
3312
3313     return 0;
3314 }
3315
3316 static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3317 {
3318     RAMBlock *block;
3319     uint64_t pages = 0;
3320
3321     /*
3322      * Postcopy is not using xbzrle/compression, so no need for that.
3323      * Also, since source are already halted, we don't need to care
3324      * about dirty page logging as well.
3325      */
3326
3327     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3328         pages += bitmap_count_one(block->bmap,
3329                                   block->used_length >> TARGET_PAGE_BITS);
3330     }
3331
3332     /* This may not be aligned with current bitmaps. Recalculate. */
3333     rs->migration_dirty_pages = pages;
3334
3335     rs->last_seen_block = NULL;
3336     rs->last_sent_block = NULL;
3337     rs->last_page = 0;
3338     rs->last_version = ram_list.version;
3339     /*
3340      * Disable the bulk stage, otherwise we'll resend the whole RAM no
3341      * matter what we have sent.
3342      */
3343     rs->ram_bulk_stage = false;
3344
3345     /* Update RAMState cache of output QEMUFile */
3346     rs->f = out;
3347
3348     trace_ram_state_resume_prepare(pages);
3349 }
3350
3351 /*
3352  * This function clears bits of the free pages reported by the caller from the
3353  * migration dirty bitmap. @addr is the host address corresponding to the
3354  * start of the continuous guest free pages, and @len is the total bytes of
3355  * those pages.
3356  */
3357 void qemu_guest_free_page_hint(void *addr, size_t len)
3358 {
3359     RAMBlock *block;
3360     ram_addr_t offset;
3361     size_t used_len, start, npages;
3362     MigrationState *s = migrate_get_current();
3363
3364     /* This function is currently expected to be used during live migration */
3365     if (!migration_is_setup_or_active(s->state)) {
3366         return;
3367     }
3368
3369     for (; len > 0; len -= used_len, addr += used_len) {
3370         block = qemu_ram_block_from_host(addr, false, &offset);
3371         if (unlikely(!block || offset >= block->used_length)) {
3372             /*
3373              * The implementation might not support RAMBlock resize during
3374              * live migration, but it could happen in theory with future
3375              * updates. So we add a check here to capture that case.
3376              */
3377             error_report_once("%s unexpected error", __func__);
3378             return;
3379         }
3380
3381         if (len <= block->used_length - offset) {
3382             used_len = len;
3383         } else {
3384             used_len = block->used_length - offset;
3385         }
3386
3387         start = offset >> TARGET_PAGE_BITS;
3388         npages = used_len >> TARGET_PAGE_BITS;
3389
3390         qemu_mutex_lock(&ram_state->bitmap_mutex);
3391         ram_state->migration_dirty_pages -=
3392                       bitmap_count_one_with_offset(block->bmap, start, npages);
3393         bitmap_clear(block->bmap, start, npages);
3394         qemu_mutex_unlock(&ram_state->bitmap_mutex);
3395     }
3396 }
3397
3398 /*
3399  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3400  * long-running RCU critical section.  When rcu-reclaims in the code
3401  * start to become numerous it will be necessary to reduce the
3402  * granularity of these critical sections.
3403  */
3404
3405 /**
3406  * ram_save_setup: Setup RAM for migration
3407  *
3408  * Returns zero to indicate success and negative for error
3409  *
3410  * @f: QEMUFile where to send the data
3411  * @opaque: RAMState pointer
3412  */
3413 static int ram_save_setup(QEMUFile *f, void *opaque)
3414 {
3415     RAMState **rsp = opaque;
3416     RAMBlock *block;
3417
3418     if (compress_threads_save_setup()) {
3419         return -1;
3420     }
3421
3422     /* migration has already setup the bitmap, reuse it. */
3423     if (!migration_in_colo_state()) {
3424         if (ram_init_all(rsp) != 0) {
3425             compress_threads_save_cleanup();
3426             return -1;
3427         }
3428     }
3429     (*rsp)->f = f;
3430
3431     rcu_read_lock();
3432
3433     qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3434
3435     RAMBLOCK_FOREACH_MIGRATABLE(block) {
3436         qemu_put_byte(f, strlen(block->idstr));
3437         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3438         qemu_put_be64(f, block->used_length);
3439         if (migrate_postcopy_ram() && block->page_size != qemu_host_page_size) {
3440             qemu_put_be64(f, block->page_size);
3441         }
3442         if (migrate_ignore_shared()) {
3443             qemu_put_be64(f, block->mr->addr);
3444         }
3445     }
3446
3447     rcu_read_unlock();
3448
3449     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3450     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3451
3452     multifd_send_sync_main(*rsp);
3453     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3454     qemu_fflush(f);
3455
3456     return 0;
3457 }
3458
3459 /**
3460  * ram_save_iterate: iterative stage for migration
3461  *
3462  * Returns zero to indicate success and negative for error
3463  *
3464  * @f: QEMUFile where to send the data
3465  * @opaque: RAMState pointer
3466  */
3467 static int ram_save_iterate(QEMUFile *f, void *opaque)
3468 {
3469     RAMState **temp = opaque;
3470     RAMState *rs = *temp;
3471     int ret;
3472     int i;
3473     int64_t t0;
3474     int done = 0;
3475
3476     if (blk_mig_bulk_active()) {
3477         /* Avoid transferring ram during bulk phase of block migration as
3478          * the bulk phase will usually take a long time and transferring
3479          * ram updates during that time is pointless. */
3480         goto out;
3481     }
3482
3483     rcu_read_lock();
3484     if (ram_list.version != rs->last_version) {
3485         ram_state_reset(rs);
3486     }
3487
3488     /* Read version before ram_list.blocks */
3489     smp_rmb();
3490
3491     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3492
3493     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3494     i = 0;
3495     while ((ret = qemu_file_rate_limit(f)) == 0 ||
3496             !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3497         int pages;
3498
3499         if (qemu_file_get_error(f)) {
3500             break;
3501         }
3502
3503         pages = ram_find_and_save_block(rs, false);
3504         /* no more pages to sent */
3505         if (pages == 0) {
3506             done = 1;
3507             break;
3508         }
3509
3510         if (pages < 0) {
3511             qemu_file_set_error(f, pages);
3512             break;
3513         }
3514
3515         rs->target_page_count += pages;
3516
3517         /* we want to check in the 1st loop, just in case it was the 1st time
3518            and we had to sync the dirty bitmap.
3519            qemu_clock_get_ns() is a bit expensive, so we only check each some
3520            iterations
3521         */
3522         if ((i & 63) == 0) {
3523             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
3524             if (t1 > MAX_WAIT) {
3525                 trace_ram_save_iterate_big_wait(t1, i);
3526                 break;
3527             }
3528         }
3529         i++;
3530     }
3531     rcu_read_unlock();
3532
3533     /*
3534      * Must occur before EOS (or any QEMUFile operation)
3535      * because of RDMA protocol.
3536      */
3537     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3538
3539 out:
3540     multifd_send_sync_main(rs);
3541     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3542     qemu_fflush(f);
3543     ram_counters.transferred += 8;
3544
3545     ret = qemu_file_get_error(f);
3546     if (ret < 0) {
3547         return ret;
3548     }
3549
3550     return done;
3551 }
3552
3553 /**
3554  * ram_save_complete: function called to send the remaining amount of ram
3555  *
3556  * Returns zero to indicate success or negative on error
3557  *
3558  * Called with iothread lock
3559  *
3560  * @f: QEMUFile where to send the data
3561  * @opaque: RAMState pointer
3562  */
3563 static int ram_save_complete(QEMUFile *f, void *opaque)
3564 {
3565     RAMState **temp = opaque;
3566     RAMState *rs = *temp;
3567     int ret = 0;
3568
3569     rcu_read_lock();
3570
3571     if (!migration_in_postcopy()) {
3572         migration_bitmap_sync_precopy(rs);
3573     }
3574
3575     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3576
3577     /* try transferring iterative blocks of memory */
3578
3579     /* flush all remaining blocks regardless of rate limiting */
3580     while (true) {
3581         int pages;
3582
3583         pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3584         /* no more blocks to sent */
3585         if (pages == 0) {
3586             break;
3587         }
3588         if (pages < 0) {
3589             ret = pages;
3590             break;
3591         }
3592     }
3593
3594     flush_compressed_data(rs);
3595     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3596
3597     rcu_read_unlock();
3598
3599     multifd_send_sync_main(rs);
3600     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3601     qemu_fflush(f);
3602
3603     return ret;
3604 }
3605
3606 static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3607                              uint64_t *res_precopy_only,
3608                              uint64_t *res_compatible,
3609                              uint64_t *res_postcopy_only)
3610 {
3611     RAMState **temp = opaque;
3612     RAMState *rs = *temp;
3613     uint64_t remaining_size;
3614
3615     remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3616
3617     if (!migration_in_postcopy() &&
3618         remaining_size < max_size) {
3619         qemu_mutex_lock_iothread();
3620         rcu_read_lock();
3621         migration_bitmap_sync_precopy(rs);
3622         rcu_read_unlock();
3623         qemu_mutex_unlock_iothread();
3624         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3625     }
3626
3627     if (migrate_postcopy_ram()) {
3628         /* We can do postcopy, and all the data is postcopiable */
3629         *res_compatible += remaining_size;
3630     } else {
3631         *res_precopy_only += remaining_size;
3632     }
3633 }
3634
3635 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3636 {
3637     unsigned int xh_len;
3638     int xh_flags;
3639     uint8_t *loaded_data;
3640
3641     /* extract RLE header */
3642     xh_flags = qemu_get_byte(f);
3643     xh_len = qemu_get_be16(f);
3644
3645     if (xh_flags != ENCODING_FLAG_XBZRLE) {
3646         error_report("Failed to load XBZRLE page - wrong compression!");
3647         return -1;
3648     }
3649
3650     if (xh_len > TARGET_PAGE_SIZE) {
3651         error_report("Failed to load XBZRLE page - len overflow!");
3652         return -1;
3653     }
3654     loaded_data = XBZRLE.decoded_buf;
3655     /* load data and decode */
3656     /* it can change loaded_data to point to an internal buffer */
3657     qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3658
3659     /* decode RLE */
3660     if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3661                              TARGET_PAGE_SIZE) == -1) {
3662         error_report("Failed to load XBZRLE page - decode error!");
3663         return -1;
3664     }
3665
3666     return 0;
3667 }
3668
3669 /**
3670  * ram_block_from_stream: read a RAMBlock id from the migration stream
3671  *
3672  * Must be called from within a rcu critical section.
3673  *
3674  * Returns a pointer from within the RCU-protected ram_list.
3675  *
3676  * @f: QEMUFile where to read the data from
3677  * @flags: Page flags (mostly to see if it's a continuation of previous block)
3678  */
3679 static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3680 {
3681     static RAMBlock *block = NULL;
3682     char id[256];
3683     uint8_t len;
3684
3685     if (flags & RAM_SAVE_FLAG_CONTINUE) {
3686         if (!block) {
3687             error_report("Ack, bad migration stream!");
3688             return NULL;
3689         }
3690         return block;
3691     }
3692
3693     len = qemu_get_byte(f);
3694     qemu_get_buffer(f, (uint8_t *)id, len);
3695     id[len] = 0;
3696
3697     block = qemu_ram_block_by_name(id);
3698     if (!block) {
3699         error_report("Can't find block %s", id);
3700         return NULL;
3701     }
3702
3703     if (ramblock_is_ignored(block)) {
3704         error_report("block %s should not be migrated !", id);
3705         return NULL;
3706     }
3707
3708     return block;
3709 }
3710
3711 static inline void *host_from_ram_block_offset(RAMBlock *block,
3712                                                ram_addr_t offset)
3713 {
3714     if (!offset_in_ramblock(block, offset)) {
3715         return NULL;
3716     }
3717
3718     return block->host + offset;
3719 }
3720
3721 static inline void *colo_cache_from_block_offset(RAMBlock *block,
3722                                                  ram_addr_t offset)
3723 {
3724     if (!offset_in_ramblock(block, offset)) {
3725         return NULL;
3726     }
3727     if (!block->colo_cache) {
3728         error_report("%s: colo_cache is NULL in block :%s",
3729                      __func__, block->idstr);
3730         return NULL;
3731     }
3732
3733     /*
3734     * During colo checkpoint, we need bitmap of these migrated pages.
3735     * It help us to decide which pages in ram cache should be flushed
3736     * into VM's RAM later.
3737     */
3738     if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3739         ram_state->migration_dirty_pages++;
3740     }
3741     return block->colo_cache + offset;
3742 }
3743
3744 /**
3745  * ram_handle_compressed: handle the zero page case
3746  *
3747  * If a page (or a whole RDMA chunk) has been
3748  * determined to be zero, then zap it.
3749  *
3750  * @host: host address for the zero page
3751  * @ch: what the page is filled from.  We only support zero
3752  * @size: size of the zero page
3753  */
3754 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3755 {
3756     if (ch != 0 || !is_zero_range(host, size)) {
3757         memset(host, ch, size);
3758     }
3759 }
3760
3761 /* return the size after decompression, or negative value on error */
3762 static int
3763 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3764                      const uint8_t *source, size_t source_len)
3765 {
3766     int err;
3767
3768     err = inflateReset(stream);
3769     if (err != Z_OK) {
3770         return -1;
3771     }
3772
3773     stream->avail_in = source_len;
3774     stream->next_in = (uint8_t *)source;
3775     stream->avail_out = dest_len;
3776     stream->next_out = dest;
3777
3778     err = inflate(stream, Z_NO_FLUSH);
3779     if (err != Z_STREAM_END) {
3780         return -1;
3781     }
3782
3783     return stream->total_out;
3784 }
3785
3786 static void *do_data_decompress(void *opaque)
3787 {
3788     DecompressParam *param = opaque;
3789     unsigned long pagesize;
3790     uint8_t *des;
3791     int len, ret;
3792
3793     qemu_mutex_lock(&param->mutex);
3794     while (!param->quit) {
3795         if (param->des) {
3796             des = param->des;
3797             len = param->len;
3798             param->des = 0;
3799             qemu_mutex_unlock(&param->mutex);
3800
3801             pagesize = TARGET_PAGE_SIZE;
3802
3803             ret = qemu_uncompress_data(&param->stream, des, pagesize,
3804                                        param->compbuf, len);
3805             if (ret < 0 && migrate_get_current()->decompress_error_check) {
3806                 error_report("decompress data failed");
3807                 qemu_file_set_error(decomp_file, ret);
3808             }
3809
3810             qemu_mutex_lock(&decomp_done_lock);
3811             param->done = true;
3812             qemu_cond_signal(&decomp_done_cond);
3813             qemu_mutex_unlock(&decomp_done_lock);
3814
3815             qemu_mutex_lock(&param->mutex);
3816         } else {
3817             qemu_cond_wait(&param->cond, &param->mutex);
3818         }
3819     }
3820     qemu_mutex_unlock(&param->mutex);
3821
3822     return NULL;
3823 }
3824
3825 static int wait_for_decompress_done(void)
3826 {
3827     int idx, thread_count;
3828
3829     if (!migrate_use_compression()) {
3830         return 0;
3831     }
3832
3833     thread_count = migrate_decompress_threads();
3834     qemu_mutex_lock(&decomp_done_lock);
3835     for (idx = 0; idx < thread_count; idx++) {
3836         while (!decomp_param[idx].done) {
3837             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3838         }
3839     }
3840     qemu_mutex_unlock(&decomp_done_lock);
3841     return qemu_file_get_error(decomp_file);
3842 }
3843
3844 static void compress_threads_load_cleanup(void)
3845 {
3846     int i, thread_count;
3847
3848     if (!migrate_use_compression()) {
3849         return;
3850     }
3851     thread_count = migrate_decompress_threads();
3852     for (i = 0; i < thread_count; i++) {
3853         /*
3854          * we use it as a indicator which shows if the thread is
3855          * properly init'd or not
3856          */
3857         if (!decomp_param[i].compbuf) {
3858             break;
3859         }
3860
3861         qemu_mutex_lock(&decomp_param[i].mutex);
3862         decomp_param[i].quit = true;
3863         qemu_cond_signal(&decomp_param[i].cond);
3864         qemu_mutex_unlock(&decomp_param[i].mutex);
3865     }
3866     for (i = 0; i < thread_count; i++) {
3867         if (!decomp_param[i].compbuf) {
3868             break;
3869         }
3870
3871         qemu_thread_join(decompress_threads + i);
3872         qemu_mutex_destroy(&decomp_param[i].mutex);
3873         qemu_cond_destroy(&decomp_param[i].cond);
3874         inflateEnd(&decomp_param[i].stream);
3875         g_free(decomp_param[i].compbuf);
3876         decomp_param[i].compbuf = NULL;
3877     }
3878     g_free(decompress_threads);
3879     g_free(decomp_param);
3880     decompress_threads = NULL;
3881     decomp_param = NULL;
3882     decomp_file = NULL;
3883 }
3884
3885 static int compress_threads_load_setup(QEMUFile *f)
3886 {
3887     int i, thread_count;
3888
3889     if (!migrate_use_compression()) {
3890         return 0;
3891     }
3892
3893     thread_count = migrate_decompress_threads();
3894     decompress_threads = g_new0(QemuThread, thread_count);
3895     decomp_param = g_new0(DecompressParam, thread_count);
3896     qemu_mutex_init(&decomp_done_lock);
3897     qemu_cond_init(&decomp_done_cond);
3898     decomp_file = f;
3899     for (i = 0; i < thread_count; i++) {
3900         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3901             goto exit;
3902         }
3903
3904         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3905         qemu_mutex_init(&decomp_param[i].mutex);
3906         qemu_cond_init(&decomp_param[i].cond);
3907         decomp_param[i].done = true;
3908         decomp_param[i].quit = false;
3909         qemu_thread_create(decompress_threads + i, "decompress",
3910                            do_data_decompress, decomp_param + i,
3911                            QEMU_THREAD_JOINABLE);
3912     }
3913     return 0;
3914 exit:
3915     compress_threads_load_cleanup();
3916     return -1;
3917 }
3918
3919 static void decompress_data_with_multi_threads(QEMUFile *f,
3920                                                void *host, int len)
3921 {
3922     int idx, thread_count;
3923
3924     thread_count = migrate_decompress_threads();
3925     qemu_mutex_lock(&decomp_done_lock);
3926     while (true) {
3927         for (idx = 0; idx < thread_count; idx++) {
3928             if (decomp_param[idx].done) {
3929                 decomp_param[idx].done = false;
3930                 qemu_mutex_lock(&decomp_param[idx].mutex);
3931                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3932                 decomp_param[idx].des = host;
3933                 decomp_param[idx].len = len;
3934                 qemu_cond_signal(&decomp_param[idx].cond);
3935                 qemu_mutex_unlock(&decomp_param[idx].mutex);
3936                 break;
3937             }
3938         }
3939         if (idx < thread_count) {
3940             break;
3941         } else {
3942             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3943         }
3944     }
3945     qemu_mutex_unlock(&decomp_done_lock);
3946 }
3947
3948 /*
3949  * colo cache: this is for secondary VM, we cache the whole
3950  * memory of the secondary VM, it is need to hold the global lock
3951  * to call this helper.
3952  */
3953 int colo_init_ram_cache(void)
3954 {
3955     RAMBlock *block;
3956
3957     rcu_read_lock();
3958     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3959         block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3960                                                 NULL,
3961                                                 false);
3962         if (!block->colo_cache) {
3963             error_report("%s: Can't alloc memory for COLO cache of block %s,"
3964                          "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3965                          block->used_length);
3966             goto out_locked;
3967         }
3968         memcpy(block->colo_cache, block->host, block->used_length);
3969     }
3970     rcu_read_unlock();
3971     /*
3972     * Record the dirty pages that sent by PVM, we use this dirty bitmap together
3973     * with to decide which page in cache should be flushed into SVM's RAM. Here
3974     * we use the same name 'ram_bitmap' as for migration.
3975     */
3976     if (ram_bytes_total()) {
3977         RAMBlock *block;
3978
3979         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3980             unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3981
3982             block->bmap = bitmap_new(pages);
3983             bitmap_set(block->bmap, 0, pages);
3984         }
3985     }
3986     ram_state = g_new0(RAMState, 1);
3987     ram_state->migration_dirty_pages = 0;
3988     qemu_mutex_init(&ram_state->bitmap_mutex);
3989     memory_global_dirty_log_start();
3990
3991     return 0;
3992
3993 out_locked:
3994
3995     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3996         if (block->colo_cache) {
3997             qemu_anon_ram_free(block->colo_cache, block->used_length);
3998             block->colo_cache = NULL;
3999         }
4000     }
4001
4002     rcu_read_unlock();
4003     return -errno;
4004 }
4005
4006 /* It is need to hold the global lock to call this helper */
4007 void colo_release_ram_cache(void)
4008 {
4009     RAMBlock *block;
4010
4011     memory_global_dirty_log_stop();
4012     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4013         g_free(block->bmap);
4014         block->bmap = NULL;
4015     }
4016
4017     rcu_read_lock();
4018
4019     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4020         if (block->colo_cache) {
4021             qemu_anon_ram_free(block->colo_cache, block->used_length);
4022             block->colo_cache = NULL;
4023         }
4024     }
4025
4026     rcu_read_unlock();
4027     qemu_mutex_destroy(&ram_state->bitmap_mutex);
4028     g_free(ram_state);
4029     ram_state = NULL;
4030 }
4031
4032 /**
4033  * ram_load_setup: Setup RAM for migration incoming side
4034  *
4035  * Returns zero to indicate success and negative for error
4036  *
4037  * @f: QEMUFile where to receive the data
4038  * @opaque: RAMState pointer
4039  */
4040 static int ram_load_setup(QEMUFile *f, void *opaque)
4041 {
4042     if (compress_threads_load_setup(f)) {
4043         return -1;
4044     }
4045
4046     xbzrle_load_setup();
4047     ramblock_recv_map_init();
4048
4049     return 0;
4050 }
4051
4052 static int ram_load_cleanup(void *opaque)
4053 {
4054     RAMBlock *rb;
4055
4056     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4057         if (ramblock_is_pmem(rb)) {
4058             pmem_persist(rb->host, rb->used_length);
4059         }
4060     }
4061
4062     xbzrle_load_cleanup();
4063     compress_threads_load_cleanup();
4064
4065     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4066         g_free(rb->receivedmap);
4067         rb->receivedmap = NULL;
4068     }
4069
4070     return 0;
4071 }
4072
4073 /**
4074  * ram_postcopy_incoming_init: allocate postcopy data structures
4075  *
4076  * Returns 0 for success and negative if there was one error
4077  *
4078  * @mis: current migration incoming state
4079  *
4080  * Allocate data structures etc needed by incoming migration with
4081  * postcopy-ram. postcopy-ram's similarly names
4082  * postcopy_ram_incoming_init does the work.
4083  */
4084 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4085 {
4086     return postcopy_ram_incoming_init(mis);
4087 }
4088
4089 /**
4090  * ram_load_postcopy: load a page in postcopy case
4091  *
4092  * Returns 0 for success or -errno in case of error
4093  *
4094  * Called in postcopy mode by ram_load().
4095  * rcu_read_lock is taken prior to this being called.
4096  *
4097  * @f: QEMUFile where to send the data
4098  */
4099 static int ram_load_postcopy(QEMUFile *f)
4100 {
4101     int flags = 0, ret = 0;
4102     bool place_needed = false;
4103     bool matches_target_page_size = false;
4104     MigrationIncomingState *mis = migration_incoming_get_current();
4105     /* Temporary page that is later 'placed' */
4106     void *postcopy_host_page = postcopy_get_tmp_page(mis);
4107     void *last_host = NULL;
4108     bool all_zero = false;
4109
4110     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4111         ram_addr_t addr;
4112         void *host = NULL;
4113         void *page_buffer = NULL;
4114         void *place_source = NULL;
4115         RAMBlock *block = NULL;
4116         uint8_t ch;
4117
4118         addr = qemu_get_be64(f);
4119
4120         /*
4121          * If qemu file error, we should stop here, and then "addr"
4122          * may be invalid
4123          */
4124         ret = qemu_file_get_error(f);
4125         if (ret) {
4126             break;
4127         }
4128
4129         flags = addr & ~TARGET_PAGE_MASK;
4130         addr &= TARGET_PAGE_MASK;
4131
4132         trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4133         place_needed = false;
4134         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
4135             block = ram_block_from_stream(f, flags);
4136
4137             host = host_from_ram_block_offset(block, addr);
4138             if (!host) {
4139                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4140                 ret = -EINVAL;
4141                 break;
4142             }
4143             matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4144             /*
4145              * Postcopy requires that we place whole host pages atomically;
4146              * these may be huge pages for RAMBlocks that are backed by
4147              * hugetlbfs.
4148              * To make it atomic, the data is read into a temporary page
4149              * that's moved into place later.
4150              * The migration protocol uses,  possibly smaller, target-pages
4151              * however the source ensures it always sends all the components
4152              * of a host page in order.
4153              */
4154             page_buffer = postcopy_host_page +
4155                           ((uintptr_t)host & (block->page_size - 1));
4156             /* If all TP are zero then we can optimise the place */
4157             if (!((uintptr_t)host & (block->page_size - 1))) {
4158                 all_zero = true;
4159             } else {
4160                 /* not the 1st TP within the HP */
4161                 if (host != (last_host + TARGET_PAGE_SIZE)) {
4162                     error_report("Non-sequential target page %p/%p",
4163                                   host, last_host);
4164                     ret = -EINVAL;
4165                     break;
4166                 }
4167             }
4168
4169
4170             /*
4171              * If it's the last part of a host page then we place the host
4172              * page
4173              */
4174             place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
4175                                      (block->page_size - 1)) == 0;
4176             place_source = postcopy_host_page;
4177         }
4178         last_host = host;
4179
4180         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4181         case RAM_SAVE_FLAG_ZERO:
4182             ch = qemu_get_byte(f);
4183             memset(page_buffer, ch, TARGET_PAGE_SIZE);
4184             if (ch) {
4185                 all_zero = false;
4186             }
4187             break;
4188
4189         case RAM_SAVE_FLAG_PAGE:
4190             all_zero = false;
4191             if (!matches_target_page_size) {
4192                 /* For huge pages, we always use temporary buffer */
4193                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4194             } else {
4195                 /*
4196                  * For small pages that matches target page size, we
4197                  * avoid the qemu_file copy.  Instead we directly use
4198                  * the buffer of QEMUFile to place the page.  Note: we
4199                  * cannot do any QEMUFile operation before using that
4200                  * buffer to make sure the buffer is valid when
4201                  * placing the page.
4202                  */
4203                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4204                                          TARGET_PAGE_SIZE);
4205             }
4206             break;
4207         case RAM_SAVE_FLAG_EOS:
4208             /* normal exit */
4209             multifd_recv_sync_main();
4210             break;
4211         default:
4212             error_report("Unknown combination of migration flags: %#x"
4213                          " (postcopy mode)", flags);
4214             ret = -EINVAL;
4215             break;
4216         }
4217
4218         /* Detect for any possible file errors */
4219         if (!ret && qemu_file_get_error(f)) {
4220             ret = qemu_file_get_error(f);
4221         }
4222
4223         if (!ret && place_needed) {
4224             /* This gets called at the last target page in the host page */
4225             void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
4226
4227             if (all_zero) {
4228                 ret = postcopy_place_page_zero(mis, place_dest,
4229                                                block);
4230             } else {
4231                 ret = postcopy_place_page(mis, place_dest,
4232                                           place_source, block);
4233             }
4234         }
4235     }
4236
4237     return ret;
4238 }
4239
4240 static bool postcopy_is_advised(void)
4241 {
4242     PostcopyState ps = postcopy_state_get();
4243     return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4244 }
4245
4246 static bool postcopy_is_running(void)
4247 {
4248     PostcopyState ps = postcopy_state_get();
4249     return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4250 }
4251
4252 /*
4253  * Flush content of RAM cache into SVM's memory.
4254  * Only flush the pages that be dirtied by PVM or SVM or both.
4255  */
4256 static void colo_flush_ram_cache(void)
4257 {
4258     RAMBlock *block = NULL;
4259     void *dst_host;
4260     void *src_host;
4261     unsigned long offset = 0;
4262
4263     memory_global_dirty_log_sync();
4264     rcu_read_lock();
4265     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4266         migration_bitmap_sync_range(ram_state, block);
4267     }
4268     rcu_read_unlock();
4269
4270     trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4271     rcu_read_lock();
4272     block = QLIST_FIRST_RCU(&ram_list.blocks);
4273
4274     while (block) {
4275         offset = migration_bitmap_find_dirty(ram_state, block, offset);
4276
4277         if (offset << TARGET_PAGE_BITS >= block->used_length) {
4278             offset = 0;
4279             block = QLIST_NEXT_RCU(block, next);
4280         } else {
4281             migration_bitmap_clear_dirty(ram_state, block, offset);
4282             dst_host = block->host + (offset << TARGET_PAGE_BITS);
4283             src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4284             memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4285         }
4286     }
4287
4288     rcu_read_unlock();
4289     trace_colo_flush_ram_cache_end();
4290 }
4291
4292 /**
4293  * ram_load_precopy: load pages in precopy case
4294  *
4295  * Returns 0 for success or -errno in case of error
4296  *
4297  * Called in precopy mode by ram_load().
4298  * rcu_read_lock is taken prior to this being called.
4299  *
4300  * @f: QEMUFile where to send the data
4301  */
4302 static int ram_load_precopy(QEMUFile *f)
4303 {
4304     int flags = 0, ret = 0, invalid_flags = 0, len = 0;
4305     /* ADVISE is earlier, it shows the source has the postcopy capability on */
4306     bool postcopy_advised = postcopy_is_advised();
4307     if (!migrate_use_compression()) {
4308         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4309     }
4310
4311     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4312         ram_addr_t addr, total_ram_bytes;
4313         void *host = NULL;
4314         uint8_t ch;
4315
4316         addr = qemu_get_be64(f);
4317         flags = addr & ~TARGET_PAGE_MASK;
4318         addr &= TARGET_PAGE_MASK;
4319
4320         if (flags & invalid_flags) {
4321             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4322                 error_report("Received an unexpected compressed page");
4323             }
4324
4325             ret = -EINVAL;
4326             break;
4327         }
4328
4329         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4330                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4331             RAMBlock *block = ram_block_from_stream(f, flags);
4332
4333             /*
4334              * After going into COLO, we should load the Page into colo_cache.
4335              */
4336             if (migration_incoming_in_colo_state()) {
4337                 host = colo_cache_from_block_offset(block, addr);
4338             } else {
4339                 host = host_from_ram_block_offset(block, addr);
4340             }
4341             if (!host) {
4342                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4343                 ret = -EINVAL;
4344                 break;
4345             }
4346
4347             if (!migration_incoming_in_colo_state()) {
4348                 ramblock_recv_bitmap_set(block, host);
4349             }
4350
4351             trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4352         }
4353
4354         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4355         case RAM_SAVE_FLAG_MEM_SIZE:
4356             /* Synchronize RAM block list */
4357             total_ram_bytes = addr;
4358             while (!ret && total_ram_bytes) {
4359                 RAMBlock *block;
4360                 char id[256];
4361                 ram_addr_t length;
4362
4363                 len = qemu_get_byte(f);
4364                 qemu_get_buffer(f, (uint8_t *)id, len);
4365                 id[len] = 0;
4366                 length = qemu_get_be64(f);
4367
4368                 block = qemu_ram_block_by_name(id);
4369                 if (block && !qemu_ram_is_migratable(block)) {
4370                     error_report("block %s should not be migrated !", id);
4371                     ret = -EINVAL;
4372                 } else if (block) {
4373                     if (length != block->used_length) {
4374                         Error *local_err = NULL;
4375
4376                         ret = qemu_ram_resize(block, length,
4377                                               &local_err);
4378                         if (local_err) {
4379                             error_report_err(local_err);
4380                         }
4381                     }
4382                     /* For postcopy we need to check hugepage sizes match */
4383                     if (postcopy_advised &&
4384                         block->page_size != qemu_host_page_size) {
4385                         uint64_t remote_page_size = qemu_get_be64(f);
4386                         if (remote_page_size != block->page_size) {
4387                             error_report("Mismatched RAM page size %s "
4388                                          "(local) %zd != %" PRId64,
4389                                          id, block->page_size,
4390                                          remote_page_size);
4391                             ret = -EINVAL;
4392                         }
4393                     }
4394                     if (migrate_ignore_shared()) {
4395                         hwaddr addr = qemu_get_be64(f);
4396                         if (ramblock_is_ignored(block) &&
4397                             block->mr->addr != addr) {
4398                             error_report("Mismatched GPAs for block %s "
4399                                          "%" PRId64 "!= %" PRId64,
4400                                          id, (uint64_t)addr,
4401                                          (uint64_t)block->mr->addr);
4402                             ret = -EINVAL;
4403                         }
4404                     }
4405                     ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4406                                           block->idstr);
4407                 } else {
4408                     error_report("Unknown ramblock \"%s\", cannot "
4409                                  "accept migration", id);
4410                     ret = -EINVAL;
4411                 }
4412
4413                 total_ram_bytes -= length;
4414             }
4415             break;
4416
4417         case RAM_SAVE_FLAG_ZERO:
4418             ch = qemu_get_byte(f);
4419             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4420             break;
4421
4422         case RAM_SAVE_FLAG_PAGE:
4423             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4424             break;
4425
4426         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4427             len = qemu_get_be32(f);
4428             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4429                 error_report("Invalid compressed data length: %d", len);
4430                 ret = -EINVAL;
4431                 break;
4432             }
4433             decompress_data_with_multi_threads(f, host, len);
4434             break;
4435
4436         case RAM_SAVE_FLAG_XBZRLE:
4437             if (load_xbzrle(f, addr, host) < 0) {
4438                 error_report("Failed to decompress XBZRLE page at "
4439                              RAM_ADDR_FMT, addr);
4440                 ret = -EINVAL;
4441                 break;
4442             }
4443             break;
4444         case RAM_SAVE_FLAG_EOS:
4445             /* normal exit */
4446             multifd_recv_sync_main();
4447             break;
4448         default:
4449             if (flags & RAM_SAVE_FLAG_HOOK) {
4450                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4451             } else {
4452                 error_report("Unknown combination of migration flags: %#x",
4453                              flags);
4454                 ret = -EINVAL;
4455             }
4456         }
4457         if (!ret) {
4458             ret = qemu_file_get_error(f);
4459         }
4460     }
4461
4462     return ret;
4463 }
4464
4465 static int ram_load(QEMUFile *f, void *opaque, int version_id)
4466 {
4467     int ret = 0;
4468     static uint64_t seq_iter;
4469     /*
4470      * If system is running in postcopy mode, page inserts to host memory must
4471      * be atomic
4472      */
4473     bool postcopy_running = postcopy_is_running();
4474
4475     seq_iter++;
4476
4477     if (version_id != 4) {
4478         return -EINVAL;
4479     }
4480
4481     /*
4482      * This RCU critical section can be very long running.
4483      * When RCU reclaims in the code start to become numerous,
4484      * it will be necessary to reduce the granularity of this
4485      * critical section.
4486      */
4487     rcu_read_lock();
4488
4489     if (postcopy_running) {
4490         ret = ram_load_postcopy(f);
4491     } else {
4492         ret = ram_load_precopy(f);
4493     }
4494
4495     ret |= wait_for_decompress_done();
4496     rcu_read_unlock();
4497     trace_ram_load_complete(ret, seq_iter);
4498
4499     if (!ret  && migration_incoming_in_colo_state()) {
4500         colo_flush_ram_cache();
4501     }
4502     return ret;
4503 }
4504
4505 static bool ram_has_postcopy(void *opaque)
4506 {
4507     RAMBlock *rb;
4508     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4509         if (ramblock_is_pmem(rb)) {
4510             info_report("Block: %s, host: %p is a nvdimm memory, postcopy"
4511                          "is not supported now!", rb->idstr, rb->host);
4512             return false;
4513         }
4514     }
4515
4516     return migrate_postcopy_ram();
4517 }
4518
4519 /* Sync all the dirty bitmap with destination VM.  */
4520 static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4521 {
4522     RAMBlock *block;
4523     QEMUFile *file = s->to_dst_file;
4524     int ramblock_count = 0;
4525
4526     trace_ram_dirty_bitmap_sync_start();
4527
4528     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4529         qemu_savevm_send_recv_bitmap(file, block->idstr);
4530         trace_ram_dirty_bitmap_request(block->idstr);
4531         ramblock_count++;
4532     }
4533
4534     trace_ram_dirty_bitmap_sync_wait();
4535
4536     /* Wait until all the ramblocks' dirty bitmap synced */
4537     while (ramblock_count--) {
4538         qemu_sem_wait(&s->rp_state.rp_sem);
4539     }
4540
4541     trace_ram_dirty_bitmap_sync_complete();
4542
4543     return 0;
4544 }
4545
4546 static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4547 {
4548     qemu_sem_post(&s->rp_state.rp_sem);
4549 }
4550
4551 /*
4552  * Read the received bitmap, revert it as the initial dirty bitmap.
4553  * This is only used when the postcopy migration is paused but wants
4554  * to resume from a middle point.
4555  */
4556 int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4557 {
4558     int ret = -EINVAL;
4559     QEMUFile *file = s->rp_state.from_dst_file;
4560     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4561     uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4562     uint64_t size, end_mark;
4563
4564     trace_ram_dirty_bitmap_reload_begin(block->idstr);
4565
4566     if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4567         error_report("%s: incorrect state %s", __func__,
4568                      MigrationStatus_str(s->state));
4569         return -EINVAL;
4570     }
4571
4572     /*
4573      * Note: see comments in ramblock_recv_bitmap_send() on why we
4574      * need the endianess convertion, and the paddings.
4575      */
4576     local_size = ROUND_UP(local_size, 8);
4577
4578     /* Add paddings */
4579     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4580
4581     size = qemu_get_be64(file);
4582
4583     /* The size of the bitmap should match with our ramblock */
4584     if (size != local_size) {
4585         error_report("%s: ramblock '%s' bitmap size mismatch "
4586                      "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4587                      block->idstr, size, local_size);
4588         ret = -EINVAL;
4589         goto out;
4590     }
4591
4592     size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4593     end_mark = qemu_get_be64(file);
4594
4595     ret = qemu_file_get_error(file);
4596     if (ret || size != local_size) {
4597         error_report("%s: read bitmap failed for ramblock '%s': %d"
4598                      " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4599                      __func__, block->idstr, ret, local_size, size);
4600         ret = -EIO;
4601         goto out;
4602     }
4603
4604     if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4605         error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIu64,
4606                      __func__, block->idstr, end_mark);
4607         ret = -EINVAL;
4608         goto out;
4609     }
4610
4611     /*
4612      * Endianess convertion. We are during postcopy (though paused).
4613      * The dirty bitmap won't change. We can directly modify it.
4614      */
4615     bitmap_from_le(block->bmap, le_bitmap, nbits);
4616
4617     /*
4618      * What we received is "received bitmap". Revert it as the initial
4619      * dirty bitmap for this ramblock.
4620      */
4621     bitmap_complement(block->bmap, block->bmap, nbits);
4622
4623     trace_ram_dirty_bitmap_reload_complete(block->idstr);
4624
4625     /*
4626      * We succeeded to sync bitmap for current ramblock. If this is
4627      * the last one to sync, we need to notify the main send thread.
4628      */
4629     ram_dirty_bitmap_reload_notify(s);
4630
4631     ret = 0;
4632 out:
4633     g_free(le_bitmap);
4634     return ret;
4635 }
4636
4637 static int ram_resume_prepare(MigrationState *s, void *opaque)
4638 {
4639     RAMState *rs = *(RAMState **)opaque;
4640     int ret;
4641
4642     ret = ram_dirty_bitmap_sync_all(s, rs);
4643     if (ret) {
4644         return ret;
4645     }
4646
4647     ram_state_resume_prepare(rs, s->to_dst_file);
4648
4649     return 0;
4650 }
4651
4652 static SaveVMHandlers savevm_ram_handlers = {
4653     .save_setup = ram_save_setup,
4654     .save_live_iterate = ram_save_iterate,
4655     .save_live_complete_postcopy = ram_save_complete,
4656     .save_live_complete_precopy = ram_save_complete,
4657     .has_postcopy = ram_has_postcopy,
4658     .save_live_pending = ram_save_pending,
4659     .load_state = ram_load,
4660     .save_cleanup = ram_save_cleanup,
4661     .load_setup = ram_load_setup,
4662     .load_cleanup = ram_load_cleanup,
4663     .resume_prepare = ram_resume_prepare,
4664 };
4665
4666 void ram_mig_init(void)
4667 {
4668     qemu_mutex_init(&XBZRLE.lock);
4669     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, &ram_state);
4670 }
This page took 0.270865 seconds and 4 git commands to generate.