1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  * Copyright Red Hat, Inc. 2015-2016
6  *
7  * Authors:
8  *  Michael R. Hines <[email protected]>
9  *  Jiuxing Liu <[email protected]>
10  *  Daniel P. Berrange <[email protected]>
11  *
12  * This work is licensed under the terms of the GNU GPL, version 2 or
13  * later.  See the COPYING file in the top-level directory.
14  *
15  */
16 #include "qemu/osdep.h"
17 #include "qapi/error.h"
18 #include "qemu-common.h"
19 #include "qemu/cutils.h"
20 #include "migration/migration.h"
21 #include "migration/qemu-file.h"
22 #include "exec/cpu-common.h"
23 #include "qemu-file-channel.h"
24 #include "qemu/error-report.h"
25 #include "qemu/main-loop.h"
26 #include "qemu/sockets.h"
27 #include "qemu/bitmap.h"
28 #include "qemu/coroutine.h"
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <arpa/inet.h>
32 #include <rdma/rdma_cma.h>
33 #include "trace.h"
34
35 /*
36  * Print an error on both the Monitor and the Log file.
37  */
38 #define ERROR(errp, fmt, ...) \
39     do { \
40         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
41         if (errp && (*(errp) == NULL)) { \
42             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
43         } \
44     } while (0)
45
46 #define RDMA_RESOLVE_TIMEOUT_MS 10000
47
48 /* Do not merge data if larger than this. */
49 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
50 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
51
52 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
53
54 /*
55  * This is only for non-live state being migrated.
56  * Instead of RDMA_WRITE messages, we use RDMA_SEND
57  * messages for that state, which requires a different
58  * delivery design than main memory.
59  */
60 #define RDMA_SEND_INCREMENT 32768
61
62 /*
63  * Maximum size of an InfiniBand SEND message
64  */
65 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
66 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
67
68 #define RDMA_CONTROL_VERSION_CURRENT 1
69 /*
70  * Capabilities for negotiation.
71  */
72 #define RDMA_CAPABILITY_PIN_ALL 0x01
73
74 /*
75  * Add the other flags above to this list of known capabilities
76  * as they are introduced.
77  */
78 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
79
80 #define CHECK_ERROR_STATE() \
81     do { \
82         if (rdma->error_state) { \
83             if (!rdma->error_reported) { \
84                 error_report("RDMA is in an error state waiting for migration" \
85                                 " to abort!"); \
86                 rdma->error_reported = 1; \
87             } \
88             return rdma->error_state; \
89         } \
90     } while (0);
91
92 /*
93  * A work request ID is 64-bits and we split up these bits
94  * into 3 parts:
95  *
96  * bits 0-15 : type of control message, 2^16
97  * bits 16-29: ram block index, 2^14
98  * bits 30-63: ram block chunk number, 2^34
99  *
100  * The last two bit ranges are only used for RDMA writes,
101  * in order to track their completion and potentially
102  * also track unregistration status of the message.
103  */
104 #define RDMA_WRID_TYPE_SHIFT  0UL
105 #define RDMA_WRID_BLOCK_SHIFT 16UL
106 #define RDMA_WRID_CHUNK_SHIFT 30UL
107
108 #define RDMA_WRID_TYPE_MASK \
109     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
110
111 #define RDMA_WRID_BLOCK_MASK \
112     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
113
114 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
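/*
 * Worked example (editor's sketch, mirroring qemu_rdma_make_wrid() further
 * down in this file): an RDMA write for chunk 5 of RAM block 2 is encoded as
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE              (type,  bits 0-15)
 *           | (2ULL << RDMA_WRID_BLOCK_SHIFT)   (block, bits 16-29)
 *           | (5ULL << RDMA_WRID_CHUNK_SHIFT);  (chunk, bits 30-63)
 *
 * and the completion path recovers the pieces with
 *     (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT
 *     (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT
 * as done in qemu_rdma_poll() and qemu_rdma_unregister_waiting().
 */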
115
116 /*
117  * RDMA migration protocol:
118  * 1. RDMA Writes (data messages, i.e. RAM)
119  * 2. IB Send/Recv (control channel messages)
120  */
121 enum {
122     RDMA_WRID_NONE = 0,
123     RDMA_WRID_RDMA_WRITE = 1,
124     RDMA_WRID_SEND_CONTROL = 2000,
125     RDMA_WRID_RECV_CONTROL = 4000,
126 };
127
128 static const char *wrid_desc[] = {
129     [RDMA_WRID_NONE] = "NONE",
130     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
131     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
132     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
133 };
134
135 /*
136  * Work request IDs for IB SEND messages only (not RDMA writes).
137  * This is used by the migration protocol to transmit
138  * control messages (such as device state and registration commands).
139  *
140  * We could use more WRs, but we have enough for now.
141  */
142 enum {
143     RDMA_WRID_READY = 0,
144     RDMA_WRID_DATA,
145     RDMA_WRID_CONTROL,
146     RDMA_WRID_MAX,
147 };
148
149 /*
150  * SEND/RECV IB Control Messages.
151  */
152 enum {
153     RDMA_CONTROL_NONE = 0,
154     RDMA_CONTROL_ERROR,
155     RDMA_CONTROL_READY,               /* ready to receive */
156     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
157     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
158     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
159     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
160     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
161     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
162     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
163     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
164     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
165 };
166
167 static const char *control_desc[] = {
168     [RDMA_CONTROL_NONE] = "NONE",
169     [RDMA_CONTROL_ERROR] = "ERROR",
170     [RDMA_CONTROL_READY] = "READY",
171     [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
172     [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
173     [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
174     [RDMA_CONTROL_COMPRESS] = "COMPRESS",
175     [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
176     [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
177     [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
178     [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
179     [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
180 };
181
182 /*
183  * Memory and MR structures used to represent an IB Send/Recv work request.
184  * This is *not* used for RDMA writes, only IB Send/Recv.
185  */
186 typedef struct {
187     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
188     struct   ibv_mr *control_mr;               /* registration metadata */
189     size_t   control_len;                      /* length of the message */
190     uint8_t *control_curr;                     /* start of unconsumed bytes */
191 } RDMAWorkRequestData;
192
193 /*
194  * Negotiate RDMA capabilities during connection-setup time.
195  */
196 typedef struct {
197     uint32_t version;
198     uint32_t flags;
199 } RDMACapabilities;
200
201 static void caps_to_network(RDMACapabilities *cap)
202 {
203     cap->version = htonl(cap->version);
204     cap->flags = htonl(cap->flags);
205 }
206
207 static void network_to_caps(RDMACapabilities *cap)
208 {
209     cap->version = ntohl(cap->version);
210     cap->flags = ntohl(cap->flags);
211 }
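/*
 * Usage sketch (editor's illustration): at connect time the source would
 * fill in something like
 *
 *     RDMACapabilities cap = {
 *         .version = RDMA_CONTROL_VERSION_CURRENT,
 *         .flags   = rdma->pin_all ? RDMA_CAPABILITY_PIN_ALL : 0,
 *     };
 *     caps_to_network(&cap);
 *
 * and carry the struct as rdma_cm private data; the destination then runs
 * network_to_caps() on what it receives and checks the flags against
 * known_capabilities.
 */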
212
213 /*
214  * Representation of a RAMBlock from an RDMA perspective.
215  * This is not transmitted, only local.
216  * This and subsequent structures cannot be linked lists
217  * because we're using a single IB message to transmit
218  * the information. It's small anyway, so a list is overkill.
219  */
220 typedef struct RDMALocalBlock {
221     char          *block_name;
222     uint8_t       *local_host_addr; /* local virtual address */
223     uint64_t       remote_host_addr; /* remote virtual address */
224     uint64_t       offset;
225     uint64_t       length;
226     struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
227     struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
228     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
229     uint32_t       remote_rkey;     /* rkey for non-chunk-level registration */
230     int            index;           /* which block are we */
231     unsigned int   src_index;       /* (Only used on dest) */
232     bool           is_ram_block;
233     int            nb_chunks;
234     unsigned long *transit_bitmap;
235     unsigned long *unregister_bitmap;
236 } RDMALocalBlock;
237
238 /*
239  * Also represents a RAMBlock, but only on the dest.
240  * This gets transmitted by the dest during connection-time
241  * to the source VM and then is used to populate the
242  * corresponding RDMALocalBlock with
243  * the information needed to perform the actual RDMA.
244  */
245 typedef struct QEMU_PACKED RDMADestBlock {
246     uint64_t remote_host_addr;
247     uint64_t offset;
248     uint64_t length;
249     uint32_t remote_rkey;
250     uint32_t padding;
251 } RDMADestBlock;
252
253 static uint64_t htonll(uint64_t v)
254 {
255     union { uint32_t lv[2]; uint64_t llv; } u;
256     u.lv[0] = htonl(v >> 32);
257     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
258     return u.llv;
259 }
260
261 static uint64_t ntohll(uint64_t v) {
262     union { uint32_t lv[2]; uint64_t llv; } u;
263     u.llv = v;
264     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
265 }
266
267 static void dest_block_to_network(RDMADestBlock *db)
268 {
269     db->remote_host_addr = htonll(db->remote_host_addr);
270     db->offset = htonll(db->offset);
271     db->length = htonll(db->length);
272     db->remote_rkey = htonl(db->remote_rkey);
273 }
274
275 static void network_to_dest_block(RDMADestBlock *db)
276 {
277     db->remote_host_addr = ntohll(db->remote_host_addr);
278     db->offset = ntohll(db->offset);
279     db->length = ntohll(db->length);
280     db->remote_rkey = ntohl(db->remote_rkey);
281 }
282
283 /*
284  * Local container for the RDMALocalBlock structures above, used when
285  * transmitting the RAMBlock descriptions at connection time.
286  * This structure itself is *not* transmitted.
287  */
288 typedef struct RDMALocalBlocks {
289     int nb_blocks;
290     bool     init;             /* main memory init complete */
291     RDMALocalBlock *block;
292 } RDMALocalBlocks;
293
294 /*
295  * Main data structure for RDMA state.
296  * While there is only one copy of this structure being allocated right now,
297  * this is the place where one would start if one wanted to consider
298  * having more than one RDMA connection open at the same time.
299  */
300 typedef struct RDMAContext {
301     char *host;
302     int port;
303
304     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
305
306     /*
307      * This is used by *_exchange_send() to figure out whether or not
308      * the initial "READY" message has already been received.
309      * This is because other functions may potentially poll() and detect
310      * the READY message before send() does, in which case we need to
311      * know if it completed.
312      */
313     int control_ready_expected;
314
315     /* number of outstanding writes */
316     int nb_sent;
317
318     /* store info about current buffer so that we can
319        merge it with future sends */
320     uint64_t current_addr;
321     uint64_t current_length;
322     /* index of ram block the current buffer belongs to */
323     int current_index;
324     /* index of the chunk in the current ram block */
325     int current_chunk;
326
327     bool pin_all;
328
329     /*
330      * infiniband-specific variables for opening the device
331      * and maintaining connection state and so forth.
332      *
333      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
334      * cm_id->verbs, cm_id->channel, and cm_id->qp.
335      */
336     struct rdma_cm_id *cm_id;               /* connection manager ID */
337     struct rdma_cm_id *listen_id;
338     bool connected;
339
340     struct ibv_context          *verbs;
341     struct rdma_event_channel   *channel;
342     struct ibv_qp *qp;                      /* queue pair */
343     struct ibv_comp_channel *comp_channel;  /* completion channel */
344     struct ibv_pd *pd;                      /* protection domain */
345     struct ibv_cq *cq;                      /* completion queue */
346
347     /*
348      * If a previous write failed (perhaps because of a failed
349      * memory registration), then do not attempt any future work
350      * and remember the error state.
351      */
352     int error_state;
353     int error_reported;
354     int received_error;
355
356     /*
357      * Description of ram blocks used throughout the code.
358      */
359     RDMALocalBlocks local_ram_blocks;
360     RDMADestBlock  *dest_blocks;
361
362     /* Index of the next RAMBlock received during block registration */
363     unsigned int    next_src_index;
364
365     /*
366      * Set once migration has started on the *destination*;
367      * from then on we use the coroutine yield function.
368      * The source runs in a thread, so we don't care.
369      */
370     int migration_started_on_destination;
371
372     int total_registrations;
373     int total_writes;
374
375     int unregister_current, unregister_next;
376     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
377
378     GHashTable *blockmap;
379 } RDMAContext;
380
381 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
382 #define QIO_CHANNEL_RDMA(obj)                                     \
383     OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
384
385 typedef struct QIOChannelRDMA QIOChannelRDMA;
386
387
388 struct QIOChannelRDMA {
389     QIOChannel parent;
390     RDMAContext *rdma;
391     QEMUFile *file;
392     size_t len;
393     bool blocking; /* XXX we don't actually honour this yet */
394 };
395
396 /*
397  * Main structure for IB Send/Recv control messages.
398  * This gets prepended at the beginning of every Send/Recv.
399  */
400 typedef struct QEMU_PACKED {
401     uint32_t len;     /* Total length of data portion */
402     uint32_t type;    /* which control command to perform */
403     uint32_t repeat;  /* number of commands in data portion of same type */
404     uint32_t padding;
405 } RDMAControlHeader;
406
407 static void control_to_network(RDMAControlHeader *control)
408 {
409     control->type = htonl(control->type);
410     control->len = htonl(control->len);
411     control->repeat = htonl(control->repeat);
412 }
413
414 static void network_to_control(RDMAControlHeader *control)
415 {
416     control->type = ntohl(control->type);
417     control->len = ntohl(control->len);
418     control->repeat = ntohl(control->repeat);
419 }
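/*
 * Wire layout sketch (editor's illustration): every Send/Recv buffer carries
 *
 *     | RDMAControlHeader (network byte order) | head->len bytes of payload |
 *
 * as assembled in qemu_rdma_post_send_control() below; the 'repeat' field
 * lets one message hold several payload entries of the same type back to
 * back.
 */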
420
421 /*
422  * Register a single Chunk.
423  * Information sent by the source VM to inform the dest
424  * to register a single chunk of memory before we can perform
425  * the actual RDMA operation.
426  */
427 typedef struct QEMU_PACKED {
428     union QEMU_PACKED {
429         uint64_t current_addr;  /* offset into the ram_addr_t space */
430         uint64_t chunk;         /* chunk to lookup if unregistering */
431     } key;
432     uint32_t current_index; /* which ramblock the chunk belongs to */
433     uint32_t padding;
434     uint64_t chunks;            /* how many sequential chunks to register */
435 } RDMARegister;
436
437 static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
438 {
439     RDMALocalBlock *local_block;
440     local_block  = &rdma->local_ram_blocks.block[reg->current_index];
441
442     if (local_block->is_ram_block) {
443         /*
444          * current_addr as passed in is an address in the local ram_addr_t
445          * space, we need to translate this for the destination
446          */
447         reg->key.current_addr -= local_block->offset;
448         reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
449     }
450     reg->key.current_addr = htonll(reg->key.current_addr);
451     reg->current_index = htonl(reg->current_index);
452     reg->chunks = htonll(reg->chunks);
453 }
454
455 static void network_to_register(RDMARegister *reg)
456 {
457     reg->key.current_addr = ntohll(reg->key.current_addr);
458     reg->current_index = ntohl(reg->current_index);
459     reg->chunks = ntohll(reg->chunks);
460 }
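/*
 * Example (editor's sketch): the unregistration path later in this file
 * builds such a request roughly as
 *
 *     RDMARegister reg = { .current_index = index };
 *     reg.key.chunk = chunk;
 *     register_to_network(rdma, &reg);
 *
 * and then ships it with a RDMA_CONTROL_UNREGISTER_REQUEST header via
 * qemu_rdma_exchange_send() (see qemu_rdma_unregister_waiting()).
 */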
461
462 typedef struct QEMU_PACKED {
463     uint32_t value;     /* if zero, we will madvise() */
464     uint32_t block_idx; /* which ram block index */
465     uint64_t offset;    /* Address in remote ram_addr_t space */
466     uint64_t length;    /* length of the chunk */
467 } RDMACompress;
468
469 static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
470 {
471     comp->value = htonl(comp->value);
472     /*
473      * comp->offset as passed in is an address in the local ram_addr_t
474      * space, we need to translate this for the destination
475      */
476     comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
477     comp->offset += rdma->dest_blocks[comp->block_idx].offset;
478     comp->block_idx = htonl(comp->block_idx);
479     comp->offset = htonll(comp->offset);
480     comp->length = htonll(comp->length);
481 }
482
483 static void network_to_compress(RDMACompress *comp)
484 {
485     comp->value = ntohl(comp->value);
486     comp->block_idx = ntohl(comp->block_idx);
487     comp->offset = ntohll(comp->offset);
488     comp->length = ntohll(comp->length);
489 }
490
491 /*
492  * The result of the dest's memory registration produces an "rkey"
493  * which the source VM must reference in order to perform
494  * the RDMA operation.
495  */
496 typedef struct QEMU_PACKED {
497     uint32_t rkey;
498     uint32_t padding;
499     uint64_t host_addr;
500 } RDMARegisterResult;
501
502 static void result_to_network(RDMARegisterResult *result)
503 {
504     result->rkey = htonl(result->rkey);
505     result->host_addr = htonll(result->host_addr);
506 }
507
508 static void network_to_result(RDMARegisterResult *result)
509 {
510     result->rkey = ntohl(result->rkey);
511     result->host_addr = ntohll(result->host_addr);
512 }
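/*
 * Editor's note: based on the fields above, the source caches the returned
 * rkey (per chunk in block->remote_keys[], or per block in
 * block->remote_rkey) and the remote host_addr, and later supplies them as
 * the rkey/remote address of the actual RDMA write.
 */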
513
514 const char *print_wrid(int wrid);
515 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
516                                    uint8_t *data, RDMAControlHeader *resp,
517                                    int *resp_idx,
518                                    int (*callback)(RDMAContext *rdma));
519
520 static inline uint64_t ram_chunk_index(const uint8_t *start,
521                                        const uint8_t *host)
522 {
523     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
524 }
525
526 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
527                                        uint64_t i)
528 {
529     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
530                                   (i << RDMA_REG_CHUNK_SHIFT));
531 }
532
533 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
534                                      uint64_t i)
535 {
536     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
537                                          (1UL << RDMA_REG_CHUNK_SHIFT);
538
539     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
540         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
541     }
542
543     return result;
544 }
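/*
 * Editor's example: with RDMA_REG_CHUNK_SHIFT of 20, each chunk covers 1 MB,
 * so a 1 GB RAMBlock splits into 1024 chunks; ram_chunk_end() clamps the
 * last (possibly partial) chunk to the end of the block.
 */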
545
546 static int rdma_add_block(RDMAContext *rdma, const char *block_name,
547                          void *host_addr,
548                          ram_addr_t block_offset, uint64_t length)
549 {
550     RDMALocalBlocks *local = &rdma->local_ram_blocks;
551     RDMALocalBlock *block;
552     RDMALocalBlock *old = local->block;
553
554     local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
555
556     if (local->nb_blocks) {
557         int x;
558
559         if (rdma->blockmap) {
560             for (x = 0; x < local->nb_blocks; x++) {
561                 g_hash_table_remove(rdma->blockmap,
562                                     (void *)(uintptr_t)old[x].offset);
563                 g_hash_table_insert(rdma->blockmap,
564                                     (void *)(uintptr_t)old[x].offset,
565                                     &local->block[x]);
566             }
567         }
568         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
569         g_free(old);
570     }
571
572     block = &local->block[local->nb_blocks];
573
574     block->block_name = g_strdup(block_name);
575     block->local_host_addr = host_addr;
576     block->offset = block_offset;
577     block->length = length;
578     block->index = local->nb_blocks;
579     block->src_index = ~0U; /* Filled in by the receipt of the block list */
580     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
581     block->transit_bitmap = bitmap_new(block->nb_chunks);
582     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
583     block->unregister_bitmap = bitmap_new(block->nb_chunks);
584     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
585     block->remote_keys = g_new0(uint32_t, block->nb_chunks);
586
587     block->is_ram_block = local->init ? false : true;
588
589     if (rdma->blockmap) {
590         g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
591     }
592
593     trace_rdma_add_block(block_name, local->nb_blocks,
594                          (uintptr_t) block->local_host_addr,
595                          block->offset, block->length,
596                          (uintptr_t) (block->local_host_addr + block->length),
597                          BITS_TO_LONGS(block->nb_chunks) *
598                              sizeof(unsigned long) * 8,
599                          block->nb_chunks);
600
601     local->nb_blocks++;
602
603     return 0;
604 }
605
606 /*
607  * Memory regions need to be registered with the device and queue pairs set up
608  * in advance before the migration starts. This tells us where the RAM blocks
609  * are so that we can register them individually.
610  */
611 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
612     ram_addr_t block_offset, ram_addr_t length, void *opaque)
613 {
614     return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
615 }
616
617 /*
618  * Identify the RAMBlocks and their quantity. They will be used to
619  * identify chunk boundaries inside each RAMBlock and also be referenced
620  * during dynamic page registration.
621  */
622 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
623 {
624     RDMALocalBlocks *local = &rdma->local_ram_blocks;
625
626     assert(rdma->blockmap == NULL);
627     memset(local, 0, sizeof *local);
628     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
629     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
630     rdma->dest_blocks = g_new0(RDMADestBlock,
631                                rdma->local_ram_blocks.nb_blocks);
632     local->init = true;
633     return 0;
634 }
635
636 /*
637  * Note: If used outside of cleanup, the caller must ensure that the destination
638  * block structures are also updated.
639  */
640 static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
641 {
642     RDMALocalBlocks *local = &rdma->local_ram_blocks;
643     RDMALocalBlock *old = local->block;
644     int x;
645
646     if (rdma->blockmap) {
647         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
648     }
649     if (block->pmr) {
650         int j;
651
652         for (j = 0; j < block->nb_chunks; j++) {
653             if (!block->pmr[j]) {
654                 continue;
655             }
656             ibv_dereg_mr(block->pmr[j]);
657             rdma->total_registrations--;
658         }
659         g_free(block->pmr);
660         block->pmr = NULL;
661     }
662
663     if (block->mr) {
664         ibv_dereg_mr(block->mr);
665         rdma->total_registrations--;
666         block->mr = NULL;
667     }
668
669     g_free(block->transit_bitmap);
670     block->transit_bitmap = NULL;
671
672     g_free(block->unregister_bitmap);
673     block->unregister_bitmap = NULL;
674
675     g_free(block->remote_keys);
676     block->remote_keys = NULL;
677
678     g_free(block->block_name);
679     block->block_name = NULL;
680
681     if (rdma->blockmap) {
682         for (x = 0; x < local->nb_blocks; x++) {
683             g_hash_table_remove(rdma->blockmap,
684                                 (void *)(uintptr_t)old[x].offset);
685         }
686     }
687
688     if (local->nb_blocks > 1) {
689
690         local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
691
692         if (block->index) {
693             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
694         }
695
696         if (block->index < (local->nb_blocks - 1)) {
697             memcpy(local->block + block->index, old + (block->index + 1),
698                 sizeof(RDMALocalBlock) *
699                     (local->nb_blocks - (block->index + 1)));
700         }
701     } else {
702         assert(block == local->block);
703         local->block = NULL;
704     }
705
706     trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
707                            block->offset, block->length,
708                             (uintptr_t)(block->local_host_addr + block->length),
709                            BITS_TO_LONGS(block->nb_chunks) *
710                                sizeof(unsigned long) * 8, block->nb_chunks);
711
712     g_free(old);
713
714     local->nb_blocks--;
715
716     if (local->nb_blocks && rdma->blockmap) {
717         for (x = 0; x < local->nb_blocks; x++) {
718             g_hash_table_insert(rdma->blockmap,
719                                 (void *)(uintptr_t)local->block[x].offset,
720                                 &local->block[x]);
721         }
722     }
723
724     return 0;
725 }
726
727 /*
728  * Put in the log file which RDMA device was opened and the details
729  * associated with that device.
730  */
731 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
732 {
733     struct ibv_port_attr port;
734
735     if (ibv_query_port(verbs, 1, &port)) {
736         error_report("Failed to query port information");
737         return;
738     }
739
740     printf("%s RDMA Device opened: kernel name %s "
741            "uverbs device name %s, "
742            "infiniband_verbs class device path %s, "
743            "infiniband class device path %s, "
744            "transport: (%d) %s\n",
745                 who,
746                 verbs->device->name,
747                 verbs->device->dev_name,
748                 verbs->device->dev_path,
749                 verbs->device->ibdev_path,
750                 port.link_layer,
751                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
752                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
753                     ? "Ethernet" : "Unknown"));
754 }
755
756 /*
757  * Put in the log file the RDMA gid addressing information,
758  * useful for folks who have trouble understanding the
759  * RDMA device hierarchy in the kernel.
760  */
761 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
762 {
763     char sgid[33];
764     char dgid[33];
765     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
766     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
767     trace_qemu_rdma_dump_gid(who, sgid, dgid);
768 }
769
770 /*
771  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
772  * We will try the next addrinfo struct, and fail if there are
773  * no other valid addresses to bind against.
774  *
775  * If the user is listening on '[::]', then we will not have opened a device
776  * yet and have no way of verifying if the device is RoCE or not.
777  *
778  * In this case, the source VM will throw an error for ALL types of
779  * connections (both IPv4 and IPv6) if the destination machine does not have
780  * a regular infiniband network available for use.
781  *
782  * The only way to guarantee that an error is thrown for broken kernels is
783  * for the management software to choose a *specific* interface at bind time
784  * and validate what type of hardware it is.
785  *
786  * Unfortunately, this puts the user in a fix:
787  *
788  *  If the source VM connects with an IPv4 address without knowing that the
789  *  destination has bound to '[::]' the migration will unconditionally fail
790  *  unless the management software is explicitly listening on the IPv4
791  *  address while using a RoCE-based device.
792  *
793  *  If the source VM connects with an IPv6 address, then we're OK because we can
794  *  throw an error on the source (and similarly on the destination).
795  *
796  *  But in mixed environments, this will be broken for a while until it is fixed
797  *  inside linux.
798  *
799  * We do provide a *tiny* bit of help in this function: We can list all of the
800  * devices in the system and check to see if all the devices are RoCE or
801  * Infiniband.
802  *
803  * If we detect that we have a *pure* RoCE environment, then we can safely
804  * throw an error even if the management software has specified '[::]' as the
805  * bind address.
806  *
807  * However, if there are multiple heterogeneous devices, then we cannot make
808  * this assumption and the user just has to be sure they know what they are
809  * doing.
810  *
811  * Patches are being reviewed on linux-rdma.
812  */
813 static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
814 {
815     struct ibv_port_attr port_attr;
816
817     /* This bug only exists in linux, to our knowledge. */
818 #ifdef CONFIG_LINUX
819
820     /*
821      * Verbs are only NULL if management has bound to '[::]'.
822      *
823      * Let's iterate through all the devices and see if there are any pure IB
824      * devices (non-ethernet).
825      *
826      * If not, then we can safely proceed with the migration.
827      * Otherwise, there are no guarantees until the bug is fixed in linux.
828      */
829     if (!verbs) {
830         int num_devices, x;
831         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
832         bool roce_found = false;
833         bool ib_found = false;
834
835         for (x = 0; x < num_devices; x++) {
836             verbs = ibv_open_device(dev_list[x]);
837             if (!verbs) {
838                 if (errno == EPERM) {
839                     continue;
840                 } else {
841                     return -EINVAL;
842                 }
843             }
844
845             if (ibv_query_port(verbs, 1, &port_attr)) {
846                 ibv_close_device(verbs);
847                 ERROR(errp, "Could not query initial IB port");
848                 return -EINVAL;
849             }
850
851             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
852                 ib_found = true;
853             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
854                 roce_found = true;
855             }
856
857             ibv_close_device(verbs);
858
859         }
860
861         if (roce_found) {
862             if (ib_found) {
863                 fprintf(stderr, "WARN: migrations may fail:"
864                                 " IPv6 over RoCE / iWARP in linux"
865                                 " is broken. But since you appear to have a"
866                                 " mixed RoCE / IB environment, be sure to only"
867                                 " migrate over the IB fabric until the kernel"
868                                 " fixes the bug.\n");
869             } else {
870                 ERROR(errp, "You only have RoCE / iWARP devices in your system"
871                             " and your management software has specified '[::]'"
872                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
873                 return -ENONET;
874             }
875         }
876
877         return 0;
878     }
879
880     /*
881      * If we have a verbs context, that means that something other than '[::]' was
882      * used by the management software for binding, in which case we can
883      * actually warn the user about a potentially broken kernel.
884      */
885
886     /* IB ports start with 1, not 0 */
887     if (ibv_query_port(verbs, 1, &port_attr)) {
888         ERROR(errp, "Could not query initial IB port");
889         return -EINVAL;
890     }
891
892     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
893         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
894                     "(but patches on linux-rdma in progress)");
895         return -ENONET;
896     }
897
898 #endif
899
900     return 0;
901 }
902
903 /*
904  * Figure out which RDMA device corresponds to the requested IP hostname.
905  * Also create the initial connection manager identifiers for opening
906  * the connection.
907  */
908 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
909 {
910     int ret;
911     struct rdma_addrinfo *res;
912     char port_str[16];
913     struct rdma_cm_event *cm_event;
914     char ip[40] = "unknown";
915     struct rdma_addrinfo *e;
916
917     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
918         ERROR(errp, "RDMA hostname has not been set");
919         return -EINVAL;
920     }
921
922     /* create CM channel */
923     rdma->channel = rdma_create_event_channel();
924     if (!rdma->channel) {
925         ERROR(errp, "could not create CM channel");
926         return -EINVAL;
927     }
928
929     /* create CM id */
930     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
931     if (ret) {
932         ERROR(errp, "could not create channel id");
933         goto err_resolve_create_id;
934     }
935
936     snprintf(port_str, 16, "%d", rdma->port);
937     port_str[15] = '\0';
938
939     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
940     if (ret < 0) {
941         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
942         goto err_resolve_get_addr;
943     }
944
945     for (e = res; e != NULL; e = e->ai_next) {
946         inet_ntop(e->ai_family,
947             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
948         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
949
950         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
951                 RDMA_RESOLVE_TIMEOUT_MS);
952         if (!ret) {
953             if (e->ai_family == AF_INET6) {
954                 ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
955                 if (ret) {
956                     continue;
957                 }
958             }
959             goto route;
960         }
961     }
962
963     ERROR(errp, "could not resolve address %s", rdma->host);
964     goto err_resolve_get_addr;
965
966 route:
967     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
968
969     ret = rdma_get_cm_event(rdma->channel, &cm_event);
970     if (ret) {
971         ERROR(errp, "could not perform event_addr_resolved");
972         goto err_resolve_get_addr;
973     }
974
975     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
976         ERROR(errp, "result not equal to event_addr_resolved %s",
977                 rdma_event_str(cm_event->event));
978         perror("rdma_resolve_addr");
979         rdma_ack_cm_event(cm_event);
980         ret = -EINVAL;
981         goto err_resolve_get_addr;
982     }
983     rdma_ack_cm_event(cm_event);
984
985     /* resolve route */
986     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
987     if (ret) {
988         ERROR(errp, "could not resolve rdma route");
989         goto err_resolve_get_addr;
990     }
991
992     ret = rdma_get_cm_event(rdma->channel, &cm_event);
993     if (ret) {
994         ERROR(errp, "could not perform event_route_resolved");
995         goto err_resolve_get_addr;
996     }
997     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
998         ERROR(errp, "result not equal to event_route_resolved: %s",
999                         rdma_event_str(cm_event->event));
1000         rdma_ack_cm_event(cm_event);
1001         ret = -EINVAL;
1002         goto err_resolve_get_addr;
1003     }
1004     rdma_ack_cm_event(cm_event);
1005     rdma->verbs = rdma->cm_id->verbs;
1006     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
1007     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
1008     return 0;
1009
1010 err_resolve_get_addr:
1011     rdma_destroy_id(rdma->cm_id);
1012     rdma->cm_id = NULL;
1013 err_resolve_create_id:
1014     rdma_destroy_event_channel(rdma->channel);
1015     rdma->channel = NULL;
1016     return ret;
1017 }
1018
1019 /*
1020  * Create protection domain and completion queues
1021  */
1022 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
1023 {
1024     /* allocate pd */
1025     rdma->pd = ibv_alloc_pd(rdma->verbs);
1026     if (!rdma->pd) {
1027         error_report("failed to allocate protection domain");
1028         return -1;
1029     }
1030
1031     /* create completion channel */
1032     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
1033     if (!rdma->comp_channel) {
1034         error_report("failed to allocate completion channel");
1035         goto err_alloc_pd_cq;
1036     }
1037
1038     /*
1039      * Completion queue can be filled by both read and write work requests,
1040      * so must reflect the sum of both possible queue sizes.
1041      */
1042     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
1043             NULL, rdma->comp_channel, 0);
1044     if (!rdma->cq) {
1045         error_report("failed to allocate completion queue");
1046         goto err_alloc_pd_cq;
1047     }
1048
1049     return 0;
1050
1051 err_alloc_pd_cq:
1052     if (rdma->pd) {
1053         ibv_dealloc_pd(rdma->pd);
1054     }
1055     if (rdma->comp_channel) {
1056         ibv_destroy_comp_channel(rdma->comp_channel);
1057     }
1058     rdma->pd = NULL;
1059     rdma->comp_channel = NULL;
1060     return -1;
1061
1062 }
1063
1064 /*
1065  * Create queue pairs.
1066  */
1067 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1068 {
1069     struct ibv_qp_init_attr attr = { 0 };
1070     int ret;
1071
1072     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1073     attr.cap.max_recv_wr = 3;
1074     attr.cap.max_send_sge = 1;
1075     attr.cap.max_recv_sge = 1;
1076     attr.send_cq = rdma->cq;
1077     attr.recv_cq = rdma->cq;
1078     attr.qp_type = IBV_QPT_RC;
1079
1080     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1081     if (ret) {
1082         return -1;
1083     }
1084
1085     rdma->qp = rdma->cm_id->qp;
1086     return 0;
1087 }
1088
1089 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1090 {
1091     int i;
1092     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1093
1094     for (i = 0; i < local->nb_blocks; i++) {
1095         local->block[i].mr =
1096             ibv_reg_mr(rdma->pd,
1097                     local->block[i].local_host_addr,
1098                     local->block[i].length,
1099                     IBV_ACCESS_LOCAL_WRITE |
1100                     IBV_ACCESS_REMOTE_WRITE
1101                     );
1102         if (!local->block[i].mr) {
1103             perror("Failed to register local dest ram block!");
1104             break;
1105         }
1106         rdma->total_registrations++;
1107     }
1108
1109     if (i >= local->nb_blocks) {
1110         return 0;
1111     }
1112
1113     for (i--; i >= 0; i--) {
1114         ibv_dereg_mr(local->block[i].mr);
1115         rdma->total_registrations--;
1116     }
1117
1118     return -1;
1119
1120 }
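/*
 * Editor's note: this whole-block registration path is only taken when the
 * 'rdma-pin-all' capability is negotiated; in the default mode, chunks are
 * registered lazily by qemu_rdma_register_and_get_keys() below.
 */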
1121
1122 /*
1123  * Find the ram block that corresponds to the page requested to be
1124  * transmitted by QEMU.
1125  *
1126  * Once the block is found, also identify which 'chunk' within that
1127  * block the page belongs to.
1128  *
1129  * This search cannot fail or the migration will fail.
1130  */
1131 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1132                                       uintptr_t block_offset,
1133                                       uint64_t offset,
1134                                       uint64_t length,
1135                                       uint64_t *block_index,
1136                                       uint64_t *chunk_index)
1137 {
1138     uint64_t current_addr = block_offset + offset;
1139     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1140                                                 (void *) block_offset);
1141     assert(block);
1142     assert(current_addr >= block->offset);
1143     assert((current_addr + length) <= (block->offset + block->length));
1144
1145     *block_index = block->index;
1146     *chunk_index = ram_chunk_index(block->local_host_addr,
1147                 block->local_host_addr + (current_addr - block->offset));
1148
1149     return 0;
1150 }
1151
1152 /*
1153  * Register a chunk with IB. If the chunk was already registered
1154  * previously, then skip.
1155  *
1156  * Also return the keys associated with the registration needed
1157  * to perform the actual RDMA operation.
1158  */
1159 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1160         RDMALocalBlock *block, uintptr_t host_addr,
1161         uint32_t *lkey, uint32_t *rkey, int chunk,
1162         uint8_t *chunk_start, uint8_t *chunk_end)
1163 {
1164     if (block->mr) {
1165         if (lkey) {
1166             *lkey = block->mr->lkey;
1167         }
1168         if (rkey) {
1169             *rkey = block->mr->rkey;
1170         }
1171         return 0;
1172     }
1173
1174     /* allocate memory to store chunk MRs */
1175     if (!block->pmr) {
1176         block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
1177     }
1178
1179     /*
1180      * If 'rkey', then we're the destination, so grant access to the source.
1181      *
1182      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1183      */
1184     if (!block->pmr[chunk]) {
1185         uint64_t len = chunk_end - chunk_start;
1186
1187         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1188
1189         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1190                 chunk_start, len,
1191                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1192                         IBV_ACCESS_REMOTE_WRITE) : 0));
1193
1194         if (!block->pmr[chunk]) {
1195             perror("Failed to register chunk!");
1196             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1197                             " start %" PRIuPTR " end %" PRIuPTR
1198                             " host %" PRIuPTR
1199                             " local %" PRIuPTR " registrations: %d\n",
1200                             block->index, chunk, (uintptr_t)chunk_start,
1201                             (uintptr_t)chunk_end, host_addr,
1202                             (uintptr_t)block->local_host_addr,
1203                             rdma->total_registrations);
1204             return -1;
1205         }
1206         rdma->total_registrations++;
1207     }
1208
1209     if (lkey) {
1210         *lkey = block->pmr[chunk]->lkey;
1211     }
1212     if (rkey) {
1213         *rkey = block->pmr[chunk]->rkey;
1214     }
1215     return 0;
1216 }
1217
1218 /*
1219  * Register (at connection time) the memory used for control
1220  * channel messages.
1221  */
1222 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1223 {
1224     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1225             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1226             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1227     if (rdma->wr_data[idx].control_mr) {
1228         rdma->total_registrations++;
1229         return 0;
1230     }
1231     error_report("qemu_rdma_reg_control failed");
1232     return -1;
1233 }
1234
1235 const char *print_wrid(int wrid)
1236 {
1237     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1238         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1239     }
1240     return wrid_desc[wrid];
1241 }
1242
1243 /*
1244  * RDMA requires memory registration (mlock/pinning), but this is not good for
1245  * overcommitment.
1246  *
1247  * In preparation for the future where LRU information or workload-specific
1248  * writable working set memory access behavior is available to QEMU,
1249  * it would be nice to have in place the ability to UN-register/UN-pin
1250  * particular memory regions from the RDMA hardware when it is determined that
1251  * those regions of memory will likely not be accessed again in the near future.
1252  *
1253  * While we do not yet have such information right now, the following
1254  * compile-time option allows us to perform a non-optimized version of this
1255  * behavior.
1256  *
1257  * By uncommenting this option, you will cause *all* RDMA transfers to be
1258  * unregistered immediately after the transfer completes on both sides of the
1259  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1260  *
1261  * This will have a terrible impact on migration performance, so until future
1262  * workload information or LRU information is available, do not attempt to use
1263  * this feature except for basic testing.
1264  */
1265 //#define RDMA_UNREGISTRATION_EXAMPLE
1266
1267 /*
1268  * Perform a non-optimized memory unregistration after every transfer
1269  * for demonstration purposes, only if pin-all is not requested.
1270  *
1271  * Potential optimizations:
1272  * 1. Start a new thread to run this function continuously
1273  *      - for bit clearing
1274  *      - and for receipt of unregister messages
1275  * 2. Use an LRU.
1276  * 3. Use workload hints.
1277  */
1278 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1279 {
1280     while (rdma->unregistrations[rdma->unregister_current]) {
1281         int ret;
1282         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1283         uint64_t chunk =
1284             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1285         uint64_t index =
1286             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1287         RDMALocalBlock *block =
1288             &(rdma->local_ram_blocks.block[index]);
1289         RDMARegister reg = { .current_index = index };
1290         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1291                                  };
1292         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1293                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1294                                    .repeat = 1,
1295                                  };
1296
1297         trace_qemu_rdma_unregister_waiting_proc(chunk,
1298                                                 rdma->unregister_current);
1299
1300         rdma->unregistrations[rdma->unregister_current] = 0;
1301         rdma->unregister_current++;
1302
1303         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1304             rdma->unregister_current = 0;
1305         }
1306
1307
1308         /*
1309          * Unregistration is speculative (because migration is single-threaded
1310          * and we cannot break the protocol's InfiniBand message ordering).
1311          * Thus, if the memory is currently being used for transmission,
1312          * then abort the attempt to unregister and try again
1313          * later the next time a completion is received for this memory.
1314          */
1315         clear_bit(chunk, block->unregister_bitmap);
1316
1317         if (test_bit(chunk, block->transit_bitmap)) {
1318             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1319             continue;
1320         }
1321
1322         trace_qemu_rdma_unregister_waiting_send(chunk);
1323
1324         ret = ibv_dereg_mr(block->pmr[chunk]);
1325         block->pmr[chunk] = NULL;
1326         block->remote_keys[chunk] = 0;
1327
1328         if (ret != 0) {
1329             perror("unregistration of chunk failed");
1330             return -ret;
1331         }
1332         rdma->total_registrations--;
1333
1334         reg.key.chunk = chunk;
1335         register_to_network(rdma, &reg);
1336         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1337                                 &resp, NULL, NULL);
1338         if (ret < 0) {
1339             return ret;
1340         }
1341
1342         trace_qemu_rdma_unregister_waiting_complete(chunk);
1343     }
1344
1345     return 0;
1346 }
1347
1348 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1349                                          uint64_t chunk)
1350 {
1351     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1352
1353     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1354     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1355
1356     return result;
1357 }
1358
1359 /*
1360  * Set bit for unregistration in the next iteration.
1361  * We cannot transmit right here, but will unpin later.
1362  */
1363 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1364                                         uint64_t chunk, uint64_t wr_id)
1365 {
1366     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1367         error_report("rdma migration: queue is full");
1368     } else {
1369         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1370
1371         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1372             trace_qemu_rdma_signal_unregister_append(chunk,
1373                                                      rdma->unregister_next);
1374
1375             rdma->unregistrations[rdma->unregister_next++] =
1376                     qemu_rdma_make_wrid(wr_id, index, chunk);
1377
1378             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1379                 rdma->unregister_next = 0;
1380             }
1381         } else {
1382             trace_qemu_rdma_signal_unregister_already(chunk);
1383         }
1384     }
1385 }
1386
1387 /*
1388  * Poll the completion queue to see whether a work request
1389  * (of any kind) has completed.
1390  * Return the work request ID that completed.
1391  */
1392 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1393                                uint32_t *byte_len)
1394 {
1395     int ret;
1396     struct ibv_wc wc;
1397     uint64_t wr_id;
1398
1399     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1400
1401     if (!ret) {
1402         *wr_id_out = RDMA_WRID_NONE;
1403         return 0;
1404     }
1405
1406     if (ret < 0) {
1407         error_report("ibv_poll_cq return %d", ret);
1408         return ret;
1409     }
1410
1411     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1412
1413     if (wc.status != IBV_WC_SUCCESS) {
1414         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1415                         wc.status, ibv_wc_status_str(wc.status));
1416         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1417
1418         return -1;
1419     }
1420
1421     if (rdma->control_ready_expected &&
1422         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1423         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1424                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1425         rdma->control_ready_expected = 0;
1426     }
1427
1428     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1429         uint64_t chunk =
1430             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1431         uint64_t index =
1432             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1433         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1434
1435         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1436                                    index, chunk, block->local_host_addr,
1437                                    (void *)(uintptr_t)block->remote_host_addr);
1438
1439         clear_bit(chunk, block->transit_bitmap);
1440
1441         if (rdma->nb_sent > 0) {
1442             rdma->nb_sent--;
1443         }
1444
1445         if (!rdma->pin_all) {
1446             /*
1447              * FYI: If one wanted to signal a specific chunk to be unregistered
1448              * using LRU or workload-specific information, this is the function
1449              * you would call to do so. That chunk would then get asynchronously
1450              * unregistered later.
1451              */
1452 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1453             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1454 #endif
1455         }
1456     } else {
1457         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1458     }
1459
1460     *wr_id_out = wc.wr_id;
1461     if (byte_len) {
1462         *byte_len = wc.byte_len;
1463     }
1464
1465     return  0;
1466 }
1467
1468 /*
1469  * Block until the next work request has completed.
1470  *
1471  * First poll to see if a work request has already completed,
1472  * otherwise block.
1473  *
1474  * If we encounter completed work requests for IDs other than
1475  * the one we're interested in, then that's generally an error.
1476  *
1477  * The only exception is actual RDMA Write completions. These
1478  * completions only need to be recorded, but do not actually
1479  * need further processing.
1480  */
1481 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1482                                     uint32_t *byte_len)
1483 {
1484     int num_cq_events = 0, ret = 0;
1485     struct ibv_cq *cq;
1486     void *cq_ctx;
1487     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1488
1489     if (ibv_req_notify_cq(rdma->cq, 0)) {
1490         return -1;
1491     }
1492     /* poll cq first */
1493     while (wr_id != wrid_requested) {
1494         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1495         if (ret < 0) {
1496             return ret;
1497         }
1498
1499         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1500
1501         if (wr_id == RDMA_WRID_NONE) {
1502             break;
1503         }
1504         if (wr_id != wrid_requested) {
1505             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1506                        wrid_requested, print_wrid(wr_id), wr_id);
1507         }
1508     }
1509
1510     if (wr_id == wrid_requested) {
1511         return 0;
1512     }
1513
1514     while (1) {
1515         /*
1516          * Coroutine doesn't start until migration_fd_process_incoming()
1517          * so don't yield unless we know we're running inside of a coroutine.
1518          */
1519         if (rdma->migration_started_on_destination) {
1520             yield_until_fd_readable(rdma->comp_channel->fd);
1521         }
1522
1523         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1524             perror("ibv_get_cq_event");
1525             goto err_block_for_wrid;
1526         }
1527
1528         num_cq_events++;
1529
1530         if (ibv_req_notify_cq(cq, 0)) {
1531             goto err_block_for_wrid;
1532         }
1533
1534         while (wr_id != wrid_requested) {
1535             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1536             if (ret < 0) {
1537                 goto err_block_for_wrid;
1538             }
1539
1540             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1541
1542             if (wr_id == RDMA_WRID_NONE) {
1543                 break;
1544             }
1545             if (wr_id != wrid_requested) {
1546                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1547                                    wrid_requested, print_wrid(wr_id), wr_id);
1548             }
1549         }
1550
1551         if (wr_id == wrid_requested) {
1552             goto success_block_for_wrid;
1553         }
1554     }
1555
1556 success_block_for_wrid:
1557     if (num_cq_events) {
1558         ibv_ack_cq_events(cq, num_cq_events);
1559     }
1560     return 0;
1561
1562 err_block_for_wrid:
1563     if (num_cq_events) {
1564         ibv_ack_cq_events(cq, num_cq_events);
1565     }
1566     return ret;
1567 }
1568
1569 /*
1570  * Post a SEND message work request for the control channel
1571  * containing some data and block until the post completes.
1572  */
1573 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1574                                        RDMAControlHeader *head)
1575 {
1576     int ret = 0;
1577     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1578     struct ibv_send_wr *bad_wr;
1579     struct ibv_sge sge = {
1580                            .addr = (uintptr_t)(wr->control),
1581                            .length = head->len + sizeof(RDMAControlHeader),
1582                            .lkey = wr->control_mr->lkey,
1583                          };
1584     struct ibv_send_wr send_wr = {
1585                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1586                                    .opcode = IBV_WR_SEND,
1587                                    .send_flags = IBV_SEND_SIGNALED,
1588                                    .sg_list = &sge,
1589                                    .num_sge = 1,
1590                                 };
1591
1592     trace_qemu_rdma_post_send_control(control_desc[head->type]);
1593
1594     /*
1595      * We don't actually need to do a memcpy() in here if we used
1596      * the "sge" properly, but since we're only sending control messages
1597  * (not RAM in a performance-critical path), then it's OK for now.
1598      *
1599      * The copy makes the RDMAControlHeader simpler to manipulate
1600      * for the time being.
1601      */
1602     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1603     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1604     control_to_network((void *) wr->control);
1605
1606     if (buf) {
1607         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1608     }
1609
1610
1611     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1612
1613     if (ret > 0) {
1614         error_report("Failed to post IB SEND for control");
1615         return -ret;
1616     }
1617
1618     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1619     if (ret < 0) {
1620         error_report("rdma migration: send polling control error");
1621     }
1622
1623     return ret;
1624 }
1625
1626 /*
1627  * Post a RECV work request in anticipation of some future receipt
1628  * of data on the control channel.
1629  */
1630 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1631 {
1632     struct ibv_recv_wr *bad_wr;
1633     struct ibv_sge sge = {
1634                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1635                             .length = RDMA_CONTROL_MAX_BUFFER,
1636                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1637                          };
1638
1639     struct ibv_recv_wr recv_wr = {
1640                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1641                                     .sg_list = &sge,
1642                                     .num_sge = 1,
1643                                  };
1644
1645
1646     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1647         return -1;
1648     }
1649
1650     return 0;
1651 }
1652
1653 /*
1654  * Block and wait for a RECV control channel message to arrive.
1655  */
1656 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1657                 RDMAControlHeader *head, int expecting, int idx)
1658 {
1659     uint32_t byte_len;
1660     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1661                                        &byte_len);
1662
1663     if (ret < 0) {
1664         error_report("rdma migration: recv polling control error!");
1665         return ret;
1666     }
1667
1668     network_to_control((void *) rdma->wr_data[idx].control);
1669     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1670
1671     trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1672
1673     if (expecting == RDMA_CONTROL_NONE) {
1674         trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1675                                              head->type);
1676     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1677         error_report("Was expecting a %s (%d) control message"
1678                 ", but got: %s (%d), length: %d",
1679                 control_desc[expecting], expecting,
1680                 control_desc[head->type], head->type, head->len);
1681         if (head->type == RDMA_CONTROL_ERROR) {
1682             rdma->received_error = true;
1683         }
1684         return -EIO;
1685     }
1686     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1687         error_report("Control message length too long: %d", head->len);
1688         return -EINVAL;
1689     }
1690     if (sizeof(*head) + head->len != byte_len) {
1691         error_report("Malformed length: %d, byte_len: %d", head->len, byte_len);
1692         return -EINVAL;
1693     }
1694
1695     return 0;
1696 }
1697
1698 /*
1699  * When a RECV work request has completed, the work request's
1700  * buffer points at the control header.
1701  *
1702  * This advances the 'current' pointer past the header to the data
1703  * portion of the control message that was populated when the work
1704  * request finished.
1705  */
1706 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1707                                   RDMAControlHeader *head)
1708 {
1709     rdma->wr_data[idx].control_len = head->len;
1710     rdma->wr_data[idx].control_curr =
1711         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1712 }
1713
1714 /*
1715  * This is an 'atomic' high-level operation to deliver a single, unified
1716  * control-channel message.
1717  *
1718  * Additionally, if the user is expecting some kind of reply to this message,
1719  * they can request a 'resp' response message be filled in by posting an
1720  * additional work request on behalf of the user and waiting for an additional
1721  * completion.
1722  *
1723  * The extra (optional) response is used during registration to save us from
1724  * having to perform an *additional* exchange of messages just to provide a
1725  * response, by instead piggy-backing on the acknowledgement.
1726  */
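/*
 * Illustrative usage sketch only, mirroring the dynamic-registration path
 * in qemu_rdma_write_one() further below, where 'reg' is an RDMARegister
 * payload filled in by the caller; variable names are just for the example:
 *
 *     RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                                .type = RDMA_CONTROL_REGISTER_REQUEST,
 *                                .repeat = 1 };
 *     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
 *     int reg_result_idx;
 *
 *     ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
 *                                   &resp, &reg_result_idx, NULL);
 *     if (ret == 0) {
 *         reg_result = (RDMARegisterResult *)
 *                          rdma->wr_data[reg_result_idx].control_curr;
 *     }
 *
 * Passing resp == NULL skips the extra RECV and the wait for a reply.
 */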
1727 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1728                                    uint8_t *data, RDMAControlHeader *resp,
1729                                    int *resp_idx,
1730                                    int (*callback)(RDMAContext *rdma))
1731 {
1732     int ret = 0;
1733
1734     /*
1735      * Wait until the destination is ready before attempting to deliver the
1736      * message, by first waiting for a READY control message.
1737      */
1738     if (rdma->control_ready_expected) {
1739         RDMAControlHeader resp;
1740         ret = qemu_rdma_exchange_get_response(rdma,
1741                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1742         if (ret < 0) {
1743             return ret;
1744         }
1745     }
1746
1747     /*
1748      * If the user is expecting a response, post a WR in anticipation of it.
1749      */
1750     if (resp) {
1751         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1752         if (ret) {
1753             error_report("rdma migration: error posting"
1754                     " extra control recv for anticipated result!");
1755             return ret;
1756         }
1757     }
1758
1759     /*
1760      * Post a WR to replace the one we just consumed for the READY message.
1761      */
1762     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1763     if (ret) {
1764         error_report("rdma migration: error posting first control recv!");
1765         return ret;
1766     }
1767
1768     /*
1769      * Deliver the control message that was requested.
1770      */
1771     ret = qemu_rdma_post_send_control(rdma, data, head);
1772
1773     if (ret < 0) {
1774         error_report("Failed to send control buffer!");
1775         return ret;
1776     }
1777
1778     /*
1779      * If we're expecting a response, block and wait for it.
1780      */
1781     if (resp) {
1782         if (callback) {
1783             trace_qemu_rdma_exchange_send_issue_callback();
1784             ret = callback(rdma);
1785             if (ret < 0) {
1786                 return ret;
1787             }
1788         }
1789
1790         trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1791         ret = qemu_rdma_exchange_get_response(rdma, resp,
1792                                               resp->type, RDMA_WRID_DATA);
1793
1794         if (ret < 0) {
1795             return ret;
1796         }
1797
1798         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1799         if (resp_idx) {
1800             *resp_idx = RDMA_WRID_DATA;
1801         }
1802         trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1803     }
1804
1805     rdma->control_ready_expected = 1;
1806
1807     return 0;
1808 }
1809
1810 /*
1811  * This is an 'atomic' high-level operation to receive a single, unified
1812  * control-channel message.
1813  */
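/*
 * Sketch of the READY handshake implemented by qemu_rdma_exchange_send()
 * above and qemu_rdma_exchange_recv() below (illustrative, not a wire
 * format specification):
 *
 *     sender (exchange_send)            receiver (exchange_recv)
 *     ----------------------            ------------------------
 *     block for READY          <------  SEND READY
 *     post RECV (READY slot)
 *     SEND head [+ data]       ------>  block for expected message
 *                                       repost RECV (READY slot)
 */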
1814 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1815                                 int expecting)
1816 {
1817     RDMAControlHeader ready = {
1818                                 .len = 0,
1819                                 .type = RDMA_CONTROL_READY,
1820                                 .repeat = 1,
1821                               };
1822     int ret;
1823
1824     /*
1825      * Inform the source that we're ready to receive a message.
1826      */
1827     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1828
1829     if (ret < 0) {
1830         error_report("Failed to send control buffer!");
1831         return ret;
1832     }
1833
1834     /*
1835      * Block and wait for the message.
1836      */
1837     ret = qemu_rdma_exchange_get_response(rdma, head,
1838                                           expecting, RDMA_WRID_READY);
1839
1840     if (ret < 0) {
1841         return ret;
1842     }
1843
1844     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1845
1846     /*
1847      * Post a new RECV work request to replace the one we just consumed.
1848      */
1849     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1850     if (ret) {
1851         error_report("rdma migration: error posting second control recv!");
1852         return ret;
1853     }
1854
1855     return 0;
1856 }
1857
1858 /*
1859  * Write an actual chunk of memory using RDMA.
1860  *
1861  * If we're using dynamic registration on the dest-side, we have to
1862  * send a registration command first.
1863  */
1864 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1865                                int current_index, uint64_t current_addr,
1866                                uint64_t length)
1867 {
1868     struct ibv_sge sge;
1869     struct ibv_send_wr send_wr = { 0 };
1870     struct ibv_send_wr *bad_wr;
1871     int reg_result_idx, ret, count = 0;
1872     uint64_t chunk, chunks;
1873     uint8_t *chunk_start, *chunk_end;
1874     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1875     RDMARegister reg;
1876     RDMARegisterResult *reg_result;
1877     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1878     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1879                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1880                                .repeat = 1,
1881                              };
1882
1883 retry:
1884     sge.addr = (uintptr_t)(block->local_host_addr +
1885                             (current_addr - block->offset));
1886     sge.length = length;
1887
1888     chunk = ram_chunk_index(block->local_host_addr,
1889                             (uint8_t *)(uintptr_t)sge.addr);
1890     chunk_start = ram_chunk_start(block, chunk);
1891
1892     if (block->is_ram_block) {
1893         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1894
1895         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1896             chunks--;
1897         }
1898     } else {
1899         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1900
1901         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1902             chunks--;
1903         }
1904     }
1905
1906     trace_qemu_rdma_write_one_top(chunks + 1,
1907                                   (chunks + 1) *
1908                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1909
1910     chunk_end = ram_chunk_end(block, chunk + chunks);
1911
1912     if (!rdma->pin_all) {
1913 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1914         qemu_rdma_unregister_waiting(rdma);
1915 #endif
1916     }
1917
1918     while (test_bit(chunk, block->transit_bitmap)) {
1919         (void)count;
1920         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1921                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1922
1923         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1924
1925         if (ret < 0) {
1926             error_report("Failed to wait for previous write to complete "
1927                     "block %d chunk %" PRIu64
1928                     " current %" PRIu64 " len %" PRIu64 " %d",
1929                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1930             return ret;
1931         }
1932     }
1933
1934     if (!rdma->pin_all || !block->is_ram_block) {
1935         if (!block->remote_keys[chunk]) {
1936             /*
1937              * This chunk has not yet been registered, so first check to see
1938              * if the entire chunk is zero. If so, tell the other side to
1939              * memset() + madvise() the entire chunk without RDMA.
1940              */
1941
1942             if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
1943                 RDMACompress comp = {
1944                                         .offset = current_addr,
1945                                         .value = 0,
1946                                         .block_idx = current_index,
1947                                         .length = length,
1948                                     };
1949
1950                 head.len = sizeof(comp);
1951                 head.type = RDMA_CONTROL_COMPRESS;
1952
1953                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1954                                                current_index, current_addr);
1955
1956                 compress_to_network(rdma, &comp);
1957                 ret = qemu_rdma_exchange_send(rdma, &head,
1958                                 (uint8_t *) &comp, NULL, NULL, NULL);
1959
1960                 if (ret < 0) {
1961                     return -EIO;
1962                 }
1963
1964                 acct_update_position(f, sge.length, true);
1965
1966                 return 1;
1967             }
1968
1969             /*
1970              * Otherwise, tell other side to register.
1971              */
1972             reg.current_index = current_index;
1973             if (block->is_ram_block) {
1974                 reg.key.current_addr = current_addr;
1975             } else {
1976                 reg.key.chunk = chunk;
1977             }
1978             reg.chunks = chunks;
1979
1980             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1981                                               current_addr);
1982
1983             register_to_network(rdma, &reg);
1984             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1985                                     &resp, &reg_result_idx, NULL);
1986             if (ret < 0) {
1987                 return ret;
1988             }
1989
1990             /* try to overlap this single registration with the one we sent. */
1991             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1992                                                 &sge.lkey, NULL, chunk,
1993                                                 chunk_start, chunk_end)) {
1994                 error_report("cannot get lkey");
1995                 return -EINVAL;
1996             }
1997
1998             reg_result = (RDMARegisterResult *)
1999                     rdma->wr_data[reg_result_idx].control_curr;
2000
2001             network_to_result(reg_result);
2002
2003             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
2004                                                  reg_result->rkey, chunk);
2005
2006             block->remote_keys[chunk] = reg_result->rkey;
2007             block->remote_host_addr = reg_result->host_addr;
2008         } else {
2009             /* already registered before */
2010             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2011                                                 &sge.lkey, NULL, chunk,
2012                                                 chunk_start, chunk_end)) {
2013                 error_report("cannot get lkey!");
2014                 return -EINVAL;
2015             }
2016         }
2017
2018         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
2019     } else {
2020         send_wr.wr.rdma.rkey = block->remote_rkey;
2021
2022         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
2023                                                      &sge.lkey, NULL, chunk,
2024                                                      chunk_start, chunk_end)) {
2025             error_report("cannot get lkey!");
2026             return -EINVAL;
2027         }
2028     }
2029
2030     /*
2031      * Encode the ram block index and chunk within this wrid.
2032      * We will use this information at the time of completion
2033      * to figure out which bitmap to check against and then which
2034      * chunk in the bitmap to look for.
2035      */
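    /*
     * For illustration only, assuming qemu_rdma_make_wrid() packs the
     * three fields as its name suggests: a write to block index 3,
     * chunk 7 yields a wr_id whose low bits hold RDMA_WRID_RDMA_WRITE,
     * with 3 and 7 stored in the block and chunk bit ranges.  The
     * completion path above recovers the type via
     * wr_id & RDMA_WRID_TYPE_MASK.
     */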
2036     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
2037                                         current_index, chunk);
2038
2039     send_wr.opcode = IBV_WR_RDMA_WRITE;
2040     send_wr.send_flags = IBV_SEND_SIGNALED;
2041     send_wr.sg_list = &sge;
2042     send_wr.num_sge = 1;
2043     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2044                                 (current_addr - block->offset);
2045
2046     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2047                                    sge.length);
2048
2049     /*
2050      * ibv_post_send() does not return negative error numbers;
2051      * per the specification, errors are reported as positive values.
2052      */
2053     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2054
2055     if (ret == ENOMEM) {
2056         trace_qemu_rdma_write_one_queue_full();
2057         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2058         if (ret < 0) {
2059             error_report("rdma migration: failed to make "
2060                          "room in full send queue! %d", ret);
2061             return ret;
2062         }
2063
2064         goto retry;
2065
2066     } else if (ret > 0) {
2067         perror("rdma migration: post rdma write failed");
2068         return -ret;
2069     }
2070
2071     set_bit(chunk, block->transit_bitmap);
2072     acct_update_position(f, sge.length, false);
2073     rdma->total_writes++;
2074
2075     return 0;
2076 }
2077
2078 /*
2079  * Push out any unwritten RDMA operations.
2080  *
2081  * We support sending out multiple chunks at the same time.
2082  * Not all of them need to get signaled in the completion queue.
2083  */
2084 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2085 {
2086     int ret;
2087
2088     if (!rdma->current_length) {
2089         return 0;
2090     }
2091
2092     ret = qemu_rdma_write_one(f, rdma,
2093             rdma->current_index, rdma->current_addr, rdma->current_length);
2094
2095     if (ret < 0) {
2096         return ret;
2097     }
2098
2099     if (ret == 0) {
2100         rdma->nb_sent++;
2101         trace_qemu_rdma_write_flush(rdma->nb_sent);
2102     }
2103
2104     rdma->current_length = 0;
2105     rdma->current_addr = 0;
2106
2107     return 0;
2108 }
2109
2110 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2111                     uint64_t offset, uint64_t len)
2112 {
2113     RDMALocalBlock *block;
2114     uint8_t *host_addr;
2115     uint8_t *chunk_end;
2116
2117     if (rdma->current_index < 0) {
2118         return 0;
2119     }
2120
2121     if (rdma->current_chunk < 0) {
2122         return 0;
2123     }
2124
2125     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2126     host_addr = block->local_host_addr + (offset - block->offset);
2127     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2128
2129     if (rdma->current_length == 0) {
2130         return 0;
2131     }
2132
2133     /*
2134      * Only merge into chunk sequentially.
2135      */
2136     if (offset != (rdma->current_addr + rdma->current_length)) {
2137         return 0;
2138     }
2139
2140     if (offset < block->offset) {
2141         return 0;
2142     }
2143
2144     if ((offset + len) > (block->offset + block->length)) {
2145         return 0;
2146     }
2147
2148     if ((host_addr + len) > chunk_end) {
2149         return 0;
2150     }
2151
2152     return 1;
2153 }
2154
2155 /*
2156  * We're not actually writing here, but doing three things:
2157  *
2158  * 1. Identify the chunk the buffer belongs to.
2159  * 2. If the chunk is full or the buffer doesn't belong to the current
2160  *    chunk, then start a new chunk and flush() the old chunk.
2161  * 3. To keep the hardware busy, we also group chunks into batches
2162  *    and only require that a batch gets acknowledged in the completion
2163  *    queue instead of each individual chunk.
2164  */
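/*
 * A sketch of the merge behaviour with illustrative numbers: three
 * sequential 4 KB writes landing in the same chunk are accumulated into
 * a single 12 KB RDMA write, which is flushed either when the next write
 * is not mergeable (see qemu_rdma_buffer_mergable() above) or once
 * current_length reaches RDMA_MERGE_MAX.
 */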
2165 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2166                            uint64_t block_offset, uint64_t offset,
2167                            uint64_t len)
2168 {
2169     uint64_t current_addr = block_offset + offset;
2170     uint64_t index = rdma->current_index;
2171     uint64_t chunk = rdma->current_chunk;
2172     int ret;
2173
2174     /* If we cannot merge it, we flush the current buffer first. */
2175     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2176         ret = qemu_rdma_write_flush(f, rdma);
2177         if (ret) {
2178             return ret;
2179         }
2180         rdma->current_length = 0;
2181         rdma->current_addr = current_addr;
2182
2183         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2184                                          offset, len, &index, &chunk);
2185         if (ret) {
2186             error_report("ram block search failed");
2187             return ret;
2188         }
2189         rdma->current_index = index;
2190         rdma->current_chunk = chunk;
2191     }
2192
2193     /* merge it */
2194     rdma->current_length += len;
2195
2196     /* flush it if buffer is too large */
2197     if (rdma->current_length >= RDMA_MERGE_MAX) {
2198         return qemu_rdma_write_flush(f, rdma);
2199     }
2200
2201     return 0;
2202 }
2203
2204 static void qemu_rdma_cleanup(RDMAContext *rdma)
2205 {
2206     struct rdma_cm_event *cm_event;
2207     int ret, idx;
2208
2209     if (rdma->cm_id && rdma->connected) {
2210         if (rdma->error_state && !rdma->received_error) {
2211             RDMAControlHeader head = { .len = 0,
2212                                        .type = RDMA_CONTROL_ERROR,
2213                                        .repeat = 1,
2214                                      };
2215             error_report("Early error. Sending error.");
2216             qemu_rdma_post_send_control(rdma, NULL, &head);
2217         }
2218
2219         ret = rdma_disconnect(rdma->cm_id);
2220         if (!ret) {
2221             trace_qemu_rdma_cleanup_waiting_for_disconnect();
2222             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2223             if (!ret) {
2224                 rdma_ack_cm_event(cm_event);
2225             }
2226         }
2227         trace_qemu_rdma_cleanup_disconnect();
2228         rdma->connected = false;
2229     }
2230
2231     g_free(rdma->dest_blocks);
2232     rdma->dest_blocks = NULL;
2233
2234     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2235         if (rdma->wr_data[idx].control_mr) {
2236             rdma->total_registrations--;
2237             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2238         }
2239         rdma->wr_data[idx].control_mr = NULL;
2240     }
2241
2242     if (rdma->local_ram_blocks.block) {
2243         while (rdma->local_ram_blocks.nb_blocks) {
2244             rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
2245         }
2246     }
2247
2248     if (rdma->qp) {
2249         rdma_destroy_qp(rdma->cm_id);
2250         rdma->qp = NULL;
2251     }
2252     if (rdma->cq) {
2253         ibv_destroy_cq(rdma->cq);
2254         rdma->cq = NULL;
2255     }
2256     if (rdma->comp_channel) {
2257         ibv_destroy_comp_channel(rdma->comp_channel);
2258         rdma->comp_channel = NULL;
2259     }
2260     if (rdma->pd) {
2261         ibv_dealloc_pd(rdma->pd);
2262         rdma->pd = NULL;
2263     }
2264     if (rdma->cm_id) {
2265         rdma_destroy_id(rdma->cm_id);
2266         rdma->cm_id = NULL;
2267     }
2268     if (rdma->listen_id) {
2269         rdma_destroy_id(rdma->listen_id);
2270         rdma->listen_id = NULL;
2271     }
2272     if (rdma->channel) {
2273         rdma_destroy_event_channel(rdma->channel);
2274         rdma->channel = NULL;
2275     }
2276     g_free(rdma->host);
2277     rdma->host = NULL;
2278 }
2279
2280
2281 static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
2282 {
2283     int ret, idx;
2284     Error *local_err = NULL, **temp = &local_err;
2285
2286     /*
2287      * Will be validated against destination's actual capabilities
2288      * after the connect() completes.
2289      */
2290     rdma->pin_all = pin_all;
2291
2292     ret = qemu_rdma_resolve_host(rdma, temp);
2293     if (ret) {
2294         goto err_rdma_source_init;
2295     }
2296
2297     ret = qemu_rdma_alloc_pd_cq(rdma);
2298     if (ret) {
2299         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2300                     " limits may be too low. Please check the output of 'ulimit -a'"
2301                     " and look at the 'max locked memory' ('ulimit -l') value");
2302         goto err_rdma_source_init;
2303     }
2304
2305     ret = qemu_rdma_alloc_qp(rdma);
2306     if (ret) {
2307         ERROR(temp, "rdma migration: error allocating qp!");
2308         goto err_rdma_source_init;
2309     }
2310
2311     ret = qemu_rdma_init_ram_blocks(rdma);
2312     if (ret) {
2313         ERROR(temp, "rdma migration: error initializing ram blocks!");
2314         goto err_rdma_source_init;
2315     }
2316
2317     /* Build the hash that maps from offset to RAMBlock */
2318     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
2319     for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
2320         g_hash_table_insert(rdma->blockmap,
2321                 (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
2322                 &rdma->local_ram_blocks.block[idx]);
2323     }
2324
2325     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2326         ret = qemu_rdma_reg_control(rdma, idx);
2327         if (ret) {
2328             ERROR(temp, "rdma migration: error registering %d control!",
2329                                                             idx);
2330             goto err_rdma_source_init;
2331         }
2332     }
2333
2334     return 0;
2335
2336 err_rdma_source_init:
2337     error_propagate(errp, local_err);
2338     qemu_rdma_cleanup(rdma);
2339     return -1;
2340 }
2341
2342 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2343 {
2344     RDMACapabilities cap = {
2345                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2346                                 .flags = 0,
2347                            };
2348     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2349                                           .retry_count = 5,
2350                                           .private_data = &cap,
2351                                           .private_data_len = sizeof(cap),
2352                                         };
2353     struct rdma_cm_event *cm_event;
2354     int ret;
2355
2356     /*
2357      * Only negotiate the capability with destination if the user
2358      * on the source first requested the capability.
2359      */
2360     if (rdma->pin_all) {
2361         trace_qemu_rdma_connect_pin_all_requested();
2362         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2363     }
2364
2365     caps_to_network(&cap);
2366
2367     ret = rdma_connect(rdma->cm_id, &conn_param);
2368     if (ret) {
2369         perror("rdma_connect");
2370         ERROR(errp, "connecting to destination!");
2371         goto err_rdma_source_connect;
2372     }
2373
2374     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2375     if (ret) {
2376         perror("rdma_get_cm_event after rdma_connect");
2377         ERROR(errp, "connecting to destination!");
2378         rdma_ack_cm_event(cm_event);
2379         goto err_rdma_source_connect;
2380     }
2381
2382     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2383         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2384         ERROR(errp, "connecting to destination!");
2385         rdma_ack_cm_event(cm_event);
2386         goto err_rdma_source_connect;
2387     }
2388     rdma->connected = true;
2389
2390     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2391     network_to_caps(&cap);
2392
2393     /*
2394      * Verify that the *requested* capabilities are supported by the destination
2395      * and disable them otherwise.
2396      */
2397     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2398         ERROR(errp, "Server cannot support pinning all memory. "
2399                         "Will register memory dynamically.");
2400         rdma->pin_all = false;
2401     }
2402
2403     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2404
2405     rdma_ack_cm_event(cm_event);
2406
2407     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2408     if (ret) {
2409         ERROR(errp, "posting second control recv!");
2410         goto err_rdma_source_connect;
2411     }
2412
2413     rdma->control_ready_expected = 1;
2414     rdma->nb_sent = 0;
2415     return 0;
2416
2417 err_rdma_source_connect:
2418     qemu_rdma_cleanup(rdma);
2419     return -1;
2420 }
2421
2422 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2423 {
2424     int ret, idx;
2425     struct rdma_cm_id *listen_id;
2426     char ip[40] = "unknown";
2427     struct rdma_addrinfo *res, *e;
2428     char port_str[16];
2429
2430     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2431         rdma->wr_data[idx].control_len = 0;
2432         rdma->wr_data[idx].control_curr = NULL;
2433     }
2434
2435     if (!rdma->host || !rdma->host[0]) {
2436         ERROR(errp, "RDMA host is not set!");
2437         rdma->error_state = -EINVAL;
2438         return -1;
2439     }
2440     /* create CM channel */
2441     rdma->channel = rdma_create_event_channel();
2442     if (!rdma->channel) {
2443         ERROR(errp, "could not create rdma event channel");
2444         rdma->error_state = -EINVAL;
2445         return -1;
2446     }
2447
2448     /* create CM id */
2449     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2450     if (ret) {
2451         ERROR(errp, "could not create cm_id!");
2452         goto err_dest_init_create_listen_id;
2453     }
2454
2455     snprintf(port_str, 16, "%d", rdma->port);
2456     port_str[15] = '\0';
2457
2458     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2459     if (ret < 0) {
2460         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2461         goto err_dest_init_bind_addr;
2462     }
2463
2464     for (e = res; e != NULL; e = e->ai_next) {
2465         inet_ntop(e->ai_family,
2466             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2467         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2468         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2469         if (ret) {
2470             continue;
2471         }
2472         if (e->ai_family == AF_INET6) {
2473             ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
2474             if (ret) {
2475                 continue;
2476             }
2477         }
2478         break;
2479     }
2480
2481     if (!e) {
2482         ERROR(errp, "could not rdma_bind_addr!");
2483         goto err_dest_init_bind_addr;
2484     }
2485
2486     rdma->listen_id = listen_id;
2487     qemu_rdma_dump_gid("dest_init", listen_id);
2488     return 0;
2489
2490 err_dest_init_bind_addr:
2491     rdma_destroy_id(listen_id);
2492 err_dest_init_create_listen_id:
2493     rdma_destroy_event_channel(rdma->channel);
2494     rdma->channel = NULL;
2495     rdma->error_state = ret;
2496     return ret;
2497
2498 }
2499
2500 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2501 {
2502     RDMAContext *rdma = NULL;
2503     InetSocketAddress *addr;
2504
2505     if (host_port) {
2506         rdma = g_new0(RDMAContext, 1);
2507         rdma->current_index = -1;
2508         rdma->current_chunk = -1;
2509
2510         addr = g_new(InetSocketAddress, 1);
2511         if (!inet_parse(addr, host_port, NULL)) {
2512             rdma->port = atoi(addr->port);
2513             rdma->host = g_strdup(addr->host);
2514         } else {
2515             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2516             g_free(rdma);
2517             rdma = NULL;
2518         }
2519
2520         qapi_free_InetSocketAddress(addr);
2521     }
2522
2523     return rdma;
2524 }
2525
2526 /*
2527  * QEMUFile interface to the control channel.
2528  * SEND messages for control only.
2529  * VM's ram is handled with regular RDMA messages.
2530  */
2531 static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
2532                                        const struct iovec *iov,
2533                                        size_t niov,
2534                                        int *fds,
2535                                        size_t nfds,
2536                                        Error **errp)
2537 {
2538     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2539     QEMUFile *f = rioc->file;
2540     RDMAContext *rdma = rioc->rdma;
2541     int ret;
2542     ssize_t done = 0;
2543     size_t i;
2544
2545     CHECK_ERROR_STATE();
2546
2547     /*
2548      * Push out any writes that we've
2549      * queued up for the VM's RAM.
2550      */
2551     ret = qemu_rdma_write_flush(f, rdma);
2552     if (ret < 0) {
2553         rdma->error_state = ret;
2554         return ret;
2555     }
2556
2557     for (i = 0; i < niov; i++) {
2558         size_t remaining = iov[i].iov_len;
2559         uint8_t * data = (void *)iov[i].iov_base;
2560         while (remaining) {
2561             RDMAControlHeader head;
2562
2563             rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
2564             remaining -= rioc->len;
2565
2566             head.len = rioc->len;
2567             head.type = RDMA_CONTROL_QEMU_FILE;
2568
2569             ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2570
2571             if (ret < 0) {
2572                 rdma->error_state = ret;
2573                 return ret;
2574             }
2575
2576             data += rioc->len;
2577             done += rioc->len;
2578         }
2579     }
2580
2581     return done;
2582 }
2583
2584 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2585                              size_t size, int idx)
2586 {
2587     size_t len = 0;
2588
2589     if (rdma->wr_data[idx].control_len) {
2590         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2591
2592         len = MIN(size, rdma->wr_data[idx].control_len);
2593         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2594         rdma->wr_data[idx].control_curr += len;
2595         rdma->wr_data[idx].control_len -= len;
2596     }
2597
2598     return len;
2599 }
2600
2601 /*
2602  * QEMUFile interface to the control channel.
2603  * RDMA links don't use bytestreams, so we have to
2604  * return bytes to QEMUFile opportunistically.
2605  */
2606 static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
2607                                       const struct iovec *iov,
2608                                       size_t niov,
2609                                       int **fds,
2610                                       size_t *nfds,
2611                                       Error **errp)
2612 {
2613     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2614     RDMAContext *rdma = rioc->rdma;
2615     RDMAControlHeader head;
2616     int ret = 0;
2617     ssize_t i;
2618     size_t done = 0;
2619
2620     CHECK_ERROR_STATE();
2621
2622     for (i = 0; i < niov; i++) {
2623         size_t want = iov[i].iov_len;
2624         uint8_t *data = (void *)iov[i].iov_base;
2625
2626         /*
2627          * First, we hold on to the last SEND message we
2628          * were given and dish out its bytes until we
2629          * run out.
2630          */
2631         ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2632         done += ret;
2633         want -= ret;
2634         /* Got what we needed, so go to next iovec */
2635         if (want == 0) {
2636             continue;
2637         }
2638
2639         /* If we got any data so far, then don't wait
2640          * for more, just return what we have */
2641         if (done > 0) {
2642             break;
2643         }
2644
2645
2646         /* We've got nothing at all, so let's wait for
2647          * more to arrive
2648          */
2649         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2650
2651         if (ret < 0) {
2652             rdma->error_state = ret;
2653             return ret;
2654         }
2655
2656         /*
2657          * SEND was received with new bytes, now try again.
2658          */
2659         ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
2660         done += ret;
2661         want -= ret;
2662
2663         /* Still didn't get enough, so let's just return */
2664         if (want) {
2665             if (done == 0) {
2666                 return QIO_CHANNEL_ERR_BLOCK;
2667             } else {
2668                 break;
2669             }
2670         }
2671     }
2672     rioc->len = done;
2673     return rioc->len;
2674 }
2675
2676 /*
2677  * Block until all the outstanding chunks have been delivered by the hardware.
2678  */
2679 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2680 {
2681     int ret;
2682
2683     if (qemu_rdma_write_flush(f, rdma) < 0) {
2684         return -EIO;
2685     }
2686
2687     while (rdma->nb_sent) {
2688         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2689         if (ret < 0) {
2690             error_report("rdma migration: complete polling error!");
2691             return -EIO;
2692         }
2693     }
2694
2695     qemu_rdma_unregister_waiting(rdma);
2696
2697     return 0;
2698 }
2699
2700
2701 static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
2702                                          bool blocking,
2703                                          Error **errp)
2704 {
2705     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2706     /* XXX we should make readv/writev actually honour this :-) */
2707     rioc->blocking = blocking;
2708     return 0;
2709 }
2710
2711
2712 typedef struct QIOChannelRDMASource QIOChannelRDMASource;
2713 struct QIOChannelRDMASource {
2714     GSource parent;
2715     QIOChannelRDMA *rioc;
2716     GIOCondition condition;
2717 };
2718
2719 static gboolean
2720 qio_channel_rdma_source_prepare(GSource *source,
2721                                 gint *timeout)
2722 {
2723     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2724     RDMAContext *rdma = rsource->rioc->rdma;
2725     GIOCondition cond = 0;
2726     *timeout = -1;
2727
2728     if (rdma->wr_data[0].control_len) {
2729         cond |= G_IO_IN;
2730     }
2731     cond |= G_IO_OUT;
2732
2733     return cond & rsource->condition;
2734 }
2735
2736 static gboolean
2737 qio_channel_rdma_source_check(GSource *source)
2738 {
2739     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2740     RDMAContext *rdma = rsource->rioc->rdma;
2741     GIOCondition cond = 0;
2742
2743     if (rdma->wr_data[0].control_len) {
2744         cond |= G_IO_IN;
2745     }
2746     cond |= G_IO_OUT;
2747
2748     return cond & rsource->condition;
2749 }
2750
2751 static gboolean
2752 qio_channel_rdma_source_dispatch(GSource *source,
2753                                  GSourceFunc callback,
2754                                  gpointer user_data)
2755 {
2756     QIOChannelFunc func = (QIOChannelFunc)callback;
2757     QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
2758     RDMAContext *rdma = rsource->rioc->rdma;
2759     GIOCondition cond = 0;
2760
2761     if (rdma->wr_data[0].control_len) {
2762         cond |= G_IO_IN;
2763     }
2764     cond |= G_IO_OUT;
2765
2766     return (*func)(QIO_CHANNEL(rsource->rioc),
2767                    (cond & rsource->condition),
2768                    user_data);
2769 }
2770
2771 static void
2772 qio_channel_rdma_source_finalize(GSource *source)
2773 {
2774     QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
2775
2776     object_unref(OBJECT(ssource->rioc));
2777 }
2778
2779 GSourceFuncs qio_channel_rdma_source_funcs = {
2780     qio_channel_rdma_source_prepare,
2781     qio_channel_rdma_source_check,
2782     qio_channel_rdma_source_dispatch,
2783     qio_channel_rdma_source_finalize
2784 };
2785
2786 static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
2787                                               GIOCondition condition)
2788 {
2789     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2790     QIOChannelRDMASource *ssource;
2791     GSource *source;
2792
2793     source = g_source_new(&qio_channel_rdma_source_funcs,
2794                           sizeof(QIOChannelRDMASource));
2795     ssource = (QIOChannelRDMASource *)source;
2796
2797     ssource->rioc = rioc;
2798     object_ref(OBJECT(rioc));
2799
2800     ssource->condition = condition;
2801
2802     return source;
2803 }
2804
2805
2806 static int qio_channel_rdma_close(QIOChannel *ioc,
2807                                   Error **errp)
2808 {
2809     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
2810     trace_qemu_rdma_close();
2811     if (rioc->rdma) {
2812         if (!rioc->rdma->error_state) {
2813             rioc->rdma->error_state = qemu_file_get_error(rioc->file);
2814         }
2815         qemu_rdma_cleanup(rioc->rdma);
2816         g_free(rioc->rdma);
2817         rioc->rdma = NULL;
2818     }
2819     return 0;
2820 }
2821
2822 /*
2823  * Parameters:
2824  *    @offset == 0 :
2825  *        This means that 'block_offset' is a full virtual address that does not
2826  *        belong to a RAMBlock of the virtual machine and instead
2827  *        represents a private malloc'd memory area that the caller wishes to
2828  *        transfer.
2829  *
2830  *    @offset != 0 :
2831  *        Offset is an offset to be added to block_offset and used
2832  *        to also lookup the corresponding RAMBlock.
2833  *        to also look up the corresponding RAMBlock.
2834  *    @size > 0 :
2835  *        Initiate a transfer of this size.
2836  *
2837  *    @size == 0 :
2838  *        A 'hint' or 'advice' that we wish to speculatively
2839  *        and asynchronously unregister this memory. In this case, there is no
2840  *        guarantee that the unregister will actually happen, for example,
2841  *        if the memory is being actively transmitted. Additionally, the memory
2842  *        may be re-registered at any future time if a write within the same
2843  *        chunk was requested again, even if you attempted to unregister it
2844  *        here.
2845  *
2846  *    @size < 0 : TODO, not yet supported
2847  *        Unregister the memory NOW. This means that the caller does not
2848  *        expect there to be any future RDMA transfers and we just want to clean
2849  *        things up. This is used in case the upper layer owns the memory and
2850  *        cannot wait for qemu_fclose() to occur.
2851  *
2852  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2853  *                  sent. Usually, this will not be more than a few bytes of
2854  *                  the protocol because most transfers are sent asynchronously.
2855  */
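/*
 * Illustrative call only (in practice this is reached through the
 * QEMUFile hook table rather than invoked directly; the page size used
 * below is just an example value):
 *
 *     uint64_t bytes_sent = 0;
 *     ret = qemu_rdma_save_page(f, rioc, block_offset, offset,
 *                               TARGET_PAGE_SIZE, &bytes_sent);
 *
 * A return value of RAM_SAVE_CONTROL_DELAYED means the write was merged
 * or queued, not necessarily posted to the wire yet.
 */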
2856 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2857                                   ram_addr_t block_offset, ram_addr_t offset,
2858                                   size_t size, uint64_t *bytes_sent)
2859 {
2860     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
2861     RDMAContext *rdma = rioc->rdma;
2862     int ret;
2863
2864     CHECK_ERROR_STATE();
2865
2866     qemu_fflush(f);
2867
2868     if (size > 0) {
2869         /*
2870          * Add this page to the current 'chunk'. If the chunk
2871          * is full, or the page doesn't belong to the current chunk,
2872          * an actual RDMA write will occur and a new chunk will be formed.
2873          */
2874         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2875         if (ret < 0) {
2876             error_report("rdma migration: write error! %d", ret);
2877             goto err;
2878         }
2879
2880         /*
2881          * We always return 1 byte because the RDMA
2882          * protocol is completely asynchronous. We do not yet know
2883          * whether an identified chunk is zero or not because we're
2884          * waiting for other pages to potentially be merged with
2885          * the current chunk. So, we have to call qemu_update_position()
2886          * later on when the actual write occurs.
2887          */
2888         if (bytes_sent) {
2889             *bytes_sent = 1;
2890         }
2891     } else {
2892         uint64_t index, chunk;
2893
2894         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2895         if (size < 0) {
2896             ret = qemu_rdma_drain_cq(f, rdma);
2897             if (ret < 0) {
2898                 fprintf(stderr, "rdma: failed to synchronously drain"
2899                                 " completion queue before unregistration.\n");
2900                 goto err;
2901             }
2902         }
2903         */
2904
2905         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2906                                          offset, size, &index, &chunk);
2907
2908         if (ret) {
2909             error_report("ram block search failed");
2910             goto err;
2911         }
2912
2913         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2914
2915         /*
2916          * TODO: Synchronous, guaranteed unregistration (should not occur during
2917          * fast-path). Otherwise, unregisters will process on the next call to
2918          * qemu_rdma_drain_cq()
2919         if (size < 0) {
2920             qemu_rdma_unregister_waiting(rdma);
2921         }
2922         */
2923     }
2924
2925     /*
2926      * Drain the Completion Queue if possible, but do not block,
2927      * just poll.
2928      *
2929      * If nothing to poll, the end of the iteration will do this
2930      * again to make sure we don't overflow the request queue.
2931      */
2932     while (1) {
2933         uint64_t wr_id, wr_id_in;
2934         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2935         if (ret < 0) {
2936             error_report("rdma migration: polling error! %d", ret);
2937             goto err;
2938         }
2939
2940         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2941
2942         if (wr_id == RDMA_WRID_NONE) {
2943             break;
2944         }
2945     }
2946
2947     return RAM_SAVE_CONTROL_DELAYED;
2948 err:
2949     rdma->error_state = ret;
2950     return ret;
2951 }
2952
2953 static int qemu_rdma_accept(RDMAContext *rdma)
2954 {
2955     RDMACapabilities cap;
2956     struct rdma_conn_param conn_param = {
2957                                             .responder_resources = 2,
2958                                             .private_data = &cap,
2959                                             .private_data_len = sizeof(cap),
2960                                          };
2961     struct rdma_cm_event *cm_event;
2962     struct ibv_context *verbs;
2963     int ret = -EINVAL;
2964     int idx;
2965
2966     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2967     if (ret) {
2968         goto err_rdma_dest_wait;
2969     }
2970
2971     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2972         rdma_ack_cm_event(cm_event);
2973         goto err_rdma_dest_wait;
2974     }
2975
2976     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2977
2978     network_to_caps(&cap);
2979
2980     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2981             error_report("Unknown source RDMA version: %d, bailing...",
2982                             cap.version);
2983             rdma_ack_cm_event(cm_event);
2984             goto err_rdma_dest_wait;
2985     }
2986
2987     /*
2988      * Respond with only the capabilities this version of QEMU knows about.
2989      */
2990     cap.flags &= known_capabilities;
2991
2992     /*
2993      * Enable the ones that we do know about.
2994      * Add other checks here as new ones are introduced.
2995      */
2996     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2997         rdma->pin_all = true;
2998     }
2999
3000     rdma->cm_id = cm_event->id;
3001     verbs = cm_event->id->verbs;
3002
3003     rdma_ack_cm_event(cm_event);
3004
3005     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
3006
3007     caps_to_network(&cap);
3008
3009     trace_qemu_rdma_accept_pin_verbsc(verbs);
3010
3011     if (!rdma->verbs) {
3012         rdma->verbs = verbs;
3013     } else if (rdma->verbs != verbs) {
3014             error_report("ibv context not matching %p, %p!", rdma->verbs,
3015                          verbs);
3016             goto err_rdma_dest_wait;
3017     }
3018
3019     qemu_rdma_dump_id("dest_init", verbs);
3020
3021     ret = qemu_rdma_alloc_pd_cq(rdma);
3022     if (ret) {
3023         error_report("rdma migration: error allocating pd and cq!");
3024         goto err_rdma_dest_wait;
3025     }
3026
3027     ret = qemu_rdma_alloc_qp(rdma);
3028     if (ret) {
3029         error_report("rdma migration: error allocating qp!");
3030         goto err_rdma_dest_wait;
3031     }
3032
3033     ret = qemu_rdma_init_ram_blocks(rdma);
3034     if (ret) {
3035         error_report("rdma migration: error initializing ram blocks!");
3036         goto err_rdma_dest_wait;
3037     }
3038
3039     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
3040         ret = qemu_rdma_reg_control(rdma, idx);
3041         if (ret) {
3042             error_report("rdma: error registering %d control", idx);
3043             goto err_rdma_dest_wait;
3044         }
3045     }
3046
3047     qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
3048
3049     ret = rdma_accept(rdma->cm_id, &conn_param);
3050     if (ret) {
3051         error_report("rdma_accept returns %d", ret);
3052         goto err_rdma_dest_wait;
3053     }
3054
3055     ret = rdma_get_cm_event(rdma->channel, &cm_event);
3056     if (ret) {
3057         error_report("rdma_accept get_cm_event failed %d", ret);
3058         goto err_rdma_dest_wait;
3059     }
3060
3061     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
3062         error_report("rdma_accept not event established");
3063         rdma_ack_cm_event(cm_event);
3064         goto err_rdma_dest_wait;
3065     }
3066
3067     rdma_ack_cm_event(cm_event);
3068     rdma->connected = true;
3069
3070     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
3071     if (ret) {
3072         error_report("rdma migration: error posting second control recv");
3073         goto err_rdma_dest_wait;
3074     }
3075
3076     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
3077
3078     return 0;
3079
3080 err_rdma_dest_wait:
3081     rdma->error_state = ret;
3082     qemu_rdma_cleanup(rdma);
3083     return ret;
3084 }
3085
3086 static int dest_ram_sort_func(const void *a, const void *b)
3087 {
3088     unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
3089     unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
3090
3091     return (a_index < b_index) ? -1 : (a_index != b_index);
3092 }
3093
3094 /*
3095  * During each iteration of the migration, we listen for instructions
3096  * from the source VM to perform dynamic page registrations before it
3097  * can perform RDMA operations.
3098  *
3099  * We respond with the 'rkey'.
3100  *
3101  * Keep doing this until the source tells us to stop.
3102  */
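/*
 * Sketch of the loop below (message names as used in the switch):
 *
 *     repeat:
 *         recv a control message (e.g. COMPRESS, RAM_BLOCKS_REQUEST,
 *                                 REGISTER_REQUEST, ...)
 *         act on it and send the matching *_RESULT / *_FINISHED reply
 *     until REGISTER_FINISHED is received.
 */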
3103 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
3104 {
3105     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
3106                                .type = RDMA_CONTROL_REGISTER_RESULT,
3107                                .repeat = 0,
3108                              };
3109     RDMAControlHeader unreg_resp = { .len = 0,
3110                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
3111                                .repeat = 0,
3112                              };
3113     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
3114                                  .repeat = 1 };
3115     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3116     RDMAContext *rdma = rioc->rdma;
3117     RDMALocalBlocks *local = &rdma->local_ram_blocks;
3118     RDMAControlHeader head;
3119     RDMARegister *reg, *registers;
3120     RDMACompress *comp;
3121     RDMARegisterResult *reg_result;
3122     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
3123     RDMALocalBlock *block;
3124     void *host_addr;
3125     int ret = 0;
3126     int idx = 0;
3127     int count = 0;
3128     int i = 0;
3129
3130     CHECK_ERROR_STATE();
3131
3132     do {
3133         trace_qemu_rdma_registration_handle_wait();
3134
3135         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
3136
3137         if (ret < 0) {
3138             break;
3139         }
3140
3141         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
3142             error_report("rdma: Too many requests in this message (%d). "
3143                             "Bailing.", head.repeat);
3144             ret = -EIO;
3145             break;
3146         }
3147
3148         switch (head.type) {
3149         case RDMA_CONTROL_COMPRESS:
3150             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
3151             network_to_compress(comp);
3152
3153             trace_qemu_rdma_registration_handle_compress(comp->length,
3154                                                          comp->block_idx,
3155                                                          comp->offset);
3156             if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
3157                 error_report("rdma: 'compress' bad block index %u (vs %d)",
3158                              (unsigned int)comp->block_idx,
3159                              rdma->local_ram_blocks.nb_blocks);
3160                 ret = -EIO;
3161                 goto out;
3162             }
3163             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
3164
3165             host_addr = block->local_host_addr +
3166                             (comp->offset - block->offset);
3167
3168             ram_handle_compressed(host_addr, comp->value, comp->length);
3169             break;
3170
3171         case RDMA_CONTROL_REGISTER_FINISHED:
3172             trace_qemu_rdma_registration_handle_finished();
3173             goto out;
3174
3175         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
3176             trace_qemu_rdma_registration_handle_ram_blocks();
3177
3178             /* Sort our local RAM Block list so it's the same as the source's;
3179              * we can do this since we've filled in a src_index in the list
3180              * as we received the RAMBlock list earlier.
3181              */
3182             qsort(rdma->local_ram_blocks.block,
3183                   rdma->local_ram_blocks.nb_blocks,
3184                   sizeof(RDMALocalBlock), dest_ram_sort_func);
3185             if (rdma->pin_all) {
3186                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
3187                 if (ret) {
3188                     error_report("rdma migration: error dest "
3189                                     "registering ram blocks");
3190                     goto out;
3191                 }
3192             }
3193
3194             /*
3195              * Dest uses this to prepare to transmit the RAMBlock descriptions
3196              * to the source VM after connection setup.
3197              * Both sides use the "remote" structure to communicate and update
3198              * their "local" descriptions with what was sent.
3199              */
3200             for (i = 0; i < local->nb_blocks; i++) {
3201                 rdma->dest_blocks[i].remote_host_addr =
3202                     (uintptr_t)(local->block[i].local_host_addr);
3203
3204                 if (rdma->pin_all) {
3205                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
3206                 }
3207
3208                 rdma->dest_blocks[i].offset = local->block[i].offset;
3209                 rdma->dest_blocks[i].length = local->block[i].length;
3210
3211                 dest_block_to_network(&rdma->dest_blocks[i]);
3212                 trace_qemu_rdma_registration_handle_ram_blocks_loop(
3213                     local->block[i].block_name,
3214                     local->block[i].offset,
3215                     local->block[i].length,
3216                     local->block[i].local_host_addr,
3217                     local->block[i].src_index);
3218             }
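            /* Illustrative sketch: each RDMADestBlock transmitted below is
             * assumed to be a fixed-size, packed wire descriptor roughly of
             * the form
             *
             *     typedef struct QEMU_PACKED {
             *         uint64_t remote_host_addr;
             *         uint64_t offset;
             *         uint64_t length;
             *         uint32_t remote_rkey;
             *         uint32_t padding;
             *     } RDMADestBlock;
             *
             * which is why blocks.len is simply nb_blocks multiplied by
             * sizeof(RDMADestBlock) and the source can index the array directly.
             */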
3219
3220             blocks.len = rdma->local_ram_blocks.nb_blocks
3221                                                 * sizeof(RDMADestBlock);
3222
3223
3224             ret = qemu_rdma_post_send_control(rdma,
3225                                         (uint8_t *) rdma->dest_blocks, &blocks);
3226
3227             if (ret < 0) {
3228                 error_report("rdma migration: error sending remote info");
3229                 goto out;
3230             }
3231
3232             break;
3233         case RDMA_CONTROL_REGISTER_REQUEST:
3234             trace_qemu_rdma_registration_handle_register(head.repeat);
3235
3236             reg_resp.repeat = head.repeat;
3237             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3238
3239             for (count = 0; count < head.repeat; count++) {
3240                 uint64_t chunk;
3241                 uint8_t *chunk_start, *chunk_end;
3242
3243                 reg = &registers[count];
3244                 network_to_register(reg);
3245
3246                 reg_result = &results[count];
3247
3248                 trace_qemu_rdma_registration_handle_register_loop(count,
3249                          reg->current_index, reg->key.current_addr, reg->chunks);
3250
3251                 if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
3252                     error_report("rdma: 'register' bad block index %u (vs %d)",
3253                                  (unsigned int)reg->current_index,
3254                                  rdma->local_ram_blocks.nb_blocks);
3255                     ret = -ENOENT;
3256                     goto out;
3257                 }
3258                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3259                 if (block->is_ram_block) {
3260                     if (block->offset > reg->key.current_addr) {
3261                         error_report("rdma: bad register address for block %s"
3262                             " offset: %" PRIx64 " current_addr: %" PRIx64,
3263                             block->block_name, block->offset,
3264                             reg->key.current_addr);
3265                         ret = -ERANGE;
3266                         goto out;
3267                     }
3268                     host_addr = (block->local_host_addr +
3269                                 (reg->key.current_addr - block->offset));
3270                     chunk = ram_chunk_index(block->local_host_addr,
3271                                             (uint8_t *) host_addr);
3272                 } else {
3273                     chunk = reg->key.chunk;
3274                     host_addr = block->local_host_addr +
3275                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3276                     /* Check for particularly bad chunk value */
3277                     if (host_addr < (void *)block->local_host_addr) {
3278                         error_report("rdma: bad chunk for block %s"
3279                             " chunk: %" PRIx64,
3280                             block->block_name, reg->key.chunk);
3281                         ret = -ERANGE;
3282                         goto out;
3283                     }
3284                 }
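                /* Illustrative note: for a RAM block, the chunk index computed
                 * above is assumed to be the offset of host_addr within the
                 * block shifted down by RDMA_REG_CHUNK_SHIFT, i.e. roughly
                 *
                 *     chunk = ((uintptr_t)host_addr -
                 *              (uintptr_t)block->local_host_addr)
                 *             >> RDMA_REG_CHUNK_SHIFT;
                 *
                 * so a single REGISTER_REQUEST can cover reg->chunks consecutive
                 * chunks with one memory registration and a single rkey.
                 */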
3285                 chunk_start = ram_chunk_start(block, chunk);
3286                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3287                 if (qemu_rdma_register_and_get_keys(rdma, block,
3288                             (uintptr_t)host_addr, NULL, &reg_result->rkey,
3289                             chunk, chunk_start, chunk_end)) {
3290                     error_report("cannot get rkey");
3291                     ret = -EINVAL;
3292                     goto out;
3293                 }
3294
3295                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3296
3297                 trace_qemu_rdma_registration_handle_register_rkey(
3298                                                            reg_result->rkey);
3299
3300                 result_to_network(reg_result);
3301             }
3302
3303             ret = qemu_rdma_post_send_control(rdma,
3304                             (uint8_t *) results, &reg_resp);
3305
3306             if (ret < 0) {
3307                 error_report("Failed to send control buffer");
3308                 goto out;
3309             }
3310             break;
3311         case RDMA_CONTROL_UNREGISTER_REQUEST:
3312             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3313             unreg_resp.repeat = head.repeat;
3314             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3315
3316             for (count = 0; count < head.repeat; count++) {
3317                 reg = &registers[count];
3318                 network_to_register(reg);
3319
3320                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3321                            reg->current_index, reg->key.chunk);
3322
3323                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3324
3325                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3326                 block->pmr[reg->key.chunk] = NULL;
3327
3328                 if (ret != 0) {
3329                     perror("rdma chunk unregistration failed");
3330                     ret = -ret;
3331                     goto out;
3332                 }
3333
3334                 rdma->total_registrations--;
3335
3336                 trace_qemu_rdma_registration_handle_unregister_success(
3337                                                        reg->key.chunk);
3338             }
3339
3340             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3341
3342             if (ret < 0) {
3343                 error_report("Failed to send control buffer");
3344                 goto out;
3345             }
3346             break;
3347         case RDMA_CONTROL_REGISTER_RESULT:
3348             error_report("Invalid RESULT message at dest.");
3349             ret = -EIO;
3350             goto out;
3351         default:
3352             error_report("Unknown control message %u", head.type);
3353             ret = -EIO;
3354             goto out;
3355         }
3356     } while (1);
3357 out:
3358     if (ret < 0) {
3359         rdma->error_state = ret;
3360     }
3361     return ret;
3362 }
3363
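/* Illustrative summary of the handler above (destination side of the dynamic
 * registration protocol); one RAM-section round is assumed to proceed roughly
 * as:
 *
 *   source                              destination
 *   ------                              -----------
 *   RAM_BLOCKS_REQUEST    ----------->  sort blocks, optionally pin all
 *                         <-----------  RAM_BLOCKS_RESULT (RDMADestBlock[])
 *   REGISTER_REQUEST(s)   ----------->  register chunk(s), return rkey(s)
 *                         <-----------  REGISTER_RESULT
 *   UNREGISTER_REQUEST(s) ----------->  ibv_dereg_mr() the chunk(s)
 *                         <-----------  UNREGISTER_FINISHED
 *   REGISTER_FINISHED     ----------->  handler returns
 *
 * COMPRESS messages (zero-page fills handled via ram_handle_compressed) may
 * arrive at any point during the round.
 */
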
3364 /* Destination:
3365  * Called via a ram_control_load_hook during the initial RAM load section which
3366  * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
3367  * on the source.
3368  * We've already built our local RAMBlock list, but not yet sent the list to
3369  * the source.
3370  */
3371 static int
3372 rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
3373 {
3374     RDMAContext *rdma = rioc->rdma;
3375     int curr;
3376     int found = -1;
3377
3378     /* Find the matching RAMBlock in our local list */
3379     for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
3380         if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
3381             found = curr;
3382             break;
3383         }
3384     }
3385
3386     if (found == -1) {
3387         error_report("RAMBlock '%s' not found on destination", name);
3388         return -ENOENT;
3389     }
3390
3391     rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
3392     trace_rdma_block_notification_handle(name, rdma->next_src_index);
3393     rdma->next_src_index++;
3394
3395     return 0;
3396 }
3397
3398 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
3399 {
3400     switch (flags) {
3401     case RAM_CONTROL_BLOCK_REG:
3402         return rdma_block_notification_handle(opaque, data);
3403
3404     case RAM_CONTROL_HOOK:
3405         return qemu_rdma_registration_handle(f, opaque);
3406
3407     default:
3408         /* Shouldn't be called with any other values */
3409         abort();
3410     }
3411 }
3412
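/* Illustrative note: rdma_load_hook is installed as hook_ram_load in
 * rdma_read_hooks below, so the generic RAM load code is assumed to reach it
 * through the QEMUFileHooks indirection, roughly:
 *
 *     if (f->hooks && f->hooks->hook_ram_load) {
 *         ret = f->hooks->hook_ram_load(f, f->opaque, flags, data);
 *     }
 *
 * For RAM_CONTROL_BLOCK_REG, data carries the RAMBlock name; for
 * RAM_CONTROL_HOOK it is unused and the call stays inside
 * qemu_rdma_registration_handle() until REGISTER_FINISHED arrives.
 */
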
3413 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3414                                         uint64_t flags, void *data)
3415 {
3416     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3417     RDMAContext *rdma = rioc->rdma;
3418
3419     CHECK_ERROR_STATE();
3420
3421     trace_qemu_rdma_registration_start(flags);
3422     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3423     qemu_fflush(f);
3424
3425     return 0;
3426 }
3427
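/* Illustrative note: the RAM_SAVE_FLAG_HOOK marker written into the stream
 * above is assumed to be what makes the destination's RAM load path invoke its
 * load hook with RAM_CONTROL_HOOK, i.e. enter qemu_rdma_registration_handle()
 * and start servicing registration requests for this iteration.
 */
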
3428 /*
3429  * Inform dest that dynamic registrations are done for now.
3430  * First, flush writes, if any.
3431  */
3432 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3433                                        uint64_t flags, void *data)
3434 {
3435     Error *local_err = NULL, **errp = &local_err;
3436     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
3437     RDMAContext *rdma = rioc->rdma;
3438     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3439     int ret = 0;
3440
3441     CHECK_ERROR_STATE();
3442
3443     qemu_fflush(f);
3444     ret = qemu_rdma_drain_cq(f, rdma);
3445
3446     if (ret < 0) {
3447         goto err;
3448     }
3449
3450     if (flags == RAM_CONTROL_SETUP) {
3451         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3452         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3453         int reg_result_idx, i, nb_dest_blocks;
3454
3455         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3456         trace_qemu_rdma_registration_stop_ram();
3457
3458         /*
3459          * Make sure that we parallelize the pinning on both sides.
3460          * For very large guests, doing this serially takes a really
3461          * long time, so we have to 'interleave' the pinning locally
3462          * with the control messages by performing the pinning on this
3463          * side before we receive the control response from the other
3464          * side that the pinning has completed.
3465          */
3466         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3467                     &reg_result_idx, rdma->pin_all ?
3468                     qemu_rdma_reg_whole_ram_blocks : NULL);
3469         if (ret < 0) {
3470             ERROR(errp, "receiving remote info!");
3471             return ret;
3472         }
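        /* Illustrative note: qemu_rdma_exchange_send() is assumed to run the
         * callback passed here (qemu_rdma_reg_whole_ram_blocks when pin_all is
         * set) after posting the request but before blocking on the response,
         * which is what makes the local and remote pinning overlap as the
         * comment above describes.
         */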
3473
3474         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3475
3476         /*
3477          * The protocol uses two different sets of rkeys (mutually exclusive):
3478          * 1. One key to represent the virtual address of the entire ram block.
3479          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3480          * 2. One to represent individual chunks within a ram block.
3481          *    (dynamic chunk registration enabled - pin individual chunks.)
3482          *
3483          * Once the capability is successfully negotiated, the destination transmits
3484          * the keys to use (or sends them later) including the virtual addresses
3485          * and then propagates the remote ram block descriptions to its local copy.
3486          */
3487
3488         if (local->nb_blocks != nb_dest_blocks) {
3489             ERROR(errp, "ram blocks mismatch (number of blocks %d vs %d). "
3490                         "Your QEMU command line parameters are probably "
3491                         "not identical on both the source and destination.",
3492                         local->nb_blocks, nb_dest_blocks);
3493             rdma->error_state = -EINVAL;
3494             return -EINVAL;
3495         }
3496
3497         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3498         memcpy(rdma->dest_blocks,
3499             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3500         for (i = 0; i < nb_dest_blocks; i++) {
3501             network_to_dest_block(&rdma->dest_blocks[i]);
3502
3503             /* We require that the blocks are in the same order */
3504             if (rdma->dest_blocks[i].length != local->block[i].length) {
3505                 ERROR(errp, "Block %s/%d has a different length %" PRIu64
3506                             " vs %" PRIu64, local->block[i].block_name, i,
3507                             local->block[i].length,
3508                             rdma->dest_blocks[i].length);
3509                 rdma->error_state = -EINVAL;
3510                 return -EINVAL;
3511             }
3512             local->block[i].remote_host_addr =
3513                     rdma->dest_blocks[i].remote_host_addr;
3514             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3515         }
3516     }
3517
3518     trace_qemu_rdma_registration_stop(flags);
3519
3520     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3521     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3522
3523     if (ret < 0) {
3524         goto err;
3525     }
3526
3527     return 0;
3528 err:
3529     rdma->error_state = ret;
3530     return ret;
3531 }
3532
3533 static const QEMUFileHooks rdma_read_hooks = {
3534     .hook_ram_load = rdma_load_hook,
3535 };
3536
3537 static const QEMUFileHooks rdma_write_hooks = {
3538     .before_ram_iterate = qemu_rdma_registration_start,
3539     .after_ram_iterate  = qemu_rdma_registration_stop,
3540     .save_page          = qemu_rdma_save_page,
3541 };
3542
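/* Illustrative note: on the outgoing side these hooks are assumed to bracket
 * each RAM iteration -- before_ram_iterate announces the hook point to the
 * destination, save_page turns individual page writes into RDMA WRITEs
 * (registering chunks on demand), and after_ram_iterate drains the completion
 * queue and tells the destination that registrations for this round are
 * finished.
 */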
3543
3544 static void qio_channel_rdma_finalize(Object *obj)
3545 {
3546     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
3547     if (rioc->rdma) {
3548         qemu_rdma_cleanup(rioc->rdma);
3549         g_free(rioc->rdma);
3550         rioc->rdma = NULL;
3551     }
3552 }
3553
3554 static void qio_channel_rdma_class_init(ObjectClass *klass,
3555                                         void *class_data G_GNUC_UNUSED)
3556 {
3557     QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
3558
3559     ioc_klass->io_writev = qio_channel_rdma_writev;
3560     ioc_klass->io_readv = qio_channel_rdma_readv;
3561     ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
3562     ioc_klass->io_close = qio_channel_rdma_close;
3563     ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
3564 }
3565
3566 static const TypeInfo qio_channel_rdma_info = {
3567     .parent = TYPE_QIO_CHANNEL,
3568     .name = TYPE_QIO_CHANNEL_RDMA,
3569     .instance_size = sizeof(QIOChannelRDMA),
3570     .instance_finalize = qio_channel_rdma_finalize,
3571     .class_init = qio_channel_rdma_class_init,
3572 };
3573
3574 static void qio_channel_rdma_register_types(void)
3575 {
3576     type_register_static(&qio_channel_rdma_info);
3577 }
3578
3579 type_init(qio_channel_rdma_register_types);
3580
3581 static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3582 {
3583     QIOChannelRDMA *rioc;
3584
3585     if (qemu_file_mode_is_not_valid(mode)) {
3586         return NULL;
3587     }
3588
3589     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
3590     rioc->rdma = rdma;
3591
3592     if (mode[0] == 'w') {
3593         rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
3594         qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
3595     } else {
3596         rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
3597         qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
3598     }
3599
3600     return rioc->file;
3601 }
3602
3603 static void rdma_accept_incoming_migration(void *opaque)
3604 {
3605     RDMAContext *rdma = opaque;
3606     int ret;
3607     QEMUFile *f;
3608     Error *local_err = NULL, **errp = &local_err;
3609
3610     trace_qemu_rdma_accept_incoming_migration();
3611     ret = qemu_rdma_accept(rdma);
3612
3613     if (ret) {
3614         ERROR(errp, "RDMA Migration initialization failed!");
3615         return;
3616     }
3617
3618     trace_qemu_rdma_accept_incoming_migration_accepted();
3619
3620     f = qemu_fopen_rdma(rdma, "rb");
3621     if (f == NULL) {
3622         ERROR(errp, "could not qemu_fopen_rdma!");
3623         qemu_rdma_cleanup(rdma);
3624         return;
3625     }
3626
3627     rdma->migration_started_on_destination = 1;
3628     migration_fd_process_incoming(f);
3629 }
3630
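/* Illustrative note: this entry point is assumed to be reached when the
 * incoming migration URI selects the rdma transport, e.g.
 *
 *     qemu-system-x86_64 ... -incoming rdma:0.0.0.0:4444
 *
 * with the "rdma:" prefix stripped before host_port is passed in.
 */
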
3631 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3632 {
3633     int ret;
3634     RDMAContext *rdma;
3635     Error *local_err = NULL;
3636
3637     trace_rdma_start_incoming_migration();
3638     rdma = qemu_rdma_data_init(host_port, &local_err);
3639
3640     if (rdma == NULL) {
3641         goto err;
3642     }
3643
3644     ret = qemu_rdma_dest_init(rdma, &local_err);
3645
3646     if (ret) {
3647         goto err;
3648     }
3649
3650     trace_rdma_start_incoming_migration_after_dest_init();
3651
3652     ret = rdma_listen(rdma->listen_id, 5);
3653
3654     if (ret) {
3655         ERROR(errp, "listening on socket!");
3656         goto err;
3657     }
3658
3659     trace_rdma_start_incoming_migration_after_rdma_listen();
3660
3661     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3662                         NULL, (void *)(intptr_t)rdma);
3663     return;
3664 err:
3665     error_propagate(errp, local_err);
3666     g_free(rdma);
3667 }
3668
3669 void rdma_start_outgoing_migration(void *opaque,
3670                             const char *host_port, Error **errp)
3671 {
3672     MigrationState *s = opaque;
3673     RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
3674     int ret = 0;
3675
3676     if (rdma == NULL) {
3677         goto err;
3678     }
3679
3680     ret = qemu_rdma_source_init(rdma,
3681         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
3682
3683     if (ret) {
3684         goto err;
3685     }
3686
3687     trace_rdma_start_outgoing_migration_after_rdma_source_init();
3688     ret = qemu_rdma_connect(rdma, errp);
3689
3690     if (ret) {
3691         goto err;
3692     }
3693
3694     trace_rdma_start_outgoing_migration_after_rdma_connect();
3695
3696     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
3697     migrate_fd_connect(s);
3698     return;
3699 err:
3700     g_free(rdma);
3701 }