migration/rdma.c
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  *
6  * Authors:
7  *  Michael R. Hines <[email protected]>
8  *  Jiuxing Liu <[email protected]>
9  *
10  * This work is licensed under the terms of the GNU GPL, version 2 or
11  * later.  See the COPYING file in the top-level directory.
12  *
13  */
14 #include "qemu-common.h"
15 #include "migration/migration.h"
16 #include "migration/qemu-file.h"
17 #include "exec/cpu-common.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/sockets.h"
20 #include "qemu/bitmap.h"
21 #include "block/coroutine.h"
22 #include <stdio.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netdb.h>
26 #include <arpa/inet.h>
27 #include <string.h>
28 #include <rdma/rdma_cma.h>
29 #include "trace.h"
30
31 /*
32  * Print an error on both the Monitor and the Log file.
33  */
34 #define ERROR(errp, fmt, ...) \
35     do { \
36         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
37         if (errp && (*(errp) == NULL)) { \
38             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
39         } \
40     } while (0)
41
42 #define RDMA_RESOLVE_TIMEOUT_MS 10000
43
44 /* Do not merge data if larger than this. */
45 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
46 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
47
48 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
49
50 /*
51  * This is only for non-live state being migrated.
52  * Instead of RDMA_WRITE messages, we use RDMA_SEND
53  * messages for that state, which requires a different
54  * delivery design than main memory.
55  */
56 #define RDMA_SEND_INCREMENT 32768
57
58 /*
59  * Maximum size of an infiniband SEND message
60  */
61 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
62 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
63
64 #define RDMA_CONTROL_VERSION_CURRENT 1
65 /*
66  * Capabilities for negotiation.
67  */
68 #define RDMA_CAPABILITY_PIN_ALL 0x01
69
70 /*
71  * Add the other flags above to this list of known capabilities
72  * as they are introduced.
73  */
74 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
75
76 #define CHECK_ERROR_STATE() \
77     do { \
78         if (rdma->error_state) { \
79             if (!rdma->error_reported) { \
80                 error_report("RDMA is in an error state waiting for" \
81                                 " migration to abort!"); \
82                 rdma->error_reported = 1; \
83             } \
84             return rdma->error_state; \
85         } \
86     } while (0)
87
88 /*
89  * A work request ID is 64-bits and we split up these bits
90  * into 3 parts:
91  *
92  * bits 0-15 : type of control message, 2^16
93  * bits 16-29: ram block index, 2^14
94  * bits 30-63: ram block chunk number, 2^34
95  *
96  * The last two bit ranges are only used for RDMA writes,
97  * in order to track their completion and potentially
98  * also track unregistration status of the message.
99  */
100 #define RDMA_WRID_TYPE_SHIFT  0UL
101 #define RDMA_WRID_BLOCK_SHIFT 16UL
102 #define RDMA_WRID_CHUNK_SHIFT 30UL
103
104 #define RDMA_WRID_TYPE_MASK \
105     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
106
107 #define RDMA_WRID_BLOCK_MASK \
108     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
109
110 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
111
112 /*
113  * RDMA migration protocol:
114  * 1. RDMA Writes (data messages, i.e. RAM)
115  * 2. IB Send/Recv (control channel messages)
116  */
117 enum {
118     RDMA_WRID_NONE = 0,
119     RDMA_WRID_RDMA_WRITE = 1,
120     RDMA_WRID_SEND_CONTROL = 2000,
121     RDMA_WRID_RECV_CONTROL = 4000,
122 };
123
124 static const char *wrid_desc[] = {
125     [RDMA_WRID_NONE] = "NONE",
126     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
127     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
128     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
129 };
130
131 /*
132  * Work request IDs for IB SEND messages only (not RDMA writes).
133  * This is used by the migration protocol to transmit
134  * control messages (such as device state and registration commands).
135  *
136  * We could use more WRs, but we have enough for now.
137  */
138 enum {
139     RDMA_WRID_READY = 0,
140     RDMA_WRID_DATA,
141     RDMA_WRID_CONTROL,
142     RDMA_WRID_MAX,
143 };
144
145 /*
146  * SEND/RECV IB Control Messages.
147  */
148 enum {
149     RDMA_CONTROL_NONE = 0,
150     RDMA_CONTROL_ERROR,
151     RDMA_CONTROL_READY,               /* ready to receive */
152     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
153     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
154     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
155     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
156     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
157     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
158     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
159     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
160     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
161 };
162
163 static const char *control_desc[] = {
164     [RDMA_CONTROL_NONE] = "NONE",
165     [RDMA_CONTROL_ERROR] = "ERROR",
166     [RDMA_CONTROL_READY] = "READY",
167     [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
168     [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
169     [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
170     [RDMA_CONTROL_COMPRESS] = "COMPRESS",
171     [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
172     [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
173     [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
174     [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
175     [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
176 };
177
178 /*
179  * Memory and MR structures used to represent an IB Send/Recv work request.
180  * This is *not* used for RDMA writes, only IB Send/Recv.
181  */
182 typedef struct {
183     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
184     struct   ibv_mr *control_mr;               /* registration metadata */
185     size_t   control_len;                      /* length of the message */
186     uint8_t *control_curr;                     /* start of unconsumed bytes */
187 } RDMAWorkRequestData;
188
189 /*
190  * Negotiate RDMA capabilities during connection-setup time.
191  */
192 typedef struct {
193     uint32_t version;
194     uint32_t flags;
195 } RDMACapabilities;
196
197 static void caps_to_network(RDMACapabilities *cap)
198 {
199     cap->version = htonl(cap->version);
200     cap->flags = htonl(cap->flags);
201 }
202
203 static void network_to_caps(RDMACapabilities *cap)
204 {
205     cap->version = ntohl(cap->version);
206     cap->flags = ntohl(cap->flags);
207 }
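
/*
 * Example (illustrative sketch): how the source side might fill in the
 * capability structure before handing it to the connection-setup path as
 * private data. The conversion to network byte order must happen last.
 */
static inline void example_prepare_caps(RDMACapabilities *cap, bool pin_all)
{
    cap->version = RDMA_CONTROL_VERSION_CURRENT;
    cap->flags = 0;
    if (pin_all) {
        cap->flags |= RDMA_CAPABILITY_PIN_ALL;
    }
    caps_to_network(cap);   /* wire format is network byte order */
}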
208
209 /*
210  * Representation of a RAMBlock from an RDMA perspective.
211  * This is not transmitted, only local.
212  * This and subsequent structures cannot be linked lists
213  * because we're using a single IB message to transmit
214  * the information. It's small anyway, so a list is overkill.
215  */
216 typedef struct RDMALocalBlock {
217     uint8_t  *local_host_addr; /* local virtual address */
218     uint64_t remote_host_addr; /* remote virtual address */
219     uint64_t offset;
220     uint64_t length;
221     struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
222     struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
223     uint32_t *remote_keys;     /* rkeys for chunk-level registration */
224     uint32_t remote_rkey;      /* rkey for non-chunk-level registration */
225     int      index;            /* which block are we */
226     bool     is_ram_block;
227     int      nb_chunks;
228     unsigned long *transit_bitmap;
229     unsigned long *unregister_bitmap;
230 } RDMALocalBlock;
231
232 /*
233  * Also represents a RAMblock, but only on the dest.
234  * This gets transmitted by the dest during connection-time
235  * to the source VM and then is used to populate the
236  * corresponding RDMALocalBlock with
237  * the information needed to perform the actual RDMA.
238  */
239 typedef struct QEMU_PACKED RDMADestBlock {
240     uint64_t remote_host_addr;
241     uint64_t offset;
242     uint64_t length;
243     uint32_t remote_rkey;
244     uint32_t padding;
245 } RDMADestBlock;
246
247 static uint64_t htonll(uint64_t v)
248 {
249     union { uint32_t lv[2]; uint64_t llv; } u;
250     u.lv[0] = htonl(v >> 32);
251     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
252     return u.llv;
253 }
254
255 static uint64_t ntohll(uint64_t v) {
256     union { uint32_t lv[2]; uint64_t llv; } u;
257     u.llv = v;
258     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
259 }
260
261 static void dest_block_to_network(RDMADestBlock *db)
262 {
263     db->remote_host_addr = htonll(db->remote_host_addr);
264     db->offset = htonll(db->offset);
265     db->length = htonll(db->length);
266     db->remote_rkey = htonl(db->remote_rkey);
267 }
268
269 static void network_to_dest_block(RDMADestBlock *db)
270 {
271     db->remote_host_addr = ntohll(db->remote_host_addr);
272     db->offset = ntohll(db->offset);
273     db->length = ntohll(db->length);
274     db->remote_rkey = ntohl(db->remote_rkey);
275 }
276
277 /*
278  * Container for the RDMALocalBlock structures described above,
279  * used when transmitting the RAMBlock descriptions at connection-time.
280  * This structure is *not* transmitted.
281  */
282 typedef struct RDMALocalBlocks {
283     int nb_blocks;
284     bool     init;             /* main memory init complete */
285     RDMALocalBlock *block;
286 } RDMALocalBlocks;
287
288 /*
289  * Main data structure for RDMA state.
290  * While there is only one copy of this structure being allocated right now,
291  * this is the place to start if you want to consider
292  * having more than one RDMA connection open at the same time.
293  */
294 typedef struct RDMAContext {
295     char *host;
296     int port;
297
298     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
299
300     /*
301      * This is used by *_exchange_send() to figure out whether or not
302      * the initial "READY" message has already been received.
303      * This is because other functions may potentially poll() and detect
304      * the READY message before send() does, in which case we need to
305      * know if it completed.
306      */
307     int control_ready_expected;
308
309     /* number of outstanding writes */
310     int nb_sent;
311
312     /* store info about current buffer so that we can
313        merge it with future sends */
314     uint64_t current_addr;
315     uint64_t current_length;
316     /* index of ram block the current buffer belongs to */
317     int current_index;
318     /* index of the chunk in the current ram block */
319     int current_chunk;
320
321     bool pin_all;
322
323     /*
324      * infiniband-specific variables for opening the device
325      * and maintaining connection state and so forth.
326      *
327      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
328      * cm_id->verbs, cm_id->channel, and cm_id->qp.
329      */
330     struct rdma_cm_id *cm_id;               /* connection manager ID */
331     struct rdma_cm_id *listen_id;
332     bool connected;
333
334     struct ibv_context          *verbs;
335     struct rdma_event_channel   *channel;
336     struct ibv_qp *qp;                      /* queue pair */
337     struct ibv_comp_channel *comp_channel;  /* completion channel */
338     struct ibv_pd *pd;                      /* protection domain */
339     struct ibv_cq *cq;                      /* completion queue */
340
341     /*
342      * If a previous write failed (perhaps because of a failed
343      * memory registration), then do not attempt any future work
344      * and remember the error state.
345      */
346     int error_state;
347     int error_reported;
348
349     /*
350      * Description of ram blocks used throughout the code.
351      */
352     RDMALocalBlocks local_ram_blocks;
353     RDMADestBlock  *dest_blocks;
354
355     /*
356      * Migration on the *destination* has started; if so, use the
357      * coroutine yield function.
358      * The source runs in a thread, so we don't care.
359      */
360     int migration_started_on_destination;
361
362     int total_registrations;
363     int total_writes;
364
365     int unregister_current, unregister_next;
366     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
367
368     GHashTable *blockmap;
369 } RDMAContext;
370
371 /*
372  * Interface to the rest of the migration call stack.
373  */
374 typedef struct QEMUFileRDMA {
375     RDMAContext *rdma;
376     size_t len;
377     void *file;
378 } QEMUFileRDMA;
379
380 /*
381  * Main structure for IB Send/Recv control messages.
382  * This gets prepended at the beginning of every Send/Recv.
383  */
384 typedef struct QEMU_PACKED {
385     uint32_t len;     /* Total length of data portion */
386     uint32_t type;    /* which control command to perform */
387     uint32_t repeat;  /* number of commands in data portion of same type */
388     uint32_t padding;
389 } RDMAControlHeader;
390
391 static void control_to_network(RDMAControlHeader *control)
392 {
393     control->type = htonl(control->type);
394     control->len = htonl(control->len);
395     control->repeat = htonl(control->repeat);
396 }
397
398 static void network_to_control(RDMAControlHeader *control)
399 {
400     control->type = ntohl(control->type);
401     control->len = ntohl(control->len);
402     control->repeat = ntohl(control->repeat);
403 }
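
/*
 * Example (illustrative): a QEMU_FILE control message carrying 'len' payload
 * bytes would be described by a header like this. The header is converted to
 * network byte order and prepended to the payload by the SEND path further
 * down (see qemu_rdma_post_send_control()).
 */
static inline void example_fill_qemu_file_header(RDMAControlHeader *head,
                                                 uint32_t len)
{
    head->len = len;                      /* bytes of data that follow */
    head->type = RDMA_CONTROL_QEMU_FILE;  /* command carried by this message */
    head->repeat = 1;                     /* one command in the data portion */
}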
404
405 /*
406  * Register a single Chunk.
407  * Information sent by the source VM to inform the dest
408  * to register a single chunk of memory before we can perform
409  * the actual RDMA operation.
410  */
411 typedef struct QEMU_PACKED {
412     union QEMU_PACKED {
413         uint64_t current_addr;  /* offset into the ramblock of the chunk */
414         uint64_t chunk;         /* chunk to lookup if unregistering */
415     } key;
416     uint32_t current_index; /* which ramblock the chunk belongs to */
417     uint32_t padding;
418     uint64_t chunks;            /* how many sequential chunks to register */
419 } RDMARegister;
420
421 static void register_to_network(RDMARegister *reg)
422 {
423     reg->key.current_addr = htonll(reg->key.current_addr);
424     reg->current_index = htonl(reg->current_index);
425     reg->chunks = htonll(reg->chunks);
426 }
427
428 static void network_to_register(RDMARegister *reg)
429 {
430     reg->key.current_addr = ntohll(reg->key.current_addr);
431     reg->current_index = ntohl(reg->current_index);
432     reg->chunks = ntohll(reg->chunks);
433 }
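
/*
 * Example (illustrative): a registration request for a single chunk, as the
 * source would describe it before sending a RDMA_CONTROL_REGISTER_REQUEST.
 * The analogous unregistration usage appears in
 * qemu_rdma_unregister_waiting() below.
 */
static inline void example_describe_chunk_registration(RDMARegister *reg,
                                                       uint32_t block_index,
                                                       uint64_t chunk_addr)
{
    reg->key.current_addr = chunk_addr; /* offset of the chunk in the block */
    reg->current_index = block_index;   /* which ramblock the chunk is in */
    reg->chunks = 1;                    /* register one sequential chunk */
    register_to_network(reg);           /* convert before transmission */
}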
434
435 typedef struct QEMU_PACKED {
436     uint32_t value;     /* if zero, we will madvise() */
437     uint32_t block_idx; /* which ram block index */
438     uint64_t offset;    /* where in the remote ramblock this chunk starts */
439     uint64_t length;    /* length of the chunk */
440 } RDMACompress;
441
442 static void compress_to_network(RDMACompress *comp)
443 {
444     comp->value = htonl(comp->value);
445     comp->block_idx = htonl(comp->block_idx);
446     comp->offset = htonll(comp->offset);
447     comp->length = htonll(comp->length);
448 }
449
450 static void network_to_compress(RDMACompress *comp)
451 {
452     comp->value = ntohl(comp->value);
453     comp->block_idx = ntohl(comp->block_idx);
454     comp->offset = ntohll(comp->offset);
455     comp->length = ntohll(comp->length);
456 }
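
/*
 * Example (illustrative): describing a chunk whose bytes are all identical
 * (typically zero), so the destination can reproduce it locally instead of
 * receiving it over the wire, as noted in the 'value' field comment above.
 */
static inline void example_describe_repeated_chunk(RDMACompress *comp,
                                                   uint32_t block_idx,
                                                   uint64_t offset,
                                                   uint64_t length)
{
    comp->value = 0;            /* the repeated byte value; zero => madvise() */
    comp->block_idx = block_idx;
    comp->offset = offset;
    comp->length = length;
    compress_to_network(comp);  /* convert before transmission */
}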
457
458 /*
459  * The result of the dest's memory registration produces an "rkey"
460  * which the source VM must reference in order to perform
461  * the RDMA operation.
462  */
463 typedef struct QEMU_PACKED {
464     uint32_t rkey;
465     uint32_t padding;
466     uint64_t host_addr;
467 } RDMARegisterResult;
468
469 static void result_to_network(RDMARegisterResult *result)
470 {
471     result->rkey = htonl(result->rkey);
472     result->host_addr = htonll(result->host_addr);
473 }
474
475 static void network_to_result(RDMARegisterResult *result)
476 {
477     result->rkey = ntohl(result->rkey);
478     result->host_addr = ntohll(result->host_addr);
479 }
480
481 const char *print_wrid(int wrid);
482 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
483                                    uint8_t *data, RDMAControlHeader *resp,
484                                    int *resp_idx,
485                                    int (*callback)(RDMAContext *rdma));
486
487 static inline uint64_t ram_chunk_index(const uint8_t *start,
488                                        const uint8_t *host)
489 {
490     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
491 }
492
493 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
494                                        uint64_t i)
495 {
496     return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
497                                   (i << RDMA_REG_CHUNK_SHIFT));
498 }
499
500 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
501                                      uint64_t i)
502 {
503     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
504                                          (1UL << RDMA_REG_CHUNK_SHIFT);
505
506     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
507         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
508     }
509
510     return result;
511 }
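
/*
 * Example (hypothetical helper): given a host address inside a block, the
 * helpers above yield the chunk index and the [start, end) span that would
 * be registered. The last chunk of a block may be shorter than
 * (1 << RDMA_REG_CHUNK_SHIFT) because ram_chunk_end() clamps to the block.
 */
static inline uint64_t example_chunk_span(const RDMALocalBlock *block,
                                          const uint8_t *host,
                                          uint8_t **start, uint8_t **end)
{
    uint64_t i = ram_chunk_index(block->local_host_addr, host);

    *start = ram_chunk_start(block, i);
    *end = ram_chunk_end(block, i);
    return i;
}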
512
513 static int rdma_add_block(RDMAContext *rdma, void *host_addr,
514                          ram_addr_t block_offset, uint64_t length)
515 {
516     RDMALocalBlocks *local = &rdma->local_ram_blocks;
517     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
518         (void *)(uintptr_t)block_offset);
519     RDMALocalBlock *old = local->block;
520
521     assert(block == NULL);
522
523     local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
524
525     if (local->nb_blocks) {
526         int x;
527
528         for (x = 0; x < local->nb_blocks; x++) {
529             g_hash_table_remove(rdma->blockmap,
530                                 (void *)(uintptr_t)old[x].offset);
531             g_hash_table_insert(rdma->blockmap,
532                                 (void *)(uintptr_t)old[x].offset,
533                                 &local->block[x]);
534         }
535         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
536         g_free(old);
537     }
538
539     block = &local->block[local->nb_blocks];
540
541     block->local_host_addr = host_addr;
542     block->offset = block_offset;
543     block->length = length;
544     block->index = local->nb_blocks;
545     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
546     block->transit_bitmap = bitmap_new(block->nb_chunks);
547     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
548     block->unregister_bitmap = bitmap_new(block->nb_chunks);
549     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
550     block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
551
552     block->is_ram_block = local->init ? false : true;
553
554     g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
555
556     trace_rdma_add_block(local->nb_blocks, (uintptr_t) block->local_host_addr,
557                          block->offset, block->length,
558                          (uintptr_t) (block->local_host_addr + block->length),
559                          BITS_TO_LONGS(block->nb_chunks) *
560                              sizeof(unsigned long) * 8,
561                          block->nb_chunks);
562
563     local->nb_blocks++;
564
565     return 0;
566 }
567
568 /*
569  * Memory regions need to be registered with the device and queue pairs set up
570  * in advance before the migration starts. This tells us where the RAM blocks
571  * are so that we can register them individually.
572  */
573 static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
574     ram_addr_t block_offset, ram_addr_t length, void *opaque)
575 {
576     return rdma_add_block(opaque, host_addr, block_offset, length);
577 }
578
579 /*
580  * Identify the RAMBlocks and their quantity. They will be referenced to
581  * identify chunk boundaries inside each RAMBlock and also be referenced
582  * during dynamic page registration.
583  */
584 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
585 {
586     RDMALocalBlocks *local = &rdma->local_ram_blocks;
587
588     assert(rdma->blockmap == NULL);
589     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
590     memset(local, 0, sizeof *local);
591     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
592     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
593     rdma->dest_blocks = (RDMADestBlock *) g_malloc0(sizeof(RDMADestBlock) *
594                         rdma->local_ram_blocks.nb_blocks);
595     local->init = true;
596     return 0;
597 }
598
599 static int rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
600 {
601     RDMALocalBlocks *local = &rdma->local_ram_blocks;
602     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
603         (void *) block_offset);
604     RDMALocalBlock *old = local->block;
605     int x;
606
607     assert(block);
608
609     if (block->pmr) {
610         int j;
611
612         for (j = 0; j < block->nb_chunks; j++) {
613             if (!block->pmr[j]) {
614                 continue;
615             }
616             ibv_dereg_mr(block->pmr[j]);
617             rdma->total_registrations--;
618         }
619         g_free(block->pmr);
620         block->pmr = NULL;
621     }
622
623     if (block->mr) {
624         ibv_dereg_mr(block->mr);
625         rdma->total_registrations--;
626         block->mr = NULL;
627     }
628
629     g_free(block->transit_bitmap);
630     block->transit_bitmap = NULL;
631
632     g_free(block->unregister_bitmap);
633     block->unregister_bitmap = NULL;
634
635     g_free(block->remote_keys);
636     block->remote_keys = NULL;
637
638     for (x = 0; x < local->nb_blocks; x++) {
639         g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)old[x].offset);
640     }
641
642     if (local->nb_blocks > 1) {
643
644         local->block = g_malloc0(sizeof(RDMALocalBlock) *
645                                     (local->nb_blocks - 1));
646
647         if (block->index) {
648             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
649         }
650
651         if (block->index < (local->nb_blocks - 1)) {
652             memcpy(local->block + block->index, old + (block->index + 1),
653                 sizeof(RDMALocalBlock) *
654                     (local->nb_blocks - (block->index + 1)));
655         }
656     } else {
657         assert(block == local->block);
658         local->block = NULL;
659     }
660
661     trace_rdma_delete_block(local->nb_blocks,
662                            (uintptr_t)block->local_host_addr,
663                            block->offset, block->length,
664                             (uintptr_t)(block->local_host_addr + block->length),
665                            BITS_TO_LONGS(block->nb_chunks) *
666                                sizeof(unsigned long) * 8, block->nb_chunks);
667
668     g_free(old);
669
670     local->nb_blocks--;
671
672     if (local->nb_blocks) {
673         for (x = 0; x < local->nb_blocks; x++) {
674             g_hash_table_insert(rdma->blockmap,
675                                 (void *)(uintptr_t)local->block[x].offset,
676                                 &local->block[x]);
677         }
678     }
679
680     return 0;
681 }
682
683 /*
684  * Put in the log file which RDMA device was opened and the details
685  * associated with that device.
686  */
687 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
688 {
689     struct ibv_port_attr port;
690
691     if (ibv_query_port(verbs, 1, &port)) {
692         error_report("Failed to query port information");
693         return;
694     }
695
696     printf("%s RDMA Device opened: kernel name %s "
697            "uverbs device name %s, "
698            "infiniband_verbs class device path %s, "
699            "infiniband class device path %s, "
700            "transport: (%d) %s\n",
701                 who,
702                 verbs->device->name,
703                 verbs->device->dev_name,
704                 verbs->device->dev_path,
705                 verbs->device->ibdev_path,
706                 port.link_layer,
707                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
708                  ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
709                     ? "Ethernet" : "Unknown"));
710 }
711
712 /*
713  * Put in the log file the RDMA gid addressing information,
714  * useful for folks who have trouble understanding the
715  * RDMA device hierarchy in the kernel.
716  */
717 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
718 {
719     char sgid[33];
720     char dgid[33];
721     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
722     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
723     trace_qemu_rdma_dump_gid(who, sgid, dgid);
724 }
725
726 /*
727  * As of now, IPv6 over RoCE / iWARP is not supported by linux.
728  * We will try the next addrinfo struct, and fail if there are
729  * no other valid addresses to bind against.
730  *
731  * If the user is listening on '[::]', then we will not have opened a device
732  * yet and have no way of verifying if the device is RoCE or not.
733  *
734  * In this case, the source VM will throw an error for ALL types of
735  * connections (both IPv4 and IPv6) if the destination machine does not have
736  * a regular infiniband network available for use.
737  *
738  * The only way to guarantee that an error is thrown for broken kernels is
739  * for the management software to choose a *specific* interface at bind time
740  * and validate what type of hardware it is.
741  *
742  * Unfortunately, this puts the user in a fix:
743  *
744  *  If the source VM connects with an IPv4 address without knowing that the
745  *  destination has bound to '[::]' the migration will unconditionally fail
746  *  unless the management software is explicitly listening on the IPv4
747  *  address while using a RoCE-based device.
748  *
749  *  If the source VM connects with an IPv6 address, then we're OK because we can
750  *  throw an error on the source (and similarly on the destination).
751  *
752  *  But in mixed environments, this will be broken for a while until it is fixed
753  *  inside linux.
754  *
755  * We do provide a *tiny* bit of help in this function: We can list all of the
756  * devices in the system and check to see if all the devices are RoCE or
757  * Infiniband.
758  *
759  * If we detect that we have a *pure* RoCE environment, then we can safely
760  * throw an error even if the management software has specified '[::]' as the
761  * bind address.
762  *
763  * However, if there are multiple heterogeneous devices, then we cannot make
764  * this assumption and the user just has to be sure they know what they are
765  * doing.
766  *
767  * Patches are being reviewed on linux-rdma.
768  */
769 static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
770 {
771     struct ibv_port_attr port_attr;
772
773     /* This bug only exists in linux, to our knowledge. */
774 #ifdef CONFIG_LINUX
775
776     /*
777      * Verbs are only NULL if management has bound to '[::]'.
778      *
779      * Let's iterate through all the devices and see if there are any pure IB
780      * devices (non-ethernet).
781      *
782      * If not, then we can safely proceed with the migration.
783      * Otherwise, there are no guarantees until the bug is fixed in linux.
784      */
785     if (!verbs) {
786         int num_devices, x;
787         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
788         bool roce_found = false;
789         bool ib_found = false;
790
791         for (x = 0; x < num_devices; x++) {
792             verbs = ibv_open_device(dev_list[x]);
793             if (!verbs) {
794                 if (errno == EPERM) {
795                     continue;
796                 } else {
797                     return -EINVAL;
798                 }
799             }
800
801             if (ibv_query_port(verbs, 1, &port_attr)) {
802                 ibv_close_device(verbs);
803                 ERROR(errp, "Could not query initial IB port");
804                 return -EINVAL;
805             }
806
807             if (port_attr.link_layer == IBV_LINK_LAYER_INFINIBAND) {
808                 ib_found = true;
809             } else if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
810                 roce_found = true;
811             }
812
813             ibv_close_device(verbs);
814
815         }
816
817         if (roce_found) {
818             if (ib_found) {
819                 fprintf(stderr, "WARN: migrations may fail:"
820                                 " IPv6 over RoCE / iWARP in linux"
821                                 " is broken. But since you appear to have a"
822                                 " mixed RoCE / IB environment, be sure to only"
823                                 " migrate over the IB fabric until the kernel"
824                                 " fixes the bug.\n");
825             } else {
826                 ERROR(errp, "You only have RoCE / iWARP devices in your systems"
827                             " and your management software has specified '[::]'"
828                             ", but IPv6 over RoCE / iWARP is not supported in Linux.");
829                 return -ENONET;
830             }
831         }
832
833         return 0;
834     }
835
836     /*
837      * If we have a verbs context, that means that something other than '[::]'
838      * was used by the management software for binding, in which case we can
839      * actually warn the user about a potentially broken kernel.
840      */
841
842     /* IB ports start with 1, not 0 */
843     if (ibv_query_port(verbs, 1, &port_attr)) {
844         ERROR(errp, "Could not query initial IB port");
845         return -EINVAL;
846     }
847
848     if (port_attr.link_layer == IBV_LINK_LAYER_ETHERNET) {
849         ERROR(errp, "Linux kernel's RoCE / iWARP does not support IPv6 "
850                     "(but patches on linux-rdma in progress)");
851         return -ENONET;
852     }
853
854 #endif
855
856     return 0;
857 }
858
859 /*
860  * Figure out which RDMA device corresponds to the requested IP hostname.
861  * Also create the initial connection manager identifiers for opening
862  * the connection.
863  */
864 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
865 {
866     int ret;
867     struct rdma_addrinfo *res;
868     char port_str[16];
869     struct rdma_cm_event *cm_event;
870     char ip[40] = "unknown";
871     struct rdma_addrinfo *e;
872
873     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
874         ERROR(errp, "RDMA hostname has not been set");
875         return -EINVAL;
876     }
877
878     /* create CM channel */
879     rdma->channel = rdma_create_event_channel();
880     if (!rdma->channel) {
881         ERROR(errp, "could not create CM channel");
882         return -EINVAL;
883     }
884
885     /* create CM id */
886     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
887     if (ret) {
888         ERROR(errp, "could not create channel id");
889         goto err_resolve_create_id;
890     }
891
892     snprintf(port_str, 16, "%d", rdma->port);
893     port_str[15] = '\0';
894
895     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
896     if (ret < 0) {
897         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
898         goto err_resolve_get_addr;
899     }
900
901     for (e = res; e != NULL; e = e->ai_next) {
902         inet_ntop(e->ai_family,
903             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
904         trace_qemu_rdma_resolve_host_trying(rdma->host, ip);
905
906         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_dst_addr,
907                 RDMA_RESOLVE_TIMEOUT_MS);
908         if (!ret) {
909             if (e->ai_family == AF_INET6) {
910                 ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
911                 if (ret) {
912                     continue;
913                 }
914             }
915             goto route;
916         }
917     }
918
919     ERROR(errp, "could not resolve address %s", rdma->host);
920     goto err_resolve_get_addr;
921
922 route:
923     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
924
925     ret = rdma_get_cm_event(rdma->channel, &cm_event);
926     if (ret) {
927         ERROR(errp, "could not perform event_addr_resolved");
928         goto err_resolve_get_addr;
929     }
930
931     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
932         ERROR(errp, "result not equal to event_addr_resolved %s",
933                 rdma_event_str(cm_event->event));
934         perror("rdma_resolve_addr");
935         rdma_ack_cm_event(cm_event);
936         ret = -EINVAL;
937         goto err_resolve_get_addr;
938     }
939     rdma_ack_cm_event(cm_event);
940
941     /* resolve route */
942     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
943     if (ret) {
944         ERROR(errp, "could not resolve rdma route");
945         goto err_resolve_get_addr;
946     }
947
948     ret = rdma_get_cm_event(rdma->channel, &cm_event);
949     if (ret) {
950         ERROR(errp, "could not perform event_route_resolved");
951         goto err_resolve_get_addr;
952     }
953     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
954         ERROR(errp, "result not equal to event_route_resolved: %s",
955                         rdma_event_str(cm_event->event));
956         rdma_ack_cm_event(cm_event);
957         ret = -EINVAL;
958         goto err_resolve_get_addr;
959     }
960     rdma_ack_cm_event(cm_event);
961     rdma->verbs = rdma->cm_id->verbs;
962     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
963     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
964     return 0;
965
966 err_resolve_get_addr:
967     rdma_destroy_id(rdma->cm_id);
968     rdma->cm_id = NULL;
969 err_resolve_create_id:
970     rdma_destroy_event_channel(rdma->channel);
971     rdma->channel = NULL;
972     return ret;
973 }
974
975 /*
976  * Create protection domain and completion queues
977  */
978 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
979 {
980     /* allocate pd */
981     rdma->pd = ibv_alloc_pd(rdma->verbs);
982     if (!rdma->pd) {
983         error_report("failed to allocate protection domain");
984         return -1;
985     }
986
987     /* create completion channel */
988     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
989     if (!rdma->comp_channel) {
990         error_report("failed to allocate completion channel");
991         goto err_alloc_pd_cq;
992     }
993
994     /*
995      * Completion queue can be filled by both read and write work requests,
996      * so must reflect the sum of both possible queue sizes.
997      */
998     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
999             NULL, rdma->comp_channel, 0);
1000     if (!rdma->cq) {
1001         error_report("failed to allocate completion queue");
1002         goto err_alloc_pd_cq;
1003     }
1004
1005     return 0;
1006
1007 err_alloc_pd_cq:
1008     if (rdma->pd) {
1009         ibv_dealloc_pd(rdma->pd);
1010     }
1011     if (rdma->comp_channel) {
1012         ibv_destroy_comp_channel(rdma->comp_channel);
1013     }
1014     rdma->pd = NULL;
1015     rdma->comp_channel = NULL;
1016     return -1;
1017
1018 }
1019
1020 /*
1021  * Create queue pairs.
1022  */
1023 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
1024 {
1025     struct ibv_qp_init_attr attr = { 0 };
1026     int ret;
1027
1028     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
1029     attr.cap.max_recv_wr = 3;
1030     attr.cap.max_send_sge = 1;
1031     attr.cap.max_recv_sge = 1;
1032     attr.send_cq = rdma->cq;
1033     attr.recv_cq = rdma->cq;
1034     attr.qp_type = IBV_QPT_RC;
1035
1036     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
1037     if (ret) {
1038         return -1;
1039     }
1040
1041     rdma->qp = rdma->cm_id->qp;
1042     return 0;
1043 }
1044
1045 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
1046 {
1047     int i;
1048     RDMALocalBlocks *local = &rdma->local_ram_blocks;
1049
1050     for (i = 0; i < local->nb_blocks; i++) {
1051         local->block[i].mr =
1052             ibv_reg_mr(rdma->pd,
1053                     local->block[i].local_host_addr,
1054                     local->block[i].length,
1055                     IBV_ACCESS_LOCAL_WRITE |
1056                     IBV_ACCESS_REMOTE_WRITE
1057                     );
1058         if (!local->block[i].mr) {
1059             perror("Failed to register local dest ram block!");
1060             break;
1061         }
1062         rdma->total_registrations++;
1063     }
1064
1065     if (i >= local->nb_blocks) {
1066         return 0;
1067     }
1068
1069     for (i--; i >= 0; i--) {
1070         ibv_dereg_mr(local->block[i].mr);
1071         rdma->total_registrations--;
1072     }
1073
1074     return -1;
1075
1076 }
1077
1078 /*
1079  * Find the ram block that corresponds to the page requested to be
1080  * transmitted by QEMU.
1081  *
1082  * Once the block is found, also identify which 'chunk' within that
1083  * block the page belongs to.
1084  *
1085  * This search cannot fail or the migration will fail.
1086  */
1087 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
1088                                       uintptr_t block_offset,
1089                                       uint64_t offset,
1090                                       uint64_t length,
1091                                       uint64_t *block_index,
1092                                       uint64_t *chunk_index)
1093 {
1094     uint64_t current_addr = block_offset + offset;
1095     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
1096                                                 (void *) block_offset);
1097     assert(block);
1098     assert(current_addr >= block->offset);
1099     assert((current_addr + length) <= (block->offset + block->length));
1100
1101     *block_index = block->index;
1102     *chunk_index = ram_chunk_index(block->local_host_addr,
1103                 block->local_host_addr + (current_addr - block->offset));
1104
1105     return 0;
1106 }
1107
1108 /*
1109  * Register a chunk with IB. If the chunk was already registered
1110  * previously, then skip.
1111  *
1112  * Also return the keys associated with the registration needed
1113  * to perform the actual RDMA operation.
1114  */
1115 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
1116         RDMALocalBlock *block, uintptr_t host_addr,
1117         uint32_t *lkey, uint32_t *rkey, int chunk,
1118         uint8_t *chunk_start, uint8_t *chunk_end)
1119 {
1120     if (block->mr) {
1121         if (lkey) {
1122             *lkey = block->mr->lkey;
1123         }
1124         if (rkey) {
1125             *rkey = block->mr->rkey;
1126         }
1127         return 0;
1128     }
1129
1130     /* allocate memory to store chunk MRs */
1131     if (!block->pmr) {
1132         block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
1133     }
1134
1135     /*
1136      * If 'rkey', then we're the destination, so grant access to the source.
1137      *
1138      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1139      */
1140     if (!block->pmr[chunk]) {
1141         uint64_t len = chunk_end - chunk_start;
1142
1143         trace_qemu_rdma_register_and_get_keys(len, chunk_start);
1144
1145         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1146                 chunk_start, len,
1147                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1148                         IBV_ACCESS_REMOTE_WRITE) : 0));
1149
1150         if (!block->pmr[chunk]) {
1151             perror("Failed to register chunk!");
1152             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1153                             " start %" PRIuPTR " end %" PRIuPTR
1154                             " host %" PRIuPTR
1155                             " local %" PRIuPTR " registrations: %d\n",
1156                             block->index, chunk, (uintptr_t)chunk_start,
1157                             (uintptr_t)chunk_end, host_addr,
1158                             (uintptr_t)block->local_host_addr,
1159                             rdma->total_registrations);
1160             return -1;
1161         }
1162         rdma->total_registrations++;
1163     }
1164
1165     if (lkey) {
1166         *lkey = block->pmr[chunk]->lkey;
1167     }
1168     if (rkey) {
1169         *rkey = block->pmr[chunk]->rkey;
1170     }
1171     return 0;
1172 }
1173
1174 /*
1175  * Register (at connection time) the memory used for control
1176  * channel messages.
1177  */
1178 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1179 {
1180     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1181             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1182             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1183     if (rdma->wr_data[idx].control_mr) {
1184         rdma->total_registrations++;
1185         return 0;
1186     }
1187     error_report("qemu_rdma_reg_control failed");
1188     return -1;
1189 }
1190
1191 const char *print_wrid(int wrid)
1192 {
1193     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1194         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1195     }
1196     return wrid_desc[wrid];
1197 }
1198
1199 /*
1200  * RDMA requires memory registration (mlock/pinning), but this is not good for
1201  * overcommitment.
1202  *
1203  * In preparation for the future where LRU information or workload-specific
1204  * writable working set memory access behavior is available to QEMU
1205  * it would be nice to have in place the ability to UN-register/UN-pin
1206  * particular memory regions from the RDMA hardware when it is determined that
1207  * those regions of memory will likely not be accessed again in the near future.
1208  *
1209  * While we do not yet have such information right now, the following
1210  * compile-time option allows us to perform a non-optimized version of this
1211  * behavior.
1212  *
1213  * By uncommenting this option, you will cause *all* RDMA transfers to be
1214  * unregistered immediately after the transfer completes on both sides of the
1215  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1216  *
1217  * This will have a terrible impact on migration performance, so until future
1218  * workload information or LRU information is available, do not attempt to use
1219  * this feature except for basic testing.
1220  */
1221 //#define RDMA_UNREGISTRATION_EXAMPLE
1222
1223 /*
1224  * Perform a non-optimized memory unregistration after every transfer
1225  * for demonstration purposes, only if pin-all is not requested.
1226  *
1227  * Potential optimizations:
1228  * 1. Start a new thread to run this function continuously
1229  *      - for bit clearing
1230  *      - and for receipt of unregister messages
1231  * 2. Use an LRU.
1232  * 3. Use workload hints.
1233  */
1234 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1235 {
1236     while (rdma->unregistrations[rdma->unregister_current]) {
1237         int ret;
1238         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1239         uint64_t chunk =
1240             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1241         uint64_t index =
1242             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1243         RDMALocalBlock *block =
1244             &(rdma->local_ram_blocks.block[index]);
1245         RDMARegister reg = { .current_index = index };
1246         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1247                                  };
1248         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1249                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1250                                    .repeat = 1,
1251                                  };
1252
1253         trace_qemu_rdma_unregister_waiting_proc(chunk,
1254                                                 rdma->unregister_current);
1255
1256         rdma->unregistrations[rdma->unregister_current] = 0;
1257         rdma->unregister_current++;
1258
1259         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1260             rdma->unregister_current = 0;
1261         }
1262
1263
1264         /*
1265          * Unregistration is speculative (because migration is single-threaded
1266          * and we cannot break the protocol's infiniband message ordering).
1267          * Thus, if the memory is currently being used for transmission,
1268          * then abort the attempt to unregister and try again
1269          * later the next time a completion is received for this memory.
1270          */
1271         clear_bit(chunk, block->unregister_bitmap);
1272
1273         if (test_bit(chunk, block->transit_bitmap)) {
1274             trace_qemu_rdma_unregister_waiting_inflight(chunk);
1275             continue;
1276         }
1277
1278         trace_qemu_rdma_unregister_waiting_send(chunk);
1279
1280         ret = ibv_dereg_mr(block->pmr[chunk]);
1281         block->pmr[chunk] = NULL;
1282         block->remote_keys[chunk] = 0;
1283
1284         if (ret != 0) {
1285             perror("unregistration chunk failed");
1286             return -ret;
1287         }
1288         rdma->total_registrations--;
1289
1290         reg.key.chunk = chunk;
1291         register_to_network(&reg);
1292         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1293                                 &resp, NULL, NULL);
1294         if (ret < 0) {
1295             return ret;
1296         }
1297
1298         trace_qemu_rdma_unregister_waiting_complete(chunk);
1299     }
1300
1301     return 0;
1302 }
1303
1304 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1305                                          uint64_t chunk)
1306 {
1307     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1308
1309     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1310     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1311
1312     return result;
1313 }
1314
1315 /*
1316  * Set bit for unregistration in the next iteration.
1317  * We cannot transmit right here, but will unpin later.
1318  */
1319 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1320                                         uint64_t chunk, uint64_t wr_id)
1321 {
1322     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1323         error_report("rdma migration: queue is full");
1324     } else {
1325         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1326
1327         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1328             trace_qemu_rdma_signal_unregister_append(chunk,
1329                                                      rdma->unregister_next);
1330
1331             rdma->unregistrations[rdma->unregister_next++] =
1332                     qemu_rdma_make_wrid(wr_id, index, chunk);
1333
1334             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1335                 rdma->unregister_next = 0;
1336             }
1337         } else {
1338             trace_qemu_rdma_signal_unregister_already(chunk);
1339         }
1340     }
1341 }
1342
1343 /*
1344  * Poll the completion queue to see if a work request
1345  * (of any kind) has completed.
1346  * Return the work request ID that completed.
1347  */
1348 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1349                                uint32_t *byte_len)
1350 {
1351     int ret;
1352     struct ibv_wc wc;
1353     uint64_t wr_id;
1354
1355     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1356
1357     if (!ret) {
1358         *wr_id_out = RDMA_WRID_NONE;
1359         return 0;
1360     }
1361
1362     if (ret < 0) {
1363         error_report("ibv_poll_cq return %d", ret);
1364         return ret;
1365     }
1366
1367     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1368
1369     if (wc.status != IBV_WC_SUCCESS) {
1370         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1371                         wc.status, ibv_wc_status_str(wc.status));
1372         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1373
1374         return -1;
1375     }
1376
1377     if (rdma->control_ready_expected &&
1378         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1379         trace_qemu_rdma_poll_recv(wrid_desc[RDMA_WRID_RECV_CONTROL],
1380                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1381         rdma->control_ready_expected = 0;
1382     }
1383
1384     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1385         uint64_t chunk =
1386             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1387         uint64_t index =
1388             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1389         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1390
1391         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
1392                                    index, chunk, block->local_host_addr,
1393                                    (void *)(uintptr_t)block->remote_host_addr);
1394
1395         clear_bit(chunk, block->transit_bitmap);
1396
1397         if (rdma->nb_sent > 0) {
1398             rdma->nb_sent--;
1399         }
1400
1401         if (!rdma->pin_all) {
1402             /*
1403              * FYI: If one wanted to signal a specific chunk to be unregistered
1404              * using LRU or workload-specific information, this is the function
1405              * you would call to do so. That chunk would then get asynchronously
1406              * unregistered later.
1407              */
1408 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1409             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1410 #endif
1411         }
1412     } else {
1413         trace_qemu_rdma_poll_other(print_wrid(wr_id), wr_id, rdma->nb_sent);
1414     }
1415
1416     *wr_id_out = wc.wr_id;
1417     if (byte_len) {
1418         *byte_len = wc.byte_len;
1419     }
1420
1421     return 0;
1422 }
1423
1424 /*
1425  * Block until the next work request has completed.
1426  *
1427  * First poll to see if a work request has already completed,
1428  * otherwise block.
1429  *
1430  * If we encounter completed work requests for IDs other than
1431  * the one we're interested in, then that's generally an error.
1432  *
1433  * The only exception is actual RDMA Write completions. These
1434  * completions only need to be recorded, but do not actually
1435  * need further processing.
1436  */
1437 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1438                                     uint32_t *byte_len)
1439 {
1440     int num_cq_events = 0, ret = 0;
1441     struct ibv_cq *cq;
1442     void *cq_ctx;
1443     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1444
1445     if (ibv_req_notify_cq(rdma->cq, 0)) {
1446         return -1;
1447     }
1448     /* poll cq first */
1449     while (wr_id != wrid_requested) {
1450         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1451         if (ret < 0) {
1452             return ret;
1453         }
1454
1455         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1456
1457         if (wr_id == RDMA_WRID_NONE) {
1458             break;
1459         }
1460         if (wr_id != wrid_requested) {
1461             trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1462                        wrid_requested, print_wrid(wr_id), wr_id);
1463         }
1464     }
1465
1466     if (wr_id == wrid_requested) {
1467         return 0;
1468     }
1469
1470     while (1) {
1471         /*
1472          * Coroutine doesn't start until process_incoming_migration()
1473          * so don't yield unless we know we're running inside of a coroutine.
1474          */
1475         if (rdma->migration_started_on_destination) {
1476             yield_until_fd_readable(rdma->comp_channel->fd);
1477         }
1478
1479         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1480             perror("ibv_get_cq_event");
1481             goto err_block_for_wrid;
1482         }
1483
1484         num_cq_events++;
1485
1486         if (ibv_req_notify_cq(cq, 0)) {
1487             goto err_block_for_wrid;
1488         }
1489
1490         while (wr_id != wrid_requested) {
1491             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1492             if (ret < 0) {
1493                 goto err_block_for_wrid;
1494             }
1495
1496             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1497
1498             if (wr_id == RDMA_WRID_NONE) {
1499                 break;
1500             }
1501             if (wr_id != wrid_requested) {
1502                 trace_qemu_rdma_block_for_wrid_miss(print_wrid(wrid_requested),
1503                                    wrid_requested, print_wrid(wr_id), wr_id);
1504             }
1505         }
1506
1507         if (wr_id == wrid_requested) {
1508             goto success_block_for_wrid;
1509         }
1510     }
1511
1512 success_block_for_wrid:
1513     if (num_cq_events) {
1514         ibv_ack_cq_events(cq, num_cq_events);
1515     }
1516     return 0;
1517
1518 err_block_for_wrid:
1519     if (num_cq_events) {
1520         ibv_ack_cq_events(cq, num_cq_events);
1521     }
1522     return ret;
1523 }
1524
1525 /*
1526  * Post a SEND message work request for the control channel
1527  * containing some data and block until the post completes.
1528  */
1529 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1530                                        RDMAControlHeader *head)
1531 {
1532     int ret = 0;
1533     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1534     struct ibv_send_wr *bad_wr;
1535     struct ibv_sge sge = {
1536                            .addr = (uintptr_t)(wr->control),
1537                            .length = head->len + sizeof(RDMAControlHeader),
1538                            .lkey = wr->control_mr->lkey,
1539                          };
1540     struct ibv_send_wr send_wr = {
1541                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1542                                    .opcode = IBV_WR_SEND,
1543                                    .send_flags = IBV_SEND_SIGNALED,
1544                                    .sg_list = &sge,
1545                                    .num_sge = 1,
1546                                 };
1547
1548     trace_qemu_rdma_post_send_control(control_desc[head->type]);
1549
1550     /*
1551      * We don't actually need to do a memcpy() in here if we used
1552      * the "sge" properly, but since we're only sending control messages
1553      * (not RAM in a performance-critical path), then it's OK for now.
1554      *
1555      * The copy makes the RDMAControlHeader simpler to manipulate
1556      * for the time being.
1557      */
1558     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1559     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1560     control_to_network((void *) wr->control);
1561
1562     if (buf) {
1563         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1564     }
1565
1566
1567     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1568
1569     if (ret > 0) {
1570         error_report("Failed to post IB SEND for control");
1571         return -ret;
1572     }
1573
1574     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1575     if (ret < 0) {
1576         error_report("rdma migration: send polling control error");
1577     }
1578
1579     return ret;
1580 }
1581
1582 /*
1583  * Post a RECV work request in anticipation of some future receipt
1584  * of data on the control channel.
1585  */
1586 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1587 {
1588     struct ibv_recv_wr *bad_wr;
1589     struct ibv_sge sge = {
1590                             .addr = (uintptr_t)(rdma->wr_data[idx].control),
1591                             .length = RDMA_CONTROL_MAX_BUFFER,
1592                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1593                          };
1594
1595     struct ibv_recv_wr recv_wr = {
1596                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1597                                     .sg_list = &sge,
1598                                     .num_sge = 1,
1599                                  };
1600
1601
1602     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1603         return -1;
1604     }
1605
1606     return 0;
1607 }
1608
1609 /*
1610  * Block and wait for a RECV control channel message to arrive.
1611  */
1612 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1613                 RDMAControlHeader *head, int expecting, int idx)
1614 {
1615     uint32_t byte_len;
1616     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1617                                        &byte_len);
1618
1619     if (ret < 0) {
1620         error_report("rdma migration: recv polling control error!");
1621         return ret;
1622     }
1623
1624     network_to_control((void *) rdma->wr_data[idx].control);
1625     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1626
1627     trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
1628
1629     if (expecting == RDMA_CONTROL_NONE) {
1630         trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
1631                                              head->type);
1632     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1633         error_report("Was expecting a %s (%d) control message"
1634                 ", but got: %s (%d), length: %d",
1635                 control_desc[expecting], expecting,
1636                 control_desc[head->type], head->type, head->len);
1637         return -EIO;
1638     }
1639     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1640         error_report("too long length: %d", head->len);
1641         return -EINVAL;
1642     }
1643     if (sizeof(*head) + head->len != byte_len) {
1644         error_report("Malformed length: %d byte_len %d", head->len, byte_len);
1645         return -EINVAL;
1646     }
1647
1648     return 0;
1649 }
1650
1651 /*
1652  * When a RECV work request has completed, the work request's
1653  * buffer begins with the control header.
1654  *
1655  * This advances the buffer's "current" pointer past the header to the
1656  * data portion of the control message that was populated when the
1657  * work request finished.
1658  */
1659 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1660                                   RDMAControlHeader *head)
1661 {
1662     rdma->wr_data[idx].control_len = head->len;
1663     rdma->wr_data[idx].control_curr =
1664         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1665 }
1666
1667 /*
1668  * This is an 'atomic' high-level operation to deliver a single, unified
1669  * control-channel message.
1670  *
1671  * Additionally, if the user is expecting some kind of reply to this message,
1672  * they can request a 'resp' response message be filled in by posting an
1673  * additional work request on behalf of the user and waiting for an additional
1674  * completion.
1675  *
1676  * The extra (optional) response is used during registration to save us from
1677  * having to perform an *additional* exchange of messages just to provide a
1678  * response, by instead piggy-backing on the acknowledgement.
1679  */
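/*
 * Condensed usage sketch, taken from the dynamic-registration exchange in
 * qemu_rdma_write_one() below (error handling omitted):
 *
 *     RDMARegister reg;
 *     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
 *     RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                                .type = RDMA_CONTROL_REGISTER_REQUEST,
 *                                .repeat = 1 };
 *     int reg_result_idx;
 *
 *     ... fill in reg ...
 *     register_to_network(&reg);
 *     ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
 *                                   &resp, &reg_result_idx, NULL);
 *
 * On success the piggy-backed reply is available at
 * rdma->wr_data[reg_result_idx].control_curr.
 */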
1680 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1681                                    uint8_t *data, RDMAControlHeader *resp,
1682                                    int *resp_idx,
1683                                    int (*callback)(RDMAContext *rdma))
1684 {
1685     int ret = 0;
1686
1687     /*
1688      * Before attempting to deliver this message, wait for the destination
1689      * to signal that it is ready by sending us a READY message.
1690      */
1691     if (rdma->control_ready_expected) {
1692         RDMAControlHeader resp;
1693         ret = qemu_rdma_exchange_get_response(rdma,
1694                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1695         if (ret < 0) {
1696             return ret;
1697         }
1698     }
1699
1700     /*
1701      * If the user is expecting a response, post a WR in anticipation of it.
1702      */
1703     if (resp) {
1704         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1705         if (ret) {
1706             error_report("rdma migration: error posting"
1707                     " extra control recv for anticipated result!");
1708             return ret;
1709         }
1710     }
1711
1712     /*
1713      * Post a WR to replace the one we just consumed for the READY message.
1714      */
1715     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1716     if (ret) {
1717         error_report("rdma migration: error posting first control recv!");
1718         return ret;
1719     }
1720
1721     /*
1722      * Deliver the control message that was requested.
1723      */
1724     ret = qemu_rdma_post_send_control(rdma, data, head);
1725
1726     if (ret < 0) {
1727         error_report("Failed to send control buffer!");
1728         return ret;
1729     }
1730
1731     /*
1732      * If we're expecting a response, block and wait for it.
1733      */
1734     if (resp) {
1735         if (callback) {
1736             trace_qemu_rdma_exchange_send_issue_callback();
1737             ret = callback(rdma);
1738             if (ret < 0) {
1739                 return ret;
1740             }
1741         }
1742
1743         trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
1744         ret = qemu_rdma_exchange_get_response(rdma, resp,
1745                                               resp->type, RDMA_WRID_DATA);
1746
1747         if (ret < 0) {
1748             return ret;
1749         }
1750
1751         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1752         if (resp_idx) {
1753             *resp_idx = RDMA_WRID_DATA;
1754         }
1755         trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
1756     }
1757
1758     rdma->control_ready_expected = 1;
1759
1760     return 0;
1761 }
1762
1763 /*
1764  * This is an 'atomic' high-level operation to receive a single, unified
1765  * control-channel message.
1766  */
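/*
 * Usage sketch (see qemu_rdma_get_buffer() further below): the receiver
 * simply asks for the next message of an expected type,
 *
 *     RDMAControlHeader head;
 *     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
 *
 * and, on success, head.len payload bytes are available at
 * rdma->wr_data[RDMA_WRID_READY].control_curr.
 */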
1767 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1768                                 int expecting)
1769 {
1770     RDMAControlHeader ready = {
1771                                 .len = 0,
1772                                 .type = RDMA_CONTROL_READY,
1773                                 .repeat = 1,
1774                               };
1775     int ret;
1776
1777     /*
1778      * Inform the source that we're ready to receive a message.
1779      */
1780     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1781
1782     if (ret < 0) {
1783         error_report("Failed to send control buffer!");
1784         return ret;
1785     }
1786
1787     /*
1788      * Block and wait for the message.
1789      */
1790     ret = qemu_rdma_exchange_get_response(rdma, head,
1791                                           expecting, RDMA_WRID_READY);
1792
1793     if (ret < 0) {
1794         return ret;
1795     }
1796
1797     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1798
1799     /*
1800      * Post a new RECV work request to replace the one we just consumed.
1801      */
1802     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1803     if (ret) {
1804         error_report("rdma migration: error posting second control recv!");
1805         return ret;
1806     }
1807
1808     return 0;
1809 }
1810
1811 /*
1812  * Write an actual chunk of memory using RDMA.
1813  *
1814  * If we're using dynamic registration on the dest-side, we have to
1815  * send a registration command first.
1816  */
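/*
 * Outline of the steps below:
 *
 *   1. Wait for any in-flight write to the same chunk to complete.
 *   2. With dynamic registration, an unregistered chunk that is entirely
 *      zero is reported via RDMA_CONTROL_COMPRESS instead of being written.
 *   3. Otherwise the chunk's rkey is obtained: either by sending
 *      RDMA_CONTROL_REGISTER_REQUEST, from the cached remote_keys[], or
 *      from the whole-block rkey when pinning everything.
 *   4. The IBV_WR_RDMA_WRITE is posted and the chunk marked as in transit.
 */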
1817 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1818                                int current_index, uint64_t current_addr,
1819                                uint64_t length)
1820 {
1821     struct ibv_sge sge;
1822     struct ibv_send_wr send_wr = { 0 };
1823     struct ibv_send_wr *bad_wr;
1824     int reg_result_idx, ret, count = 0;
1825     uint64_t chunk, chunks;
1826     uint8_t *chunk_start, *chunk_end;
1827     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1828     RDMARegister reg;
1829     RDMARegisterResult *reg_result;
1830     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1831     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1832                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1833                                .repeat = 1,
1834                              };
1835
1836 retry:
1837     sge.addr = (uintptr_t)(block->local_host_addr +
1838                             (current_addr - block->offset));
1839     sge.length = length;
1840
1841     chunk = ram_chunk_index(block->local_host_addr,
1842                             (uint8_t *)(uintptr_t)sge.addr);
1843     chunk_start = ram_chunk_start(block, chunk);
1844
1845     if (block->is_ram_block) {
1846         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1847
1848         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1849             chunks--;
1850         }
1851     } else {
1852         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1853
1854         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1855             chunks--;
1856         }
1857     }
1858
1859     trace_qemu_rdma_write_one_top(chunks + 1,
1860                                   (chunks + 1) *
1861                                   (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1862
1863     chunk_end = ram_chunk_end(block, chunk + chunks);
1864
1865     if (!rdma->pin_all) {
1866 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1867         qemu_rdma_unregister_waiting(rdma);
1868 #endif
1869     }
1870
1871     while (test_bit(chunk, block->transit_bitmap)) {
1872         (void)count;
1873         trace_qemu_rdma_write_one_block(count++, current_index, chunk,
1874                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1875
1876         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1877
1878         if (ret < 0) {
1879             error_report("Failed to Wait for previous write to complete "
1880                     "block %d chunk %" PRIu64
1881                     " current %" PRIu64 " len %" PRIu64 " %d",
1882                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1883             return ret;
1884         }
1885     }
1886
1887     if (!rdma->pin_all || !block->is_ram_block) {
1888         if (!block->remote_keys[chunk]) {
1889             /*
1890              * This chunk has not yet been registered, so first check to see
1891              * if the entire chunk is zero. If so, tell the other side to
1892              * memset() + madvise() the entire chunk without RDMA.
1893              */
1894
1895             if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1896                                                    length)
1897                    && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
1898                                                     length) == length) {
1899                 RDMACompress comp = {
1900                                         .offset = current_addr,
1901                                         .value = 0,
1902                                         .block_idx = current_index,
1903                                         .length = length,
1904                                     };
1905
1906                 head.len = sizeof(comp);
1907                 head.type = RDMA_CONTROL_COMPRESS;
1908
1909                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
1910                                                current_index, current_addr);
1911
1912                 compress_to_network(&comp);
1913                 ret = qemu_rdma_exchange_send(rdma, &head,
1914                                 (uint8_t *) &comp, NULL, NULL, NULL);
1915
1916                 if (ret < 0) {
1917                     return -EIO;
1918                 }
1919
1920                 acct_update_position(f, sge.length, true);
1921
1922                 return 1;
1923             }
1924
1925             /*
1926              * Otherwise, tell other side to register.
1927              */
1928             reg.current_index = current_index;
1929             if (block->is_ram_block) {
1930                 reg.key.current_addr = current_addr;
1931             } else {
1932                 reg.key.chunk = chunk;
1933             }
1934             reg.chunks = chunks;
1935
1936             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
1937                                               current_addr);
1938
1939             register_to_network(&reg);
1940             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1941                                     &resp, &reg_result_idx, NULL);
1942             if (ret < 0) {
1943                 return ret;
1944             }
1945
1946             /* try to overlap this single registration with the one we sent. */
1947             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1948                                                 &sge.lkey, NULL, chunk,
1949                                                 chunk_start, chunk_end)) {
1950                 error_report("cannot get lkey");
1951                 return -EINVAL;
1952             }
1953
1954             reg_result = (RDMARegisterResult *)
1955                     rdma->wr_data[reg_result_idx].control_curr;
1956
1957             network_to_result(reg_result);
1958
1959             trace_qemu_rdma_write_one_recvregres(block->remote_keys[chunk],
1960                                                  reg_result->rkey, chunk);
1961
1962             block->remote_keys[chunk] = reg_result->rkey;
1963             block->remote_host_addr = reg_result->host_addr;
1964         } else {
1965             /* already registered before */
1966             if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1967                                                 &sge.lkey, NULL, chunk,
1968                                                 chunk_start, chunk_end)) {
1969                 error_report("cannot get lkey!");
1970                 return -EINVAL;
1971             }
1972         }
1973
1974         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
1975     } else {
1976         send_wr.wr.rdma.rkey = block->remote_rkey;
1977
1978         if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
1979                                                      &sge.lkey, NULL, chunk,
1980                                                      chunk_start, chunk_end)) {
1981             error_report("cannot get lkey!");
1982             return -EINVAL;
1983         }
1984     }
1985
1986     /*
1987      * Encode the ram block index and chunk within this wrid.
1988      * We will use this information at the time of completion
1989      * to figure out which bitmap to check against and then which
1990      * chunk in the bitmap to look for.
1991      */
1992     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
1993                                         current_index, chunk);
1994
1995     send_wr.opcode = IBV_WR_RDMA_WRITE;
1996     send_wr.send_flags = IBV_SEND_SIGNALED;
1997     send_wr.sg_list = &sge;
1998     send_wr.num_sge = 1;
1999     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
2000                                 (current_addr - block->offset);
2001
2002     trace_qemu_rdma_write_one_post(chunk, sge.addr, send_wr.wr.rdma.remote_addr,
2003                                    sge.length);
2004
2005     /*
2006      * ibv_post_send() does not return negative error numbers;
2007      * per the specification they are positive - no idea why.
2008      */
2009     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
2010
2011     if (ret == ENOMEM) {
2012         trace_qemu_rdma_write_one_queue_full();
2013         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2014         if (ret < 0) {
2015             error_report("rdma migration: failed to make "
2016                          "room in full send queue! %d", ret);
2017             return ret;
2018         }
2019
2020         goto retry;
2021
2022     } else if (ret > 0) {
2023         perror("rdma migration: post rdma write failed");
2024         return -ret;
2025     }
2026
2027     set_bit(chunk, block->transit_bitmap);
2028     acct_update_position(f, sge.length, false);
2029     rdma->total_writes++;
2030
2031     return 0;
2032 }
2033
2034 /*
2035  * Push out any unwritten RDMA operations.
2036  *
2037  * We support sending out multiple chunks at the same time.
2038  * Not all of them need to get signaled in the completion queue.
2039  */
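/*
 * Note on the return value of qemu_rdma_write_one(): 0 means an RDMA write
 * was actually posted (so nb_sent is incremented below), while 1 means the
 * chunk was entirely zero and only a COMPRESS control message was sent.
 */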
2040 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
2041 {
2042     int ret;
2043
2044     if (!rdma->current_length) {
2045         return 0;
2046     }
2047
2048     ret = qemu_rdma_write_one(f, rdma,
2049             rdma->current_index, rdma->current_addr, rdma->current_length);
2050
2051     if (ret < 0) {
2052         return ret;
2053     }
2054
2055     if (ret == 0) {
2056         rdma->nb_sent++;
2057         trace_qemu_rdma_write_flush(rdma->nb_sent);
2058     }
2059
2060     rdma->current_length = 0;
2061     rdma->current_addr = 0;
2062
2063     return 0;
2064 }
2065
2066 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
2067                     uint64_t offset, uint64_t len)
2068 {
2069     RDMALocalBlock *block;
2070     uint8_t *host_addr;
2071     uint8_t *chunk_end;
2072
2073     if (rdma->current_index < 0) {
2074         return 0;
2075     }
2076
2077     if (rdma->current_chunk < 0) {
2078         return 0;
2079     }
2080
2081     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
2082     host_addr = block->local_host_addr + (offset - block->offset);
2083     chunk_end = ram_chunk_end(block, rdma->current_chunk);
2084
2085     if (rdma->current_length == 0) {
2086         return 0;
2087     }
2088
2089     /*
2090      * Only merge into chunk sequentially.
2091      */
2092     if (offset != (rdma->current_addr + rdma->current_length)) {
2093         return 0;
2094     }
2095
2096     if (offset < block->offset) {
2097         return 0;
2098     }
2099
2100     if ((offset + len) > (block->offset + block->length)) {
2101         return 0;
2102     }
2103
2104     if ((host_addr + len) > chunk_end) {
2105         return 0;
2106     }
2107
2108     return 1;
2109 }
2110
2111 /*
2112  * We're not actually writing here, but doing three things:
2113  *
2114  * 1. Identify the chunk the buffer belongs to.
2115  * 2. If the chunk is full or the buffer doesn't belong to the current
2116  *    chunk, then start a new chunk and flush() the old chunk.
2117  * 3. To keep the hardware busy, we also group chunks into batches
2118  *    and only require that a batch gets acknowledged in the completion
2119  *    queue instead of each individual chunk.
2120  */
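/*
 * Caller sketch (see qemu_rdma_save_page() below): each page is handed to
 * us as
 *
 *     ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
 *
 * and an actual RDMA write is only issued once the accumulated run can no
 * longer be merged or grows to RDMA_MERGE_MAX.
 */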
2121 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2122                            uint64_t block_offset, uint64_t offset,
2123                            uint64_t len)
2124 {
2125     uint64_t current_addr = block_offset + offset;
2126     uint64_t index = rdma->current_index;
2127     uint64_t chunk = rdma->current_chunk;
2128     int ret;
2129
2130     /* If we cannot merge it, we flush the current buffer first. */
2131     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2132         ret = qemu_rdma_write_flush(f, rdma);
2133         if (ret) {
2134             return ret;
2135         }
2136         rdma->current_length = 0;
2137         rdma->current_addr = current_addr;
2138
2139         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2140                                          offset, len, &index, &chunk);
2141         if (ret) {
2142             error_report("ram block search failed");
2143             return ret;
2144         }
2145         rdma->current_index = index;
2146         rdma->current_chunk = chunk;
2147     }
2148
2149     /* merge it */
2150     rdma->current_length += len;
2151
2152     /* flush it if buffer is too large */
2153     if (rdma->current_length >= RDMA_MERGE_MAX) {
2154         return qemu_rdma_write_flush(f, rdma);
2155     }
2156
2157     return 0;
2158 }
2159
2160 static void qemu_rdma_cleanup(RDMAContext *rdma)
2161 {
2162     struct rdma_cm_event *cm_event;
2163     int ret, idx;
2164
2165     if (rdma->cm_id && rdma->connected) {
2166         if (rdma->error_state) {
2167             RDMAControlHeader head = { .len = 0,
2168                                        .type = RDMA_CONTROL_ERROR,
2169                                        .repeat = 1,
2170                                      };
2171             error_report("Early error. Sending error.");
2172             qemu_rdma_post_send_control(rdma, NULL, &head);
2173         }
2174
2175         ret = rdma_disconnect(rdma->cm_id);
2176         if (!ret) {
2177             trace_qemu_rdma_cleanup_waiting_for_disconnect();
2178             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2179             if (!ret) {
2180                 rdma_ack_cm_event(cm_event);
2181             }
2182         }
2183         trace_qemu_rdma_cleanup_disconnect();
2184         rdma->connected = false;
2185     }
2186
2187     g_free(rdma->dest_blocks);
2188     rdma->dest_blocks = NULL;
2189
2190     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2191         if (rdma->wr_data[idx].control_mr) {
2192             rdma->total_registrations--;
2193             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2194         }
2195         rdma->wr_data[idx].control_mr = NULL;
2196     }
2197
2198     if (rdma->local_ram_blocks.block) {
2199         while (rdma->local_ram_blocks.nb_blocks) {
2200             rdma_delete_block(rdma, rdma->local_ram_blocks.block->offset);
2201         }
2202     }
2203
2204     if (rdma->qp) {
2205         rdma_destroy_qp(rdma->cm_id);
2206         rdma->qp = NULL;
2207     }
2208     if (rdma->cq) {
2209         ibv_destroy_cq(rdma->cq);
2210         rdma->cq = NULL;
2211     }
2212     if (rdma->comp_channel) {
2213         ibv_destroy_comp_channel(rdma->comp_channel);
2214         rdma->comp_channel = NULL;
2215     }
2216     if (rdma->pd) {
2217         ibv_dealloc_pd(rdma->pd);
2218         rdma->pd = NULL;
2219     }
2220     if (rdma->cm_id) {
2221         rdma_destroy_id(rdma->cm_id);
2222         rdma->cm_id = NULL;
2223     }
2224     if (rdma->listen_id) {
2225         rdma_destroy_id(rdma->listen_id);
2226         rdma->listen_id = NULL;
2227     }
2228     if (rdma->channel) {
2229         rdma_destroy_event_channel(rdma->channel);
2230         rdma->channel = NULL;
2231     }
2232     g_free(rdma->host);
2233     rdma->host = NULL;
2234 }
2235
2236
2237 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2238 {
2239     int ret, idx;
2240     Error *local_err = NULL, **temp = &local_err;
2241
2242     /*
2243      * Will be validated against destination's actual capabilities
2244      * after the connect() completes.
2245      */
2246     rdma->pin_all = pin_all;
2247
2248     ret = qemu_rdma_resolve_host(rdma, temp);
2249     if (ret) {
2250         goto err_rdma_source_init;
2251     }
2252
2253     ret = qemu_rdma_alloc_pd_cq(rdma);
2254     if (ret) {
2255         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2256                     " limits may be too low. Please check $ ulimit -a # and "
2257                     "search for 'ulimit -l' in the output");
2258         goto err_rdma_source_init;
2259     }
2260
2261     ret = qemu_rdma_alloc_qp(rdma);
2262     if (ret) {
2263         ERROR(temp, "rdma migration: error allocating qp!");
2264         goto err_rdma_source_init;
2265     }
2266
2267     ret = qemu_rdma_init_ram_blocks(rdma);
2268     if (ret) {
2269         ERROR(temp, "rdma migration: error initializing ram blocks!");
2270         goto err_rdma_source_init;
2271     }
2272
2273     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2274         ret = qemu_rdma_reg_control(rdma, idx);
2275         if (ret) {
2276             ERROR(temp, "rdma migration: error registering %d control!",
2277                                                             idx);
2278             goto err_rdma_source_init;
2279         }
2280     }
2281
2282     return 0;
2283
2284 err_rdma_source_init:
2285     error_propagate(errp, local_err);
2286     qemu_rdma_cleanup(rdma);
2287     return -1;
2288 }
2289
2290 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2291 {
2292     RDMACapabilities cap = {
2293                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2294                                 .flags = 0,
2295                            };
2296     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2297                                           .retry_count = 5,
2298                                           .private_data = &cap,
2299                                           .private_data_len = sizeof(cap),
2300                                         };
2301     struct rdma_cm_event *cm_event;
2302     int ret;
2303
2304     /*
2305      * Only negotiate the capability with destination if the user
2306      * on the source first requested the capability.
2307      */
2308     if (rdma->pin_all) {
2309         trace_qemu_rdma_connect_pin_all_requested();
2310         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2311     }
2312
2313     caps_to_network(&cap);
2314
2315     ret = rdma_connect(rdma->cm_id, &conn_param);
2316     if (ret) {
2317         perror("rdma_connect");
2318         ERROR(errp, "connecting to destination!");
2319         goto err_rdma_source_connect;
2320     }
2321
2322     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2323     if (ret) {
2324         perror("rdma_get_cm_event after rdma_connect");
2325         ERROR(errp, "connecting to destination!");
2327         goto err_rdma_source_connect;
2328     }
2329
2330     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2331         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2332         ERROR(errp, "connecting to destination!");
2333         rdma_ack_cm_event(cm_event);
2334         goto err_rdma_source_connect;
2335     }
2336     rdma->connected = true;
2337
2338     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2339     network_to_caps(&cap);
2340
2341     /*
2342      * Verify that the *requested* capabilities are supported by the destination
2343      * and disable them otherwise.
2344      */
2345     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2346         ERROR(errp, "Server cannot support pinning all memory. "
2347                         "Will register memory dynamically.");
2348         rdma->pin_all = false;
2349     }
2350
2351     trace_qemu_rdma_connect_pin_all_outcome(rdma->pin_all);
2352
2353     rdma_ack_cm_event(cm_event);
2354
2355     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2356     if (ret) {
2357         ERROR(errp, "posting second control recv!");
2358         goto err_rdma_source_connect;
2359     }
2360
2361     rdma->control_ready_expected = 1;
2362     rdma->nb_sent = 0;
2363     return 0;
2364
2365 err_rdma_source_connect:
2366     qemu_rdma_cleanup(rdma);
2367     return -1;
2368 }
2369
2370 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2371 {
2372     int ret, idx;
2373     struct rdma_cm_id *listen_id;
2374     char ip[40] = "unknown";
2375     struct rdma_addrinfo *res, *e;
2376     char port_str[16];
2377
2378     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2379         rdma->wr_data[idx].control_len = 0;
2380         rdma->wr_data[idx].control_curr = NULL;
2381     }
2382
2383     if (!rdma->host || !rdma->host[0]) {
2384         ERROR(errp, "RDMA host is not set!");
2385         rdma->error_state = -EINVAL;
2386         return -1;
2387     }
2388     /* create CM channel */
2389     rdma->channel = rdma_create_event_channel();
2390     if (!rdma->channel) {
2391         ERROR(errp, "could not create rdma event channel");
2392         rdma->error_state = -EINVAL;
2393         return -1;
2394     }
2395
2396     /* create CM id */
2397     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2398     if (ret) {
2399         ERROR(errp, "could not create cm_id!");
2400         goto err_dest_init_create_listen_id;
2401     }
2402
2403     snprintf(port_str, 16, "%d", rdma->port);
2404     port_str[15] = '\0';
2405
2406     ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
2407     if (ret < 0) {
2408         ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
2409         goto err_dest_init_bind_addr;
2410     }
2411
2412     for (e = res; e != NULL; e = e->ai_next) {
2413         inet_ntop(e->ai_family,
2414             &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
2415         trace_qemu_rdma_dest_init_trying(rdma->host, ip);
2416         ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
2417         if (ret) {
2418             continue;
2419         }
2420         if (e->ai_family == AF_INET6) {
2421             ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
2422             if (ret) {
2423                 continue;
2424             }
2425         }
2426         break;
2427     }
2428
2429     if (!e) {
2430         ERROR(errp, "Error: could not rdma_bind_addr!");
2431         goto err_dest_init_bind_addr;
2432     }
2433
2434     rdma->listen_id = listen_id;
2435     qemu_rdma_dump_gid("dest_init", listen_id);
2436     return 0;
2437
2438 err_dest_init_bind_addr:
2439     rdma_destroy_id(listen_id);
2440 err_dest_init_create_listen_id:
2441     rdma_destroy_event_channel(rdma->channel);
2442     rdma->channel = NULL;
2443     rdma->error_state = ret;
2444     return ret;
2445
2446 }
2447
2448 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2449 {
2450     RDMAContext *rdma = NULL;
2451     InetSocketAddress *addr;
2452
2453     if (host_port) {
2454         rdma = g_malloc0(sizeof(RDMAContext));
2455         rdma->current_index = -1;
2456         rdma->current_chunk = -1;
2457
2458         addr = inet_parse(host_port, NULL);
2459         if (addr != NULL) {
2460             rdma->port = atoi(addr->port);
2461             rdma->host = g_strdup(addr->host);
2462         } else {
2463             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2464             g_free(rdma);
2465             rdma = NULL;
2466         }
2467
2468         qapi_free_InetSocketAddress(addr);
2469     }
2470
2471     return rdma;
2472 }
2473
2474 /*
2475  * QEMUFile interface to the control channel.
2476  * SEND messages for control only.
2477  * VM's ram is handled with regular RDMA messages.
2478  */
2479 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
2480                                 int64_t pos, int size)
2481 {
2482     QEMUFileRDMA *r = opaque;
2483     QEMUFile *f = r->file;
2484     RDMAContext *rdma = r->rdma;
2485     size_t remaining = size;
2486     uint8_t * data = (void *) buf;
2487     int ret;
2488
2489     CHECK_ERROR_STATE();
2490
2491     /*
2492      * Push out any writes that
2493      * we've queued up for the VM's RAM.
2494      */
2495     ret = qemu_rdma_write_flush(f, rdma);
2496     if (ret < 0) {
2497         rdma->error_state = ret;
2498         return ret;
2499     }
2500
2501     while (remaining) {
2502         RDMAControlHeader head;
2503
2504         r->len = MIN(remaining, RDMA_SEND_INCREMENT);
2505         remaining -= r->len;
2506
2507         head.len = r->len;
2508         head.type = RDMA_CONTROL_QEMU_FILE;
2509
2510         ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2511
2512         if (ret < 0) {
2513             rdma->error_state = ret;
2514             return ret;
2515         }
2516
2517         data += r->len;
2518     }
2519
2520     return size;
2521 }
2522
2523 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2524                              int size, int idx)
2525 {
2526     size_t len = 0;
2527
2528     if (rdma->wr_data[idx].control_len) {
2529         trace_qemu_rdma_fill(rdma->wr_data[idx].control_len, size);
2530
2531         len = MIN(size, rdma->wr_data[idx].control_len);
2532         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2533         rdma->wr_data[idx].control_curr += len;
2534         rdma->wr_data[idx].control_len -= len;
2535     }
2536
2537     return len;
2538 }
2539
2540 /*
2541  * QEMUFile interface to the control channel.
2542  * RDMA links don't use bytestreams, so we have to
2543  * return bytes to QEMUFile opportunistically.
2544  */
2545 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
2546                                 int64_t pos, int size)
2547 {
2548     QEMUFileRDMA *r = opaque;
2549     RDMAContext *rdma = r->rdma;
2550     RDMAControlHeader head;
2551     int ret = 0;
2552
2553     CHECK_ERROR_STATE();
2554
2555     /*
2556      * First, we hold on to the last SEND message we
2557      * were given and dish out the bytes until we run
2558      * out of bytes.
2559      */
2560     r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
2561     if (r->len) {
2562         return r->len;
2563     }
2564
2565     /*
2566      * Once we run out, we block and wait for another
2567      * SEND message to arrive.
2568      */
2569     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2570
2571     if (ret < 0) {
2572         rdma->error_state = ret;
2573         return ret;
2574     }
2575
2576     /*
2577      * SEND was received with new bytes, now try again.
2578      */
2579     return qemu_rdma_fill(r->rdma, buf, size, 0);
2580 }
2581
2582 /*
2583  * Block until all the outstanding chunks have been delivered by the hardware.
2584  */
2585 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2586 {
2587     int ret;
2588
2589     if (qemu_rdma_write_flush(f, rdma) < 0) {
2590         return -EIO;
2591     }
2592
2593     while (rdma->nb_sent) {
2594         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2595         if (ret < 0) {
2596             error_report("rdma migration: complete polling error!");
2597             return -EIO;
2598         }
2599     }
2600
2601     qemu_rdma_unregister_waiting(rdma);
2602
2603     return 0;
2604 }
2605
2606 static int qemu_rdma_close(void *opaque)
2607 {
2608     QEMUFileRDMA *r = opaque;
2609     trace_qemu_rdma_close();
2610     if (r->rdma) {
2611         qemu_rdma_cleanup(r->rdma);
2612         g_free(r->rdma);
2613     }
2614     g_free(r);
2615     return 0;
2616 }
2617
2618 /*
2619  * Parameters:
2620  *    @offset == 0 :
2621  *        This means that 'block_offset' is a full virtual address that does not
2622  *        belong to a RAMBlock of the virtual machine and instead
2623  *        represents a private malloc'd memory area that the caller wishes to
2624  *        transfer.
2625  *
2626  *    @offset != 0 :
2627  *        Offset is an offset to be added to block_offset and used
2628  *        to also lookup the corresponding RAMBlock.
2629  *
2630  *    @size > 0 :
2631  *        Initiate a transfer of this size.
2632  *
2633  *    @size == 0 :
2634  *        A 'hint' or 'advice' that means that we wish to speculatively
2635  *        and asynchronously unregister this memory. In this case, there is no
2636  *        guarantee that the unregister will actually happen, for example,
2637  *        if the memory is being actively transmitted. Additionally, the memory
2638  *        may be re-registered at any future time if a write within the same
2639  *        chunk was requested again, even if you attempted to unregister it
2640  *        here.
2641  *
2642  *    @size < 0 : TODO, not yet supported
2643  *        Unregister the memory NOW. This means that the caller does not
2644  *        expect there to be any future RDMA transfers and we just want to clean
2645  *        things up. This is used in case the upper layer owns the memory and
2646  *        cannot wait for qemu_fclose() to occur.
2647  *
2648  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2649  *                  sent. Usually, this will not be more than a few bytes of
2650  *                  the protocol because most transfers are sent asynchronously.
2651  */
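/*
 * Illustrative calls matching the parameter description above (the argument
 * names are placeholders):
 *
 *     // queue 'size' bytes at block_offset + offset for transfer
 *     qemu_rdma_save_page(f, opaque, block_offset, offset, size, &bytes_sent);
 *
 *     // hint that the chunk containing block_offset + offset may be
 *     // speculatively unregistered
 *     qemu_rdma_save_page(f, opaque, block_offset, offset, 0, NULL);
 */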
2652 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2653                                   ram_addr_t block_offset, ram_addr_t offset,
2654                                   size_t size, uint64_t *bytes_sent)
2655 {
2656     QEMUFileRDMA *rfile = opaque;
2657     RDMAContext *rdma = rfile->rdma;
2658     int ret;
2659
2660     CHECK_ERROR_STATE();
2661
2662     qemu_fflush(f);
2663
2664     if (size > 0) {
2665         /*
2666          * Add this page to the current 'chunk'. If the chunk
2667          * is full, or the page doesn't belong to the current chunk,
2668          * an actual RDMA write will occur and a new chunk will be formed.
2669          */
2670         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2671         if (ret < 0) {
2672             error_report("rdma migration: write error! %d", ret);
2673             goto err;
2674         }
2675
2676         /*
2677          * We always return 1 byte because the RDMA
2678          * protocol is completely asynchronous. We do not yet know
2679          * whether an identified chunk is zero or not because we're
2680          * waiting for other pages to potentially be merged with
2681          * the current chunk. So, we have to call qemu_update_position()
2682          * later on when the actual write occurs.
2683          */
2684         if (bytes_sent) {
2685             *bytes_sent = 1;
2686         }
2687     } else {
2688         uint64_t index, chunk;
2689
2690         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2691         if (size < 0) {
2692             ret = qemu_rdma_drain_cq(f, rdma);
2693             if (ret < 0) {
2694                 fprintf(stderr, "rdma: failed to synchronously drain"
2695                                 " completion queue before unregistration.\n");
2696                 goto err;
2697             }
2698         }
2699         */
2700
2701         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2702                                          offset, size, &index, &chunk);
2703
2704         if (ret) {
2705             error_report("ram block search failed");
2706             goto err;
2707         }
2708
2709         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2710
2711         /*
2712          * TODO: Synchronous, guaranteed unregistration (should not occur during
2713          * fast-path). Otherwise, unregisters will process on the next call to
2714          * qemu_rdma_drain_cq()
2715         if (size < 0) {
2716             qemu_rdma_unregister_waiting(rdma);
2717         }
2718         */
2719     }
2720
2721     /*
2722      * Drain the Completion Queue if possible, but do not block,
2723      * just poll.
2724      *
2725      * If nothing to poll, the end of the iteration will do this
2726      * again to make sure we don't overflow the request queue.
2727      */
2728     while (1) {
2729         uint64_t wr_id, wr_id_in;
2730         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2731         if (ret < 0) {
2732             error_report("rdma migration: polling error! %d", ret);
2733             goto err;
2734         }
2735
2736         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2737
2738         if (wr_id == RDMA_WRID_NONE) {
2739             break;
2740         }
2741     }
2742
2743     return RAM_SAVE_CONTROL_DELAYED;
2744 err:
2745     rdma->error_state = ret;
2746     return ret;
2747 }
2748
2749 static int qemu_rdma_accept(RDMAContext *rdma)
2750 {
2751     RDMACapabilities cap;
2752     struct rdma_conn_param conn_param = {
2753                                             .responder_resources = 2,
2754                                             .private_data = &cap,
2755                                             .private_data_len = sizeof(cap),
2756                                          };
2757     struct rdma_cm_event *cm_event;
2758     struct ibv_context *verbs;
2759     int ret = -EINVAL;
2760     int idx;
2761
2762     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2763     if (ret) {
2764         goto err_rdma_dest_wait;
2765     }
2766
2767     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2768         rdma_ack_cm_event(cm_event);
2769         goto err_rdma_dest_wait;
2770     }
2771
2772     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2773
2774     network_to_caps(&cap);
2775
2776     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2777             error_report("Unknown source RDMA version: %d, bailing...",
2778                             cap.version);
2779             rdma_ack_cm_event(cm_event);
2780             goto err_rdma_dest_wait;
2781     }
2782
2783     /*
2784      * Respond with only the capabilities this version of QEMU knows about.
2785      */
2786     cap.flags &= known_capabilities;
2787
2788     /*
2789      * Enable the ones that we do know about.
2790      * Add other checks here as new ones are introduced.
2791      */
2792     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2793         rdma->pin_all = true;
2794     }
2795
2796     rdma->cm_id = cm_event->id;
2797     verbs = cm_event->id->verbs;
2798
2799     rdma_ack_cm_event(cm_event);
2800
2801     trace_qemu_rdma_accept_pin_state(rdma->pin_all);
2802
2803     caps_to_network(&cap);
2804
2805     trace_qemu_rdma_accept_pin_verbsc(verbs);
2806
2807     if (!rdma->verbs) {
2808         rdma->verbs = verbs;
2809     } else if (rdma->verbs != verbs) {
2810             error_report("ibv context not matching %p, %p!", rdma->verbs,
2811                          verbs);
2812             goto err_rdma_dest_wait;
2813     }
2814
2815     qemu_rdma_dump_id("dest_init", verbs);
2816
2817     ret = qemu_rdma_alloc_pd_cq(rdma);
2818     if (ret) {
2819         error_report("rdma migration: error allocating pd and cq!");
2820         goto err_rdma_dest_wait;
2821     }
2822
2823     ret = qemu_rdma_alloc_qp(rdma);
2824     if (ret) {
2825         error_report("rdma migration: error allocating qp!");
2826         goto err_rdma_dest_wait;
2827     }
2828
2829     ret = qemu_rdma_init_ram_blocks(rdma);
2830     if (ret) {
2831         error_report("rdma migration: error initializing ram blocks!");
2832         goto err_rdma_dest_wait;
2833     }
2834
2835     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2836         ret = qemu_rdma_reg_control(rdma, idx);
2837         if (ret) {
2838             error_report("rdma: error registering %d control", idx);
2839             goto err_rdma_dest_wait;
2840         }
2841     }
2842
2843     qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
2844
2845     ret = rdma_accept(rdma->cm_id, &conn_param);
2846     if (ret) {
2847         error_report("rdma_accept returns %d", ret);
2848         goto err_rdma_dest_wait;
2849     }
2850
2851     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2852     if (ret) {
2853         error_report("rdma_accept get_cm_event failed %d", ret);
2854         goto err_rdma_dest_wait;
2855     }
2856
2857     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2858         error_report("rdma_accept not event established");
2859         rdma_ack_cm_event(cm_event);
2860         goto err_rdma_dest_wait;
2861     }
2862
2863     rdma_ack_cm_event(cm_event);
2864     rdma->connected = true;
2865
2866     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2867     if (ret) {
2868         error_report("rdma migration: error posting second control recv");
2869         goto err_rdma_dest_wait;
2870     }
2871
2872     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
2873
2874     return 0;
2875
2876 err_rdma_dest_wait:
2877     rdma->error_state = ret;
2878     qemu_rdma_cleanup(rdma);
2879     return ret;
2880 }
2881
2882 /*
2883  * During each iteration of the migration, we listen for instructions
2884  * from the source VM to perform dynamic page registrations before it
2885  * can perform RDMA operations.
2886  *
2887  * We respond with the 'rkey'.
2888  *
2889  * Keep doing this until the source tells us to stop.
2890  */
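/*
 * Message types handled by the loop below: RDMA_CONTROL_COMPRESS (zero
 * pages), RDMA_CONTROL_RAM_BLOCKS_REQUEST (return our RAMBlock list),
 * RDMA_CONTROL_REGISTER_REQUEST and RDMA_CONTROL_UNREGISTER_REQUEST
 * (dynamic chunk registration / unregistration), and
 * RDMA_CONTROL_REGISTER_FINISHED, which ends this iteration.
 */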
2891 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
2892                                          uint64_t flags)
2893 {
2894     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
2895                                .type = RDMA_CONTROL_REGISTER_RESULT,
2896                                .repeat = 0,
2897                              };
2898     RDMAControlHeader unreg_resp = { .len = 0,
2899                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
2900                                .repeat = 0,
2901                              };
2902     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
2903                                  .repeat = 1 };
2904     QEMUFileRDMA *rfile = opaque;
2905     RDMAContext *rdma = rfile->rdma;
2906     RDMALocalBlocks *local = &rdma->local_ram_blocks;
2907     RDMAControlHeader head;
2908     RDMARegister *reg, *registers;
2909     RDMACompress *comp;
2910     RDMARegisterResult *reg_result;
2911     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
2912     RDMALocalBlock *block;
2913     void *host_addr;
2914     int ret = 0;
2915     int idx = 0;
2916     int count = 0;
2917     int i = 0;
2918
2919     CHECK_ERROR_STATE();
2920
2921     do {
2922         trace_qemu_rdma_registration_handle_wait(flags);
2923
2924         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
2925
2926         if (ret < 0) {
2927             break;
2928         }
2929
2930         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
2931             error_report("rdma: Too many requests in this message (%d)."
2932                             "Bailing.", head.repeat);
2933             ret = -EIO;
2934             break;
2935         }
2936
2937         switch (head.type) {
2938         case RDMA_CONTROL_COMPRESS:
2939             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
2940             network_to_compress(comp);
2941
2942             trace_qemu_rdma_registration_handle_compress(comp->length,
2943                                                          comp->block_idx,
2944                                                          comp->offset);
2945             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
2946
2947             host_addr = block->local_host_addr +
2948                             (comp->offset - block->offset);
2949
2950             ram_handle_compressed(host_addr, comp->value, comp->length);
2951             break;
2952
2953         case RDMA_CONTROL_REGISTER_FINISHED:
2954             trace_qemu_rdma_registration_handle_finished();
2955             goto out;
2956
2957         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
2958             trace_qemu_rdma_registration_handle_ram_blocks();
2959
2960             if (rdma->pin_all) {
2961                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
2962                 if (ret) {
2963                     error_report("rdma migration: error dest "
2964                                     "registering ram blocks");
2965                     goto out;
2966                 }
2967             }
2968
2969             /*
2970              * Dest uses this to prepare to transmit the RAMBlock descriptions
2971              * to the source VM after connection setup.
2972              * Both sides use the "remote" structure to communicate and update
2973              * their "local" descriptions with what was sent.
2974              */
2975             for (i = 0; i < local->nb_blocks; i++) {
2976                 rdma->dest_blocks[i].remote_host_addr =
2977                     (uintptr_t)(local->block[i].local_host_addr);
2978
2979                 if (rdma->pin_all) {
2980                     rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
2981                 }
2982
2983                 rdma->dest_blocks[i].offset = local->block[i].offset;
2984                 rdma->dest_blocks[i].length = local->block[i].length;
2985
2986                 dest_block_to_network(&rdma->dest_blocks[i]);
2987             }
2988
2989             blocks.len = rdma->local_ram_blocks.nb_blocks
2990                                                 * sizeof(RDMADestBlock);
2991
2992
2993             ret = qemu_rdma_post_send_control(rdma,
2994                                         (uint8_t *) rdma->dest_blocks, &blocks);
2995
2996             if (ret < 0) {
2997                 error_report("rdma migration: error sending remote info");
2998                 goto out;
2999             }
3000
3001             break;
3002         case RDMA_CONTROL_REGISTER_REQUEST:
3003             trace_qemu_rdma_registration_handle_register(head.repeat);
3004
3005             reg_resp.repeat = head.repeat;
3006             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3007
3008             for (count = 0; count < head.repeat; count++) {
3009                 uint64_t chunk;
3010                 uint8_t *chunk_start, *chunk_end;
3011
3012                 reg = &registers[count];
3013                 network_to_register(reg);
3014
3015                 reg_result = &results[count];
3016
3017                 trace_qemu_rdma_registration_handle_register_loop(count,
3018                          reg->current_index, reg->key.current_addr, reg->chunks);
3019
3020                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3021                 if (block->is_ram_block) {
3022                     host_addr = (block->local_host_addr +
3023                                 (reg->key.current_addr - block->offset));
3024                     chunk = ram_chunk_index(block->local_host_addr,
3025                                             (uint8_t *) host_addr);
3026                 } else {
3027                     chunk = reg->key.chunk;
3028                     host_addr = block->local_host_addr +
3029                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
3030                 }
3031                 chunk_start = ram_chunk_start(block, chunk);
3032                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
3033                 if (qemu_rdma_register_and_get_keys(rdma, block,
3034                             (uintptr_t)host_addr, NULL, &reg_result->rkey,
3035                             chunk, chunk_start, chunk_end)) {
3036                     error_report("cannot get rkey");
3037                     ret = -EINVAL;
3038                     goto out;
3039                 }
3040
3041                 reg_result->host_addr = (uintptr_t)block->local_host_addr;
3042
3043                 trace_qemu_rdma_registration_handle_register_rkey(
3044                                                            reg_result->rkey);
3045
3046                 result_to_network(reg_result);
3047             }
3048
3049             ret = qemu_rdma_post_send_control(rdma,
3050                             (uint8_t *) results, &reg_resp);
3051
3052             if (ret < 0) {
3053                 error_report("Failed to send control buffer");
3054                 goto out;
3055             }
3056             break;
3057         case RDMA_CONTROL_UNREGISTER_REQUEST:
3058             trace_qemu_rdma_registration_handle_unregister(head.repeat);
3059             unreg_resp.repeat = head.repeat;
3060             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
3061
3062             for (count = 0; count < head.repeat; count++) {
3063                 reg = &registers[count];
3064                 network_to_register(reg);
3065
3066                 trace_qemu_rdma_registration_handle_unregister_loop(count,
3067                            reg->current_index, reg->key.chunk);
3068
3069                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
3070
3071                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
3072                 block->pmr[reg->key.chunk] = NULL;
3073
3074                 if (ret != 0) {
3075                     perror("rdma unregistration chunk failed");
3076                     ret = -ret;
3077                     goto out;
3078                 }
3079
3080                 rdma->total_registrations--;
3081
3082                 trace_qemu_rdma_registration_handle_unregister_success(
3083                                                        reg->key.chunk);
3084             }
3085
3086             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
3087
3088             if (ret < 0) {
3089                 error_report("Failed to send control buffer");
3090                 goto out;
3091             }
3092             break;
3093         case RDMA_CONTROL_REGISTER_RESULT:
3094             error_report("Invalid RESULT message at dest.");
3095             ret = -EIO;
3096             goto out;
3097         default:
3098             error_report("Unknown control message %s", control_desc[head.type]);
3099             ret = -EIO;
3100             goto out;
3101         }
3102     } while (1);
3103 out:
3104     if (ret < 0) {
3105         rdma->error_state = ret;
3106     }
3107     return ret;
3108 }
3109
3110 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3111                                         uint64_t flags)
3112 {
3113     QEMUFileRDMA *rfile = opaque;
3114     RDMAContext *rdma = rfile->rdma;
3115
3116     CHECK_ERROR_STATE();
3117
3118     trace_qemu_rdma_registration_start(flags);
3119     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3120     qemu_fflush(f);
3121
3122     return 0;
3123 }
3124
3125 /*
3126  * Inform dest that dynamic registrations are done for now.
3127  * First, flush writes, if any.
3128  */
3129 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3130                                        uint64_t flags)
3131 {
3132     Error *local_err = NULL, **errp = &local_err;
3133     QEMUFileRDMA *rfile = opaque;
3134     RDMAContext *rdma = rfile->rdma;
3135     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3136     int ret = 0;
3137
3138     CHECK_ERROR_STATE();
3139
3140     qemu_fflush(f);
3141     ret = qemu_rdma_drain_cq(f, rdma);
3142
3143     if (ret < 0) {
3144         goto err;
3145     }
3146
3147     if (flags == RAM_CONTROL_SETUP) {
3148         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3149         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3150         int reg_result_idx, i, j, nb_dest_blocks;
3151
3152         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3153         trace_qemu_rdma_registration_stop_ram();
3154
3155         /*
3156          * Make sure that we parallelize the pinning on both sides.
3157          * For very large guests, doing this serially takes a really
3158          * long time, so we have to 'interleave' the pinning locally
3159          * with the control messages by performing the pinning on this
3160          * side before we receive the control response from the other
3161          * side that the pinning has completed.
3162          */
3163         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3164                     &reg_result_idx, rdma->pin_all ?
3165                     qemu_rdma_reg_whole_ram_blocks : NULL);
3166         if (ret < 0) {
3167             ERROR(errp, "receiving remote info!");
3168             return ret;
3169         }
3170
3171         nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
3172
3173         /*
3174          * The protocol uses two different sets of rkeys (mutually exclusive):
3175          * 1. One key to represent the virtual address of the entire ram block.
3176          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3177          * 2. One to represent individual chunks within a ram block.
3178          *    (dynamic chunk registration enabled - pin individual chunks.)
3179          *
3180          * Once the capability is successfully negotiated, the destination transmits
3181          * the keys to use (or sends them later) together with the virtual addresses,
3182          * and the source merges those descriptions into its local copy.  (A sketch
3183          * of this key selection follows this function.) */
3184
3185         if (local->nb_blocks != nb_dest_blocks) {
3186             ERROR(errp, "ram blocks mismatch #1! "
3187                         "Your QEMU command line parameters are probably "
3188                         "not identical on both the source and destination.");
3189             return -EINVAL;
3190         }
3191
3192         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3193         memcpy(rdma->dest_blocks,
3194             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3195         for (i = 0; i < nb_dest_blocks; i++) {
3196             network_to_dest_block(&rdma->dest_blocks[i]);
3197
3198             /* search local ram blocks */
3199             for (j = 0; j < local->nb_blocks; j++) {
3200                 if (rdma->dest_blocks[i].offset != local->block[j].offset) {
3201                     continue;
3202                 }
3203
3204                 if (rdma->dest_blocks[i].length != local->block[j].length) {
3205                     ERROR(errp, "ram blocks mismatch #2! "
3206                         "Your QEMU command line parameters are probably "
3207                         "not identical on both the source and destination.");
3208                     return -EINVAL;
3209                 }
3210                 local->block[j].remote_host_addr =
3211                         rdma->dest_blocks[i].remote_host_addr;
3212                 local->block[j].remote_rkey = rdma->dest_blocks[i].remote_rkey;
3213                 break;
3214             }
3215
3216             if (j >= local->nb_blocks) {
3217                 ERROR(errp, "ram blocks mismatch #3! "
3218                         "Your QEMU command line parameters are probably "
3219                         "not identical on both the source and destination.");
3220                 return -EINVAL;
3221             }
3222         }
3223     }
3224
3225     trace_qemu_rdma_registration_stop(flags);
3226
3227     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3228     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3229
3230     if (ret < 0) {
3231         goto err;
3232     }
3233
3234     return 0;
3235 err:
3236     rdma->error_state = ret;
3237     return ret;
3238 }
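
/*
 * Editor's sketch (not part of upstream rdma.c): a minimal illustration of
 * the two mutually exclusive registration modes described in the comment
 * inside qemu_rdma_registration_stop() above.  With pin-all, one rkey
 * covers the whole ram block; with dynamic chunk registration, each chunk
 * carries its own rkey.  The helper and its "chunk_rkeys" table are
 * hypothetical names used only to show the selection logic; they do not
 * exist under these names in the real code.
 */
typedef struct ExampleWriteTarget {
    uint64_t remote_addr;   /* where on the destination to RDMA_WRITE */
    uint32_t rkey;          /* which registration key authorizes the write */
} ExampleWriteTarget;

static inline ExampleWriteTarget
example_pick_write_target(uint64_t remote_host_addr, uint32_t whole_block_rkey,
                          const uint32_t *chunk_rkeys, bool pin_all,
                          uint64_t block_offset, uint64_t chunk)
{
    ExampleWriteTarget t;

    /* The remote virtual address is always block start + offset. */
    t.remote_addr = remote_host_addr + block_offset;

    if (pin_all) {
        /* Mode 1: the whole ram block was pinned up front with one rkey. */
        t.rkey = whole_block_rkey;
    } else {
        /* Mode 2: the chunk containing this offset was registered on
         * demand and has its own rkey. */
        t.rkey = chunk_rkeys[chunk];
    }
    return t;
}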
3239
3240 static int qemu_rdma_get_fd(void *opaque)
3241 {
3242     QEMUFileRDMA *rfile = opaque;
3243     RDMAContext *rdma = rfile->rdma;
3244
3245     return rdma->comp_channel->fd;
3246 }
3247
3248 static const QEMUFileOps rdma_read_ops = {
3249     .get_buffer    = qemu_rdma_get_buffer,
3250     .get_fd        = qemu_rdma_get_fd,
3251     .close         = qemu_rdma_close,
3252     .hook_ram_load = qemu_rdma_registration_handle,
3253 };
3254
3255 static const QEMUFileOps rdma_write_ops = {
3256     .put_buffer         = qemu_rdma_put_buffer,
3257     .close              = qemu_rdma_close,
3258     .before_ram_iterate = qemu_rdma_registration_start,
3259     .after_ram_iterate  = qemu_rdma_registration_stop,
3260     .save_page          = qemu_rdma_save_page,
3261 };
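
/*
 * Editor's sketch (not part of upstream rdma.c): the two op tables above are
 * hook tables consumed by the generic QEMUFile/RAM migration layer.  On the
 * source, before_ram_iterate/after_ram_iterate bracket each RAM pass
 * (qemu_rdma_registration_start() above emits RAM_SAVE_FLAG_HOOK); on the
 * destination, hook_ram_load runs when that flag is read back.  The helper
 * below only illustrates the dispatch pattern -- "ExampleHooks" and
 * "example_run_load_hook" are invented names, not QEMU's real wrappers.
 */
typedef struct ExampleHooks {
    int (*hook_ram_load)(QEMUFile *f, void *opaque, uint64_t flags);
} ExampleHooks;

static inline int example_run_load_hook(QEMUFile *f, const ExampleHooks *hooks,
                                        void *opaque, uint64_t flags)
{
    /* Transports without a load hook (e.g. plain TCP) simply skip it. */
    if (hooks && hooks->hook_ram_load) {
        return hooks->hook_ram_load(f, opaque, flags);
    }
    return 0;
}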
3262
3263 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3264 {
3265     QEMUFileRDMA *r;
3266
3267     if (qemu_file_mode_is_not_valid(mode)) {
3268         return NULL;
3269     }
3270
3271     r = g_malloc0(sizeof(QEMUFileRDMA));
3272     r->rdma = rdma;
3273     if (mode[0] == 'w') {
3274         r->file = qemu_fopen_ops(r, &rdma_write_ops);
3275     } else {
3276         r->file = qemu_fopen_ops(r, &rdma_read_ops);
3277     }
3278
3279     return r->file;
3280 }
3281
3282 static void rdma_accept_incoming_migration(void *opaque)
3283 {
3284     RDMAContext *rdma = opaque;
3285     int ret;
3286     QEMUFile *f;
3287     Error *local_err = NULL, **errp = &local_err;
3288
3289     trace_qemu_dma_accept_incoming_migration();
3290     ret = qemu_rdma_accept(rdma);
3291
3292     if (ret) {
3293         ERROR(errp, "RDMA Migration initialization failed!");
3294         return;
3295     }
3296
3297     trace_qemu_dma_accept_incoming_migration_accepted();
3298
3299     f = qemu_fopen_rdma(rdma, "rb");
3300     if (f == NULL) {
3301         ERROR(errp, "could not qemu_fopen_rdma!");
3302         qemu_rdma_cleanup(rdma);
3303         return;
3304     }
3305
3306     rdma->migration_started_on_destination = 1;
3307     process_incoming_migration(f);
3308 }
3309
3310 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3311 {
3312     int ret;
3313     RDMAContext *rdma;
3314     Error *local_err = NULL;
3315
3316     trace_rdma_start_incoming_migration();
3317     rdma = qemu_rdma_data_init(host_port, &local_err);
3318
3319     if (rdma == NULL) {
3320         goto err;
3321     }
3322
3323     ret = qemu_rdma_dest_init(rdma, &local_err);
3324
3325     if (ret) {
3326         goto err;
3327     }
3328
3329     trace_rdma_start_incoming_migration_after_dest_init();
3330
3331     ret = rdma_listen(rdma->listen_id, 5);
3332
3333     if (ret) {
3334         ERROR(errp, "listening on socket!");
3335         goto err;
3336     }
3337
3338     trace_rdma_start_incoming_migration_after_rdma_listen();
3339
3340     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
3341                         NULL, (void *)(intptr_t)rdma);
3342     return;
3343 err:
3344     error_propagate(errp, local_err);
3345     g_free(rdma);
3346 }
3347
3348 void rdma_start_outgoing_migration(void *opaque,
3349                             const char *host_port, Error **errp)
3350 {
3351     MigrationState *s = opaque;
3352     Error *local_err = NULL, **temp = &local_err;
3353     RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
3354     int ret = 0;
3355
3356     if (rdma == NULL) {
3357         ERROR(temp, "Failed to initialize RDMA data structures!");
3358         goto err;
3359     }
3360
3361     ret = qemu_rdma_source_init(rdma, &local_err,
3362         s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
3363
3364     if (ret) {
3365         goto err;
3366     }
3367
3368     trace_rdma_start_outgoing_migration_after_rdma_source_init();
3369     ret = qemu_rdma_connect(rdma, &local_err);
3370
3371     if (ret) {
3372         goto err;
3373     }
3374
3375     trace_rdma_start_outgoing_migration_after_rdma_connect();
3376
3377     s->file = qemu_fopen_rdma(rdma, "wb");
3378     migrate_fd_connect(s);
3379     return;
3380 err:
3381     error_propagate(errp, local_err);
3382     g_free(rdma);
3383     migrate_fd_error(s);
3384 }
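
/*
 * Editor's sketch (not part of upstream rdma.c): the two entry points above
 * are reached from the generic migration code when an "rdma:" URI is used,
 * e.g. "-incoming rdma:0.0.0.0:4444" on the destination and
 * "migrate rdma:dest-host:4444" on the source.  The dispatcher below is a
 * simplified stand-in for that logic, not QEMU's actual migration.c code.
 */
static inline void example_dispatch_incoming_uri(const char *uri, Error **errp)
{
    const char *addr;

    if (strstart(uri, "rdma:", &addr)) {
        /* Hand the remaining "host:port" to the RDMA listener above. */
        rdma_start_incoming_migration(addr, errp);
    } else {
        error_setg(errp, "unsupported migration URI in this sketch: %s", uri);
    }
}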