diff --git a/migration/rdma.c b/migration/rdma.c
index fc351eabf2c20c836e08218045e48ebd1ee0e7f7..da474fc19f81cd33b1ce4eb4eb94a2f000976912 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -2,29 +2,34 @@
  * RDMA protocol and interfaces
  *
  * Copyright IBM, Corp. 2010-2013
+ * Copyright Red Hat, Inc. 2015-2016
  *
  * Authors:
  *  Michael R. Hines <mrhines@us.ibm.com>
  *  Jiuxing Liu <jl@us.ibm.com>
+ *  Daniel P. Berrange <berrange@redhat.com>
  *
  * This work is licensed under the terms of the GNU GPL, version 2 or
  * later.  See the COPYING file in the top-level directory.
  *
  */
+#include "qemu/osdep.h"
+#include "qapi/error.h"
 #include "qemu-common.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "exec/cpu-common.h"
+#include "qemu/cutils.h"
+#include "rdma.h"
+#include "migration.h"
+#include "qemu-file.h"
+#include "ram.h"
+#include "qemu-file-channel.h"
+#include "qemu/error-report.h"
 #include "qemu/main-loop.h"
 #include "qemu/sockets.h"
 #include "qemu/bitmap.h"
-#include "block/coroutine.h"
-#include <stdio.h>
-#include <sys/types.h>
+#include "qemu/coroutine.h"
 #include <sys/socket.h>
 #include <netdb.h>
 #include <arpa/inet.h>
-#include <string.h>
 #include <rdma/rdma_cma.h>
 #include "trace.h"
 
@@ -83,7 +88,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
             } \
             return rdma->error_state; \
         } \
-    } while (0);
+    } while (0)
 
 /*
  * A work request ID is 64-bits and we split up these bits
@@ -121,7 +126,7 @@ enum {
     RDMA_WRID_RECV_CONTROL = 4000,
 };
 
-const char *wrid_desc[] = {
+static const char *wrid_desc[] = {
     [RDMA_WRID_NONE] = "NONE",
     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
@@ -160,20 +165,6 @@ enum {
     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 };
 
-const char *control_desc[] = {
-    [RDMA_CONTROL_NONE] = "NONE",
-    [RDMA_CONTROL_ERROR] = "ERROR",
-    [RDMA_CONTROL_READY] = "READY",
-    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
-    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
-    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
-    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
-    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
-    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
-    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
-    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
-    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
-};
 
 /*
  * Memory and MR structures used to represent an IB Send/Recv work request.
@@ -214,17 +205,19 @@ static void network_to_caps(RDMACapabilities *cap)
  * the information. It's small anyway, so a list is overkill.
  */
 typedef struct RDMALocalBlock {
-    uint8_t  *local_host_addr; /* local virtual address */
-    uint64_t remote_host_addr; /* remote virtual address */
-    uint64_t offset;
-    uint64_t length;
-    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
-    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
-    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
-    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
-    int      index;            /* which block are we */
-    bool     is_ram_block;
-    int      nb_chunks;
+    char          *block_name;
+    uint8_t       *local_host_addr; /* local virtual address */
+    uint64_t       remote_host_addr; /* remote virtual address */
+    uint64_t       offset;
+    uint64_t       length;
+    struct         ibv_mr **pmr;    /* MRs for chunk-level registration */
+    struct         ibv_mr *mr;      /* MR for non-chunk-level registration */
+    uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
+    uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
+    int            index;           /* which block are we */
+    unsigned int   src_index;       /* (Only used on dest) */
+    bool           is_ram_block;
+    int            nb_chunks;
     unsigned long *transit_bitmap;
     unsigned long *unregister_bitmap;
 } RDMALocalBlock;
@@ -236,13 +229,37 @@ typedef struct RDMALocalBlock {
  * corresponding RDMALocalBlock with
  * the information needed to perform the actual RDMA.
  */
-typedef struct QEMU_PACKED RDMARemoteBlock {
+typedef struct QEMU_PACKED RDMADestBlock {
     uint64_t remote_host_addr;
     uint64_t offset;
     uint64_t length;
     uint32_t remote_rkey;
     uint32_t padding;
-} RDMARemoteBlock;
+} RDMADestBlock;
+
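+/* Map a control message type to a printable name for traces and errors. */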
+static const char *control_desc(unsigned int rdma_control)
+{
+    static const char *strs[] = {
+        [RDMA_CONTROL_NONE] = "NONE",
+        [RDMA_CONTROL_ERROR] = "ERROR",
+        [RDMA_CONTROL_READY] = "READY",
+        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+    };
+
+    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
+        return "??BAD CONTROL VALUE??";
+    }
+
+    return strs[rdma_control];
+}
 
 static uint64_t htonll(uint64_t v)
 {
@@ -258,20 +275,20 @@ static uint64_t ntohll(uint64_t v) {
     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
 }
 
-static void remote_block_to_network(RDMARemoteBlock *rb)
+static void dest_block_to_network(RDMADestBlock *db)
 {
-    rb->remote_host_addr = htonll(rb->remote_host_addr);
-    rb->offset = htonll(rb->offset);
-    rb->length = htonll(rb->length);
-    rb->remote_rkey = htonl(rb->remote_rkey);
+    db->remote_host_addr = htonll(db->remote_host_addr);
+    db->offset = htonll(db->offset);
+    db->length = htonll(db->length);
+    db->remote_rkey = htonl(db->remote_rkey);
 }
 
-static void network_to_remote_block(RDMARemoteBlock *rb)
+static void network_to_dest_block(RDMADestBlock *db)
 {
-    rb->remote_host_addr = ntohll(rb->remote_host_addr);
-    rb->offset = ntohll(rb->offset);
-    rb->length = ntohll(rb->length);
-    rb->remote_rkey = ntohl(rb->remote_rkey);
+    db->remote_host_addr = ntohll(db->remote_host_addr);
+    db->offset = ntohll(db->offset);
+    db->length = ntohll(db->length);
+    db->remote_rkey = ntohl(db->remote_rkey);
 }
 
 /*
@@ -345,12 +362,16 @@ typedef struct RDMAContext {
      */
     int error_state;
     int error_reported;
+    int received_error;
 
     /*
      * Description of ram blocks used throughout the code.
      */
     RDMALocalBlocks local_ram_blocks;
-    RDMARemoteBlock *block;
+    RDMADestBlock  *dest_blocks;
+
+    /* Index of the next RAMBlock received during block registration */
+    unsigned int    next_src_index;
 
     /*
      * Migration on *destination* started.
@@ -368,14 +389,20 @@ typedef struct RDMAContext {
     GHashTable *blockmap;
 } RDMAContext;
 
-/*
- * Interface to the rest of the migration call stack.
- */
-typedef struct QEMUFileRDMA {
+#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
+#define QIO_CHANNEL_RDMA(obj)                                     \
+    OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
+
+typedef struct QIOChannelRDMA QIOChannelRDMA;
+
+
+struct QIOChannelRDMA {
+    QIOChannel parent;
     RDMAContext *rdma;
+    QEMUFile *file;
     size_t len;
-    void *file;
-} QEMUFileRDMA;
+    bool blocking; /* XXX we don't actually honour this yet */
+};
 
 /*
  * Main structure for IB Send/Recv control messages.
@@ -410,7 +437,7 @@ static void network_to_control(RDMAControlHeader *control)
  */
 typedef struct QEMU_PACKED {
     union QEMU_PACKED {
-        uint64_t current_addr;  /* offset into the ramblock of the chunk */
+        uint64_t current_addr;  /* offset into the ram_addr_t space */
         uint64_t chunk;         /* chunk to lookup if unregistering */
     } key;
     uint32_t current_index; /* which ramblock the chunk belongs to */
@@ -418,8 +445,19 @@ typedef struct QEMU_PACKED {
     uint64_t chunks;            /* how many sequential chunks to register */
 } RDMARegister;
 
-static void register_to_network(RDMARegister *reg)
+static void register_to_network(RDMAContext *rdma, RDMARegister *reg)
 {
+    RDMALocalBlock *local_block;
+    local_block  = &rdma->local_ram_blocks.block[reg->current_index];
+
+    if (local_block->is_ram_block) {
+        /*
+         * current_addr as passed in is an address in the local ram_addr_t
+         * space, we need to translate this for the destination
+         */
+        reg->key.current_addr -= local_block->offset;
+        reg->key.current_addr += rdma->dest_blocks[reg->current_index].offset;
+    }
     reg->key.current_addr = htonll(reg->key.current_addr);
     reg->current_index = htonl(reg->current_index);
     reg->chunks = htonll(reg->chunks);
@@ -435,13 +473,19 @@ static void network_to_register(RDMARegister *reg)
 typedef struct QEMU_PACKED {
     uint32_t value;     /* if zero, we will madvise() */
     uint32_t block_idx; /* which ram block index */
-    uint64_t offset;    /* where in the remote ramblock this chunk */
+    uint64_t offset;    /* Address in remote ram_addr_t space */
     uint64_t length;    /* length of the chunk */
 } RDMACompress;
 
-static void compress_to_network(RDMACompress *comp)
+static void compress_to_network(RDMAContext *rdma, RDMACompress *comp)
 {
     comp->value = htonl(comp->value);
+    /*
+     * comp->offset as passed in is an address in the local ram_addr_t
+     * space, we need to translate this for the destination
+     */
+    comp->offset -= rdma->local_ram_blocks.block[comp->block_idx].offset;
+    comp->offset += rdma->dest_blocks[comp->block_idx].offset;
     comp->block_idx = htonl(comp->block_idx);
     comp->offset = htonll(comp->offset);
     comp->length = htonll(comp->length);
@@ -493,8 +537,8 @@ static inline uint64_t ram_chunk_index(const uint8_t *start,
 static inline uint8_t *ram_chunk_start(const RDMALocalBlock *rdma_ram_block,
                                        uint64_t i)
 {
-    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
-                                    + (i << RDMA_REG_CHUNK_SHIFT));
+    return (uint8_t *)(uintptr_t)(rdma_ram_block->local_host_addr +
+                                  (i << RDMA_REG_CHUNK_SHIFT));
 }
 
 static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
@@ -510,25 +554,27 @@ static inline uint8_t *ram_chunk_end(const RDMALocalBlock *rdma_ram_block,
     return result;
 }
 
-static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
+static int rdma_add_block(RDMAContext *rdma, const char *block_name,
+                         void *host_addr,
                          ram_addr_t block_offset, uint64_t length)
 {
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
-    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
-        (void *) block_offset);
+    RDMALocalBlock *block;
     RDMALocalBlock *old = local->block;
 
-    assert(block == NULL);
-
-    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
+    local->block = g_new0(RDMALocalBlock, local->nb_blocks + 1);
 
     if (local->nb_blocks) {
         int x;
 
-        for (x = 0; x < local->nb_blocks; x++) {
-            g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
-            g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
-                                                &local->block[x]);
+        if (rdma->blockmap) {
+            for (x = 0; x < local->nb_blocks; x++) {
+                g_hash_table_remove(rdma->blockmap,
+                                    (void *)(uintptr_t)old[x].offset);
+                g_hash_table_insert(rdma->blockmap,
+                                    (void *)(uintptr_t)old[x].offset,
+                                    &local->block[x]);
+            }
         }
         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
         g_free(old);
@@ -536,28 +582,32 @@ static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
 
     block = &local->block[local->nb_blocks];
 
+    block->block_name = g_strdup(block_name);
     block->local_host_addr = host_addr;
     block->offset = block_offset;
     block->length = length;
     block->index = local->nb_blocks;
+    block->src_index = ~0U; /* Filled in by the receipt of the block list */
     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
     block->transit_bitmap = bitmap_new(block->nb_chunks);
     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
     block->unregister_bitmap = bitmap_new(block->nb_chunks);
     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
-    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
+    block->remote_keys = g_new0(uint32_t, block->nb_chunks);
 
     block->is_ram_block = local->init ? false : true;
 
-    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
+    if (rdma->blockmap) {
+        g_hash_table_insert(rdma->blockmap, (void *)(uintptr_t)block_offset, block);
+    }
 
-    trace___qemu_rdma_add_block(local->nb_blocks,
-                           (uint64_t) block->local_host_addr, block->offset,
-                           block->length,
-                           (uint64_t) (block->local_host_addr + block->length),
-                           BITS_TO_LONGS(block->nb_chunks) *
-                               sizeof(unsigned long) * 8,
-                           block->nb_chunks);
+    trace_rdma_add_block(block_name, local->nb_blocks,
+                         (uintptr_t) block->local_host_addr,
+                         block->offset, block->length,
+                         (uintptr_t) (block->local_host_addr + block->length),
+                         BITS_TO_LONGS(block->nb_chunks) *
+                             sizeof(unsigned long) * 8,
+                         block->nb_chunks);
 
     local->nb_blocks++;
 
@@ -569,10 +619,10 @@ static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
  * in advance before the migration starts. This tells us where the RAM blocks
  * are so that we can register them individually.
  */
-static void qemu_rdma_init_one_block(void *host_addr,
+static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
     ram_addr_t block_offset, ram_addr_t length, void *opaque)
 {
-    __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
+    return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
 }
 
 /*
@@ -585,26 +635,28 @@ static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
 
     assert(rdma->blockmap == NULL);
-    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
     memset(local, 0, sizeof *local);
     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
     trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
-    rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
-                        rdma->local_ram_blocks.nb_blocks);
+    rdma->dest_blocks = g_new0(RDMADestBlock,
+                               rdma->local_ram_blocks.nb_blocks);
     local->init = true;
     return 0;
 }
 
-static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
+/*
+ * Note: If used outside of cleanup, the caller must ensure that the destination
+ * block structures are also updated
+ */
+static int rdma_delete_block(RDMAContext *rdma, RDMALocalBlock *block)
 {
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
-    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
-        (void *) block_offset);
     RDMALocalBlock *old = local->block;
     int x;
 
-    assert(block);
-
+    if (rdma->blockmap) {
+        g_hash_table_remove(rdma->blockmap, (void *)(uintptr_t)block->offset);
+    }
     if (block->pmr) {
         int j;
 
@@ -634,14 +686,19 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
     g_free(block->remote_keys);
     block->remote_keys = NULL;
 
-    for (x = 0; x < local->nb_blocks; x++) {
-        g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
+    g_free(block->block_name);
+    block->block_name = NULL;
+
+    if (rdma->blockmap) {
+        for (x = 0; x < local->nb_blocks; x++) {
+            g_hash_table_remove(rdma->blockmap,
+                                (void *)(uintptr_t)old[x].offset);
+        }
     }
 
     if (local->nb_blocks > 1) {
 
-        local->block = g_malloc0(sizeof(RDMALocalBlock) *
-                                    (local->nb_blocks - 1));
+        local->block = g_new0(RDMALocalBlock, local->nb_blocks - 1);
 
         if (block->index) {
             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
@@ -657,10 +714,9 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
         local->block = NULL;
     }
 
-    trace___qemu_rdma_delete_block(local->nb_blocks,
-                           (uint64_t)block->local_host_addr,
+    trace_rdma_delete_block(block, (uintptr_t)block->local_host_addr,
                            block->offset, block->length,
-                           (uint64_t)(block->local_host_addr + block->length),
+                            (uintptr_t)(block->local_host_addr + block->length),
                            BITS_TO_LONGS(block->nb_chunks) *
                                sizeof(unsigned long) * 8, block->nb_chunks);
 
@@ -668,10 +724,11 @@ static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
 
     local->nb_blocks--;
 
-    if (local->nb_blocks) {
+    if (local->nb_blocks && rdma->blockmap) {
         for (x = 0; x < local->nb_blocks; x++) {
-            g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
-                                                &local->block[x]);
+            g_hash_table_insert(rdma->blockmap,
+                                (void *)(uintptr_t)local->block[x].offset,
+                                &local->block[x]);
         }
     }
 
@@ -703,7 +760,7 @@ static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
                 verbs->device->ibdev_path,
                 port.link_layer,
                 (port.link_layer == IBV_LINK_LAYER_INFINIBAND) ? "Infiniband" :
-                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET) 
+                 ((port.link_layer == IBV_LINK_LAYER_ETHERNET)
                     ? "Ethernet" : "Unknown"));
 }
 
@@ -738,21 +795,21 @@ static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
  * and validate what type of hardware it is.
  *
  * Unfortunately, this puts the user in a fix:
- * 
+ *
  *  If the source VM connects with an IPv4 address without knowing that the
  *  destination has bound to '[::]' the migration will unconditionally fail
- *  unless the management software is explicitly listening on the the IPv4
+ *  unless the management software is explicitly listening on the IPv4
  *  address while using a RoCE-based device.
  *
  *  If the source VM connects with an IPv6 address, then we're OK because we can
  *  throw an error on the source (and similarly on the destination).
- * 
+ *
  *  But in mixed environments, this will be broken for a while until it is fixed
  *  inside linux.
  *
  * We do provide a *tiny* bit of help in this function: We can list all of the
  * devices in the system and check to see if all the devices are RoCE or
- * Infiniband. 
+ * Infiniband.
  *
  * If we detect that we have a *pure* RoCE environment, then we can safely
  * throw an error even if the management software has specified '[::]' as the
@@ -764,30 +821,37 @@ static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
  *
  * Patches are being reviewed on linux-rdma.
  */
-static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
+static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
 {
     struct ibv_port_attr port_attr;
 
     /* This bug only exists in linux, to our knowledge. */
 #ifdef CONFIG_LINUX
 
-    /* 
+    /*
      * Verbs are only NULL if management has bound to '[::]'.
-     * 
+     *
      * Let's iterate through all the devices and see if there any pure IB
      * devices (non-ethernet).
-     * 
+     *
      * If not, then we can safely proceed with the migration.
      * Otherwise, there are no guarantees until the bug is fixed in linux.
      */
     if (!verbs) {
-           int num_devices, x;
+        int num_devices, x;
         struct ibv_device ** dev_list = ibv_get_device_list(&num_devices);
         bool roce_found = false;
         bool ib_found = false;
 
         for (x = 0; x < num_devices; x++) {
             verbs = ibv_open_device(dev_list[x]);
+            if (!verbs) {
+                if (errno == EPERM) {
+                    continue;
+                } else {
+                    return -EINVAL;
+                }
+            }
 
             if (ibv_query_port(verbs, 1, &port_attr)) {
                 ibv_close_device(verbs);
@@ -826,8 +890,8 @@ static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
 
     /*
      * If we have a verbs context, that means that something other than '[::]' was
-     * used by the management software for binding. In which case we can actually 
-     * warn the user about a potential broken kernel;
+     * used by the management software for binding. In which case we can
+     * actually warn the user about a potentially broken kernel.
      */
 
     /* IB ports start with 1, not 0 */
@@ -898,7 +962,7 @@ static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
                 RDMA_RESOLVE_TIMEOUT_MS);
         if (!ret) {
             if (e->ai_family == AF_INET6) {
-                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
+                ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
                 if (ret) {
                     continue;
                 }
@@ -1076,7 +1140,7 @@ static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
  * This search cannot fail or the migration will fail.
  */
 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
-                                      uint64_t block_offset,
+                                      uintptr_t block_offset,
                                       uint64_t offset,
                                       uint64_t length,
                                       uint64_t *block_index,
@@ -1104,7 +1168,7 @@ static int qemu_rdma_search_ram_block(RDMAContext *rdma,
  * to perform the actual RDMA operation.
  */
 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
-        RDMALocalBlock *block, uint8_t *host_addr,
+        RDMALocalBlock *block, uintptr_t host_addr,
         uint32_t *lkey, uint32_t *rkey, int chunk,
         uint8_t *chunk_start, uint8_t *chunk_end)
 {
@@ -1120,10 +1184,7 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
 
     /* allocate memory to store chunk MRs */
     if (!block->pmr) {
-        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
-        if (!block->pmr) {
-            return -1;
-        }
+        block->pmr = g_new0(struct ibv_mr *, block->nb_chunks);
     }
 
     /*
@@ -1144,11 +1205,12 @@ static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
         if (!block->pmr[chunk]) {
             perror("Failed to register chunk!");
             fprintf(stderr, "Chunk details: block: %d chunk index %d"
-                            " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
-                            " local %" PRIu64 " registrations: %d\n",
-                            block->index, chunk, (uint64_t) chunk_start,
-                            (uint64_t) chunk_end, (uint64_t) host_addr,
-                            (uint64_t) block->local_host_addr,
+                            " start %" PRIuPTR " end %" PRIuPTR
+                            " host %" PRIuPTR
+                            " local %" PRIuPTR " registrations: %d\n",
+                            block->index, chunk, (uintptr_t)chunk_start,
+                            (uintptr_t)chunk_end, host_addr,
+                            (uintptr_t)block->local_host_addr,
                             rdma->total_registrations);
             return -1;
         }
@@ -1215,7 +1277,7 @@ const char *print_wrid(int wrid)
 
 /*
  * Perform a non-optimized memory unregistration after every transfer
- * for demonsration purposes, only if pin-all is not requested.
+ * for demonstration purposes, only if pin-all is not requested.
  *
  * Potential optimizations:
  * 1. Start a new thread to run this function continuously
@@ -1281,7 +1343,7 @@ static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
         rdma->total_registrations--;
 
         reg.key.chunk = chunk;
-        register_to_network(&reg);
+        register_to_network(rdma, &reg);
         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                 &resp, NULL, NULL);
         if (ret < 0) {
@@ -1382,8 +1444,8 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
 
         trace_qemu_rdma_poll_write(print_wrid(wr_id), wr_id, rdma->nb_sent,
-                 index, chunk,
-                 block->local_host_addr, (void *)block->remote_host_addr);
+                                   index, chunk, block->local_host_addr,
+                                   (void *)(uintptr_t)block->remote_host_addr);
 
         clear_bit(chunk, block->transit_bitmap);
 
@@ -1414,6 +1476,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
     return  0;
 }
 
+/* Wait for activity on the completion channel.
+ * Returns 0 on success, non-0 on error.
+ */
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+{
+    /*
+     * Coroutine doesn't start until migration_fd_process_incoming()
+     * so don't yield unless we know we're running inside of a coroutine.
+     */
+    if (rdma->migration_started_on_destination) {
+        yield_until_fd_readable(rdma->comp_channel->fd);
+    } else {
+        /* This is the source side, or the destination prior to
+         * migration_fd_process_incoming(); we're in a separate thread
+         * and can't yield, so we have to poll the fd.
+         * But we need to be able to handle 'cancel' or an error
+         * without hanging forever.
+         */
+        while (!rdma->error_state && !rdma->received_error) {
+            GPollFD pfds[1];
+            pfds[0].fd = rdma->comp_channel->fd;
+            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+            /* 0.1s timeout, should be fine for a 'cancel' */
+            switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+            case 1: /* fd active */
+                return 0;
+
+            case 0: /* Timeout, go around again */
+                break;
+
+            default: /* Error of some type -
+                      * I don't trust errno from qemu_poll_ns
+                     */
+                error_report("%s: poll failed", __func__);
+                return -EPIPE;
+            }
+
+            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
+                /* Bail out and let the cancellation happen */
+                return -EPIPE;
+            }
+        }
+    }
+
+    if (rdma->received_error) {
+        return -EPIPE;
+    }
+    return rdma->error_state;
+}
+
 /*
  * Block until the next work request has completed.
  *
@@ -1461,22 +1573,21 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
     }
 
     while (1) {
-        /*
-         * Coroutine doesn't start until process_incoming_migration()
-         * so don't yield unless we know we're running inside of a coroutine.
-         */
-        if (rdma->migration_started_on_destination) {
-            yield_until_fd_readable(rdma->comp_channel->fd);
+        ret = qemu_rdma_wait_comp_channel(rdma);
+        if (ret) {
+            goto err_block_for_wrid;
         }
 
-        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
+        if (ret) {
             perror("ibv_get_cq_event");
             goto err_block_for_wrid;
         }
 
         num_cq_events++;
 
-        if (ibv_req_notify_cq(cq, 0)) {
+        ret = -ibv_req_notify_cq(cq, 0);
+        if (ret) {
             goto err_block_for_wrid;
         }
 
@@ -1512,6 +1623,8 @@ err_block_for_wrid:
     if (num_cq_events) {
         ibv_ack_cq_events(cq, num_cq_events);
     }
+
+    rdma->error_state = ret;
     return ret;
 }
 
@@ -1526,7 +1639,7 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
     struct ibv_send_wr *bad_wr;
     struct ibv_sge sge = {
-                           .addr = (uint64_t)(wr->control),
+                           .addr = (uintptr_t)(wr->control),
                            .length = head->len + sizeof(RDMAControlHeader),
                            .lkey = wr->control_mr->lkey,
                          };
@@ -1538,7 +1651,7 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                    .num_sge = 1,
                                 };
 
-    trace_qemu_rdma_post_send_control(control_desc[head->type]);
+    trace_qemu_rdma_post_send_control(control_desc(head->type));
 
     /*
      * We don't actually need to do a memcpy() in here if we used
@@ -1580,7 +1693,7 @@ static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
 {
     struct ibv_recv_wr *bad_wr;
     struct ibv_sge sge = {
-                            .addr = (uint64_t)(rdma->wr_data[idx].control),
+                            .addr = (uintptr_t)(rdma->wr_data[idx].control),
                             .length = RDMA_CONTROL_MAX_BUFFER,
                             .lkey = rdma->wr_data[idx].control_mr->lkey,
                          };
@@ -1617,20 +1730,23 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
     network_to_control((void *) rdma->wr_data[idx].control);
     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
 
-    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
+    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
 
     if (expecting == RDMA_CONTROL_NONE) {
-        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
+        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
                                              head->type);
     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
         error_report("Was expecting a %s (%d) control message"
                 ", but got: %s (%d), length: %d",
-                control_desc[expecting], expecting,
-                control_desc[head->type], head->type, head->len);
+                control_desc(expecting), expecting,
+                control_desc(head->type), head->type, head->len);
+        if (head->type == RDMA_CONTROL_ERROR) {
+            rdma->received_error = true;
+        }
         return -EIO;
     }
     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
-        error_report("too long length: %d\n", head->len);
+        error_report("too long length: %d", head->len);
         return -EINVAL;
     }
     if (sizeof(*head) + head->len != byte_len) {
@@ -1733,7 +1849,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
             }
         }
 
-        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
         ret = qemu_rdma_exchange_get_response(rdma, resp,
                                               resp->type, RDMA_WRID_DATA);
 
@@ -1745,7 +1861,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
         if (resp_idx) {
             *resp_idx = RDMA_WRID_DATA;
         }
-        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
     }
 
     rdma->control_ready_expected = 1;
@@ -1827,11 +1943,12 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
                              };
 
 retry:
-    sge.addr = (uint64_t)(block->local_host_addr +
+    sge.addr = (uintptr_t)(block->local_host_addr +
                             (current_addr - block->offset));
     sge.length = length;
 
-    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
+    chunk = ram_chunk_index(block->local_host_addr,
+                            (uint8_t *)(uintptr_t)sge.addr);
     chunk_start = ram_chunk_start(block, chunk);
 
     if (block->is_ram_block) {
@@ -1884,9 +2001,7 @@ retry:
              * memset() + madvise() the entire chunk without RDMA.
              */
 
-            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
-                   && buffer_find_nonzero_offset((void *)sge.addr,
-                                                    length) == length) {
+            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
                 RDMACompress comp = {
                                         .offset = current_addr,
                                         .value = 0,
@@ -1900,7 +2015,7 @@ retry:
                 trace_qemu_rdma_write_one_zero(chunk, sge.length,
                                                current_index, current_addr);
 
-                compress_to_network(&comp);
+                compress_to_network(rdma, &comp);
                 ret = qemu_rdma_exchange_send(rdma, &head,
                                 (uint8_t *) &comp, NULL, NULL, NULL);
 
@@ -1927,7 +2042,7 @@ retry:
             trace_qemu_rdma_write_one_sendreg(chunk, sge.length, current_index,
                                               current_addr);
 
-            register_to_network(&reg);
+            register_to_network(rdma, &reg);
             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
                                     &resp, &reg_result_idx, NULL);
             if (ret < 0) {
@@ -1935,8 +2050,7 @@ retry:
             }
 
             /* try to overlap this single registration with the one we sent. */
-            if (qemu_rdma_register_and_get_keys(rdma, block,
-                                                (uint8_t *) sge.addr,
+            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                 &sge.lkey, NULL, chunk,
                                                 chunk_start, chunk_end)) {
                 error_report("cannot get lkey");
@@ -1955,8 +2069,7 @@ retry:
             block->remote_host_addr = reg_result->host_addr;
         } else {
             /* already registered before */
-            if (qemu_rdma_register_and_get_keys(rdma, block,
-                                                (uint8_t *)sge.addr,
+            if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                 &sge.lkey, NULL, chunk,
                                                 chunk_start, chunk_end)) {
                 error_report("cannot get lkey!");
@@ -1968,7 +2081,7 @@ retry:
     } else {
         send_wr.wr.rdma.rkey = block->remote_rkey;
 
-        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
+        if (qemu_rdma_register_and_get_keys(rdma, block, sge.addr,
                                                      &sge.lkey, NULL, chunk,
                                                      chunk_start, chunk_end)) {
             error_report("cannot get lkey!");
@@ -2156,7 +2269,9 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
     int ret, idx;
 
     if (rdma->cm_id && rdma->connected) {
-        if (rdma->error_state) {
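+        /* Notify the peer that we are aborting if we hit an error or the
+         * migration is being cancelled, unless the peer already sent an
+         * error of its own.
+         */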
+        if ((rdma->error_state ||
+             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+            !rdma->received_error) {
             RDMAControlHeader head = { .len = 0,
                                        .type = RDMA_CONTROL_ERROR,
                                        .repeat = 1,
@@ -2177,8 +2292,8 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
         rdma->connected = false;
     }
 
-    g_free(rdma->block);
-    rdma->block = NULL;
+    g_free(rdma->dest_blocks);
+    rdma->dest_blocks = NULL;
 
     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
         if (rdma->wr_data[idx].control_mr) {
@@ -2190,11 +2305,14 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
 
     if (rdma->local_ram_blocks.block) {
         while (rdma->local_ram_blocks.nb_blocks) {
-            __qemu_rdma_delete_block(rdma,
-                    rdma->local_ram_blocks.block->offset);
+            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
         }
     }
 
+    if (rdma->qp) {
+        rdma_destroy_qp(rdma->cm_id);
+        rdma->qp = NULL;
+    }
     if (rdma->cq) {
         ibv_destroy_cq(rdma->cq);
         rdma->cq = NULL;
@@ -2207,18 +2325,14 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
         ibv_dealloc_pd(rdma->pd);
         rdma->pd = NULL;
     }
-    if (rdma->listen_id) {
-        rdma_destroy_id(rdma->listen_id);
-        rdma->listen_id = NULL;
-    }
     if (rdma->cm_id) {
-        if (rdma->qp) {
-            rdma_destroy_qp(rdma->cm_id);
-            rdma->qp = NULL;
-        }
         rdma_destroy_id(rdma->cm_id);
         rdma->cm_id = NULL;
     }
+    if (rdma->listen_id) {
+        rdma_destroy_id(rdma->listen_id);
+        rdma->listen_id = NULL;
+    }
     if (rdma->channel) {
         rdma_destroy_event_channel(rdma->channel);
         rdma->channel = NULL;
@@ -2228,7 +2342,7 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
 }
 
 
-static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
 {
     int ret, idx;
     Error *local_err = NULL, **temp = &local_err;
@@ -2264,6 +2378,14 @@ static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
         goto err_rdma_source_init;
     }
 
+    /* Build the hash that maps from offset to RAMBlock */
+    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
+    for (idx = 0; idx < rdma->local_ram_blocks.nb_blocks; idx++) {
+        g_hash_table_insert(rdma->blockmap,
+                (void *)(uintptr_t)rdma->local_ram_blocks.block[idx].offset,
+                &rdma->local_ram_blocks.block[idx]);
+    }
+
     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
         ret = qemu_rdma_reg_control(rdma, idx);
         if (ret) {
@@ -2306,12 +2428,16 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     caps_to_network(&cap);
 
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        ERROR(errp, "posting second control recv");
+        goto err_rdma_source_connect;
+    }
+
     ret = rdma_connect(rdma->cm_id, &conn_param);
     if (ret) {
         perror("rdma_connect");
         ERROR(errp, "connecting to destination!");
-        rdma_destroy_id(rdma->cm_id);
-        rdma->cm_id = NULL;
         goto err_rdma_source_connect;
     }
 
@@ -2320,8 +2446,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
         perror("rdma_get_cm_event after rdma_connect");
         ERROR(errp, "connecting to destination!");
         rdma_ack_cm_event(cm_event);
-        rdma_destroy_id(rdma->cm_id);
-        rdma->cm_id = NULL;
         goto err_rdma_source_connect;
     }
 
@@ -2329,8 +2453,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
         ERROR(errp, "connecting to destination!");
         rdma_ack_cm_event(cm_event);
-        rdma_destroy_id(rdma->cm_id);
-        rdma->cm_id = NULL;
         goto err_rdma_source_connect;
     }
     rdma->connected = true;
@@ -2352,12 +2474,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     rdma_ack_cm_event(cm_event);
 
-    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
-    if (ret) {
-        ERROR(errp, "posting second control recv!");
-        goto err_rdma_source_connect;
-    }
-
     rdma->control_ready_expected = 1;
     rdma->nb_sent = 0;
     return 0;
@@ -2369,10 +2485,10 @@ err_rdma_source_connect:
 
 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
 {
-    int ret = -EINVAL, idx;
+    int ret, idx;
     struct rdma_cm_id *listen_id;
     char ip[40] = "unknown";
-    struct rdma_addrinfo *res;
+    struct rdma_addrinfo *res, *e;
     char port_str[16];
 
     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
@@ -2380,7 +2496,7 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
         rdma->wr_data[idx].control_curr = NULL;
     }
 
-    if (rdma->host == NULL) {
+    if (!rdma->host || !rdma->host[0]) {
         ERROR(errp, "RDMA host is not set!");
         rdma->error_state = -EINVAL;
         return -1;
@@ -2403,40 +2519,33 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
     snprintf(port_str, 16, "%d", rdma->port);
     port_str[15] = '\0';
 
-    if (rdma->host && strcmp("", rdma->host)) {
-        struct rdma_addrinfo *e;
+    ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
+    if (ret < 0) {
+        ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
+        goto err_dest_init_bind_addr;
+    }
 
-        ret = rdma_getaddrinfo(rdma->host, port_str, NULL, &res);
-        if (ret < 0) {
-            ERROR(errp, "could not rdma_getaddrinfo address %s", rdma->host);
-            goto err_dest_init_bind_addr;
+    for (e = res; e != NULL; e = e->ai_next) {
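+        /* Try each returned address until one binds; for IPv6, also check
+         * for the broken RoCE/IPv6 kernel situation described above.
+         */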
+        inet_ntop(e->ai_family,
+            &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
+        trace_qemu_rdma_dest_init_trying(rdma->host, ip);
+        ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
+        if (ret) {
+            continue;
         }
-
-        for (e = res; e != NULL; e = e->ai_next) {
-            inet_ntop(e->ai_family,
-                &((struct sockaddr_in *) e->ai_dst_addr)->sin_addr, ip, sizeof ip);
-            trace_qemu_rdma_dest_init_trying(rdma->host, ip);
-            ret = rdma_bind_addr(listen_id, e->ai_dst_addr);
-            if (!ret) {
-                if (e->ai_family == AF_INET6) {
-                    ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
-                    if (ret) {
-                        continue;
-                    }
-                }
-                    
-                goto listen;
+        if (e->ai_family == AF_INET6) {
+            ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
+            if (ret) {
+                continue;
             }
         }
+        break;
+    }
 
+    if (!e) {
         ERROR(errp, "Error: could not rdma_bind_addr!");
         goto err_dest_init_bind_addr;
-    } else {
-        ERROR(errp, "migration host and port not specified!");
-        ret = -EINVAL;
-        goto err_dest_init_bind_addr;
     }
-listen:
 
     rdma->listen_id = listen_id;
     qemu_rdma_dump_gid("dest_init", listen_id);
@@ -2458,13 +2567,12 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
     InetSocketAddress *addr;
 
     if (host_port) {
-        rdma = g_malloc0(sizeof(RDMAContext));
-        memset(rdma, 0, sizeof(RDMAContext));
+        rdma = g_new0(RDMAContext, 1);
         rdma->current_index = -1;
         rdma->current_chunk = -1;
 
-        addr = inet_parse(host_port, NULL);
-        if (addr != NULL) {
+        addr = g_new(InetSocketAddress, 1);
+        if (!inet_parse(addr, host_port, NULL)) {
             rdma->port = atoi(addr->port);
             rdma->host = g_strdup(addr->host);
         } else {
@@ -2484,15 +2592,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
  * SEND messages for control only.
  * VM's ram is handled with regular RDMA messages.
  */
-static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
-                                int64_t pos, int size)
-{
-    QEMUFileRDMA *r = opaque;
-    QEMUFile *f = r->file;
-    RDMAContext *rdma = r->rdma;
-    size_t remaining = size;
-    uint8_t * data = (void *) buf;
+static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
+                                       const struct iovec *iov,
+                                       size_t niov,
+                                       int *fds,
+                                       size_t nfds,
+                                       Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    QEMUFile *f = rioc->file;
+    RDMAContext *rdma = rioc->rdma;
     int ret;
+    ssize_t done = 0;
+    size_t i;
 
     CHECK_ERROR_STATE();
 
@@ -2506,30 +2618,35 @@ static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
         return ret;
     }
 
-    while (remaining) {
-        RDMAControlHeader head;
+    for (i = 0; i < niov; i++) {
+        size_t remaining = iov[i].iov_len;
+        uint8_t * data = (void *)iov[i].iov_base;
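+        /* Each iovec is sent as one or more CONTROL messages of type
+         * RDMA_CONTROL_QEMU_FILE, at most RDMA_SEND_INCREMENT bytes each.
+         */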
+        while (remaining) {
+            RDMAControlHeader head;
 
-        r->len = MIN(remaining, RDMA_SEND_INCREMENT);
-        remaining -= r->len;
+            rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
+            remaining -= rioc->len;
 
-        head.len = r->len;
-        head.type = RDMA_CONTROL_QEMU_FILE;
+            head.len = rioc->len;
+            head.type = RDMA_CONTROL_QEMU_FILE;
 
-        ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+            ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
 
-        if (ret < 0) {
-            rdma->error_state = ret;
-            return ret;
-        }
+            if (ret < 0) {
+                rdma->error_state = ret;
+                return ret;
+            }
 
-        data += r->len;
+            data += rioc->len;
+            done += rioc->len;
+        }
     }
 
-    return size;
+    return done;
 }
 
 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
-                             int size, int idx)
+                             size_t size, int idx)
 {
     size_t len = 0;
 
@@ -2550,41 +2667,74 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
  * RDMA links don't use bytestreams, so we have to
  * return bytes to QEMUFile opportunistically.
  */
-static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
-                                int64_t pos, int size)
-{
-    QEMUFileRDMA *r = opaque;
-    RDMAContext *rdma = r->rdma;
+static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int **fds,
+                                      size_t *nfds,
+                                      Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head;
     int ret = 0;
+    ssize_t i;
+    size_t done = 0;
 
     CHECK_ERROR_STATE();
 
-    /*
-     * First, we hold on to the last SEND message we
-     * were given and dish out the bytes until we run
-     * out of bytes.
-     */
-    r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
-    if (r->len) {
-        return r->len;
-    }
+    for (i = 0; i < niov; i++) {
+        size_t want = iov[i].iov_len;
+        uint8_t *data = (void *)iov[i].iov_base;
 
-    /*
-     * Once we run out, we block and wait for another
-     * SEND message to arrive.
-     */
-    ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+        /*
+         * First, we hold on to the last SEND message we
+         * were given and dish out the bytes until we run
+         * out of bytes.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        done += ret;
+        want -= ret;
+        /* Got what we needed, so go to next iovec */
+        if (want == 0) {
+            continue;
+        }
 
-    if (ret < 0) {
-        rdma->error_state = ret;
-        return ret;
-    }
+        /* If we got any data so far, then don't wait
+         * for more, just return what we have */
+        if (done > 0) {
+            break;
+        }
 
-    /*
-     * SEND was received with new bytes, now try again.
-     */
-    return qemu_rdma_fill(r->rdma, buf, size, 0);
+
+        /* We've got nothing at all, so let's wait for
+         * more to arrive
+         */
+        ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+        if (ret < 0) {
+            rdma->error_state = ret;
+            return ret;
+        }
+
+        /*
+         * SEND was received with new bytes, now try again.
+         */
+        ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+        done += ret;
+        want -= ret;
+
+        /* Still didn't get enough, so let's just return */
+        if (want) {
+            if (done == 0) {
+                return QIO_CHANNEL_ERR_BLOCK;
+            } else {
+                break;
+            }
+        }
+    }
+    rioc->len = done;
+    return rioc->len;
 }
 
 /*
@@ -2611,15 +2761,125 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
     return 0;
 }
 
-static int qemu_rdma_close(void *opaque)
+
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
+                                         bool blocking,
+                                         Error **errp)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    /* XXX we should make readv/writev actually honour this :-) */
+    rioc->blocking = blocking;
+    return 0;
+}
+
+
+typedef struct QIOChannelRDMASource QIOChannelRDMASource;
+struct QIOChannelRDMASource {
+    GSource parent;
+    QIOChannelRDMA *rioc;
+    GIOCondition condition;
+};
+
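+/*
+ * The RDMA channel is treated as always writable; it is readable only when
+ * a previously received CONTROL message still has unconsumed bytes buffered
+ * in wr_data[0].
+ */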
+static gboolean
+qio_channel_rdma_source_prepare(GSource *source,
+                                gint *timeout)
+{
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+    *timeout = -1;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_check(GSource *source)
+{
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_dispatch(GSource *source,
+                                 GSourceFunc callback,
+                                 gpointer user_data)
+{
+    QIOChannelFunc func = (QIOChannelFunc)callback;
+    QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+    RDMAContext *rdma = rsource->rioc->rdma;
+    GIOCondition cond = 0;
+
+    if (rdma->wr_data[0].control_len) {
+        cond |= G_IO_IN;
+    }
+    cond |= G_IO_OUT;
+
+    return (*func)(QIO_CHANNEL(rsource->rioc),
+                   (cond & rsource->condition),
+                   user_data);
+}
+
+static void
+qio_channel_rdma_source_finalize(GSource *source)
+{
+    QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
+
+    object_unref(OBJECT(ssource->rioc));
+}
+
+GSourceFuncs qio_channel_rdma_source_funcs = {
+    qio_channel_rdma_source_prepare,
+    qio_channel_rdma_source_check,
+    qio_channel_rdma_source_dispatch,
+    qio_channel_rdma_source_finalize
+};
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+                                              GIOCondition condition)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    QIOChannelRDMASource *ssource;
+    GSource *source;
+
+    source = g_source_new(&qio_channel_rdma_source_funcs,
+                          sizeof(QIOChannelRDMASource));
+    ssource = (QIOChannelRDMASource *)source;
+
+    ssource->rioc = rioc;
+    object_ref(OBJECT(rioc));
+
+    ssource->condition = condition;
+
+    return source;
+}
+
+
+static int qio_channel_rdma_close(QIOChannel *ioc,
+                                  Error **errp)
 {
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     trace_qemu_rdma_close();
-    QEMUFileRDMA *r = opaque;
-    if (r->rdma) {
-        qemu_rdma_cleanup(r->rdma);
-        g_free(r->rdma);
+    if (rioc->rdma) {
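+        /* Record any error already noted on the QEMUFile so that cleanup
+         * can report it to the peer before tearing the connection down.
+         */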
+        if (!rioc->rdma->error_state) {
+            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+        }
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
     }
-    g_free(r);
     return 0;
 }
 
@@ -2659,10 +2919,10 @@ static int qemu_rdma_close(void *opaque)
  */
 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
                                   ram_addr_t block_offset, ram_addr_t offset,
-                                  size_t size, int *bytes_sent)
+                                  size_t size, uint64_t *bytes_sent)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     int ret;
 
     CHECK_ERROR_STATE();
@@ -2848,7 +3108,7 @@ static int qemu_rdma_accept(RDMAContext *rdma)
         }
     }
 
-    qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
+    qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
 
     ret = rdma_accept(rdma->cm_id, &conn_param);
     if (ret) {
@@ -2887,6 +3147,14 @@ err_rdma_dest_wait:
     return ret;
 }
 
+static int dest_ram_sort_func(const void *a, const void *b)
+{
+    unsigned int a_index = ((const RDMALocalBlock *)a)->src_index;
+    unsigned int b_index = ((const RDMALocalBlock *)b)->src_index;
+
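+    /* Ascending by src_index: -1 when a < b, 0 when equal, 1 when a > b */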
+    return (a_index < b_index) ? -1 : (a_index != b_index);
+}
+
 /*
  * During each iteration of the migration, we listen for instructions
  * by the source VM to perform dynamic page registrations before they
@@ -2896,8 +3164,7 @@ err_rdma_dest_wait:
  *
  * Keep doing this until the source tells us to stop.
  */
-static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
-                                         uint64_t flags)
+static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
 {
     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
                                .type = RDMA_CONTROL_REGISTER_RESULT,
@@ -2909,8 +3176,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
                              };
     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
                                  .repeat = 1 };
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMALocalBlocks *local = &rdma->local_ram_blocks;
     RDMAControlHeader head;
     RDMARegister *reg, *registers;
@@ -2927,7 +3194,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
     CHECK_ERROR_STATE();
 
     do {
-        trace_qemu_rdma_registration_handle_wait(flags);
+        trace_qemu_rdma_registration_handle_wait();
 
         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
 
@@ -2950,6 +3217,13 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
             trace_qemu_rdma_registration_handle_compress(comp->length,
                                                          comp->block_idx,
                                                          comp->offset);
+            if (comp->block_idx >= rdma->local_ram_blocks.nb_blocks) {
+                error_report("rdma: 'compress' bad block index %u (vs %d)",
+                             (unsigned int)comp->block_idx,
+                             rdma->local_ram_blocks.nb_blocks);
+                ret = -EIO;
+                goto out;
+            }
             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
 
             host_addr = block->local_host_addr +
@@ -2965,6 +3239,13 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
             trace_qemu_rdma_registration_handle_ram_blocks();
 
+            /* Sort our local RAM Block list so it matches the source's
+             * ordering; we can do this because we filled in a src_index
+             * for each entry as we received the RAMBlock list earlier.
+             */
+            qsort(rdma->local_ram_blocks.block,
+                  rdma->local_ram_blocks.nb_blocks,
+                  sizeof(RDMALocalBlock), dest_ram_sort_func);
             if (rdma->pin_all) {
                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
                 if (ret) {
@@ -2981,25 +3262,31 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
              * their "local" descriptions with what was sent.
              */
             for (i = 0; i < local->nb_blocks; i++) {
-                rdma->block[i].remote_host_addr =
-                    (uint64_t)(local->block[i].local_host_addr);
+                rdma->dest_blocks[i].remote_host_addr =
+                    (uintptr_t)(local->block[i].local_host_addr);
 
                 if (rdma->pin_all) {
-                    rdma->block[i].remote_rkey = local->block[i].mr->rkey;
+                    rdma->dest_blocks[i].remote_rkey = local->block[i].mr->rkey;
                 }
 
-                rdma->block[i].offset = local->block[i].offset;
-                rdma->block[i].length = local->block[i].length;
+                rdma->dest_blocks[i].offset = local->block[i].offset;
+                rdma->dest_blocks[i].length = local->block[i].length;
 
-                remote_block_to_network(&rdma->block[i]);
+                dest_block_to_network(&rdma->dest_blocks[i]);
+                trace_qemu_rdma_registration_handle_ram_blocks_loop(
+                    local->block[i].block_name,
+                    local->block[i].offset,
+                    local->block[i].length,
+                    local->block[i].local_host_addr,
+                    local->block[i].src_index);
             }
 
             blocks.len = rdma->local_ram_blocks.nb_blocks
-                                                * sizeof(RDMARemoteBlock);
+                                                * sizeof(RDMADestBlock);
 
 
             ret = qemu_rdma_post_send_control(rdma,
-                                        (uint8_t *) rdma->block, &blocks);
+                                        (uint8_t *) rdma->dest_blocks, &blocks);
 
             if (ret < 0) {
                 error_report("rdma migration: error sending remote info");
@@ -3025,8 +3312,23 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
                 trace_qemu_rdma_registration_handle_register_loop(count,
                          reg->current_index, reg->key.current_addr, reg->chunks);
 
+                if (reg->current_index >= rdma->local_ram_blocks.nb_blocks) {
+                    error_report("rdma: 'register' bad block index %u (vs %d)",
+                                 (unsigned int)reg->current_index,
+                                 rdma->local_ram_blocks.nb_blocks);
+                    ret = -ENOENT;
+                    goto out;
+                }
                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
                 if (block->is_ram_block) {
+                    if (block->offset > reg->key.current_addr) {
+                        error_report("rdma: bad register address for block %s"
+                            " offset: %" PRIx64 " current_addr: %" PRIx64,
+                            block->block_name, block->offset,
+                            reg->key.current_addr);
+                        ret = -ERANGE;
+                        goto out;
+                    }
                     host_addr = (block->local_host_addr +
                                 (reg->key.current_addr - block->offset));
                     chunk = ram_chunk_index(block->local_host_addr,
@@ -3035,18 +3337,26 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
                     chunk = reg->key.chunk;
                     host_addr = block->local_host_addr +
                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
+                    /* Check for a particularly bad chunk value */
+                    if (host_addr < (void *)block->local_host_addr) {
+                        error_report("rdma: bad chunk for block %s"
+                            " chunk: %" PRIx64,
+                            block->block_name, reg->key.chunk);
+                        ret = -ERANGE;
+                        goto out;
+                    }
                 }
                 chunk_start = ram_chunk_start(block, chunk);
                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
                 if (qemu_rdma_register_and_get_keys(rdma, block,
-                            (uint8_t *)host_addr, NULL, &reg_result->rkey,
+                            (uintptr_t)host_addr, NULL, &reg_result->rkey,
                             chunk, chunk_start, chunk_end)) {
                     error_report("cannot get rkey");
                     ret = -EINVAL;
                     goto out;
                 }
 
-                reg_result->host_addr = (uint64_t) block->local_host_addr;
+                reg_result->host_addr = (uintptr_t)block->local_host_addr;
 
                 trace_qemu_rdma_registration_handle_register_rkey(
                                                            reg_result->rkey);
@@ -3103,7 +3413,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
             ret = -EIO;
             goto out;
         default:
-            error_report("Unknown control message %s", control_desc[head.type]);
+            error_report("Unknown control message %s", control_desc(head.type));
             ret = -EIO;
             goto out;
         }
@@ -3115,11 +3425,60 @@ out:
     return ret;
 }
 
+/* Destination:
+ * Called via a ram_control_load_hook during the initial RAM load section which
+ * lists the RAMBlocks by name.  This lets us know the order of the RAMBlocks
+ * on the source.
+ * We've already built our local RAMBlock list, but not yet sent the list to
+ * the source.
+ */
+static int
+rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
+{
+    RDMAContext *rdma = rioc->rdma;
+    int curr;
+    int found = -1;
+
+    /* Find the matching RAMBlock in our local list */
+    for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
+        if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
+            found = curr;
+            break;
+        }
+    }
+
+    if (found == -1) {
+        error_report("RAMBlock '%s' not found on destination", name);
+        return -ENOENT;
+    }
+
+    rdma->local_ram_blocks.block[curr].src_index = rdma->next_src_index;
+    trace_rdma_block_notification_handle(name, rdma->next_src_index);
+    rdma->next_src_index++;
+
+    return 0;
+}
+
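The effect of the notification path is simply to stamp each locally known block with the position it occupies in the source's list, so the later qsort can reproduce that ordering. A self-contained sketch of the same pattern, with hypothetical names and illustrative block names only (not the real QEMU structures):

#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the destination's local RAMBlock list;
 * only the fields the notification path touches. */
typedef struct {
    const char *block_name;
    unsigned int src_index;
} DemoBlock;

/* Record the source-side ordering as block names arrive, mirroring the
 * shape of rdma_block_notification_handle(). */
static int demo_notify(DemoBlock *blocks, int nb_blocks,
                       unsigned int *next_src_index, const char *name)
{
    for (int i = 0; i < nb_blocks; i++) {
        if (!strcmp(blocks[i].block_name, name)) {
            blocks[i].src_index = (*next_src_index)++;
            return 0;
        }
    }
    return -1; /* block not present on the destination */
}

int main(void)
{
    /* Block names are illustrative only. */
    DemoBlock blocks[] = { { "vga.vram", 0 }, { "pc.ram", 0 } };
    const char *source_order[] = { "pc.ram", "vga.vram" };
    unsigned int next_src_index = 0;

    for (int i = 0; i < 2; i++) {
        demo_notify(blocks, 2, &next_src_index, source_order[i]);
    }
    for (int i = 0; i < 2; i++) {
        printf("%s -> src_index %u\n", blocks[i].block_name,
               blocks[i].src_index);
    }
    return 0;
}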
+static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
+{
+    switch (flags) {
+    case RAM_CONTROL_BLOCK_REG:
+        return rdma_block_notification_handle(opaque, data);
+
+    case RAM_CONTROL_HOOK:
+        return qemu_rdma_registration_handle(f, opaque);
+
+    default:
+        /* Shouldn't be called with any other values */
+        abort();
+    }
+}
+
 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
-                                        uint64_t flags)
+                                        uint64_t flags, void *data)
 {
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
 
     CHECK_ERROR_STATE();
 
@@ -3135,11 +3494,11 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
  * First, flush writes, if any.
  */
 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
-                                       uint64_t flags)
+                                       uint64_t flags, void *data)
 {
     Error *local_err = NULL, **errp = &local_err;
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+    RDMAContext *rdma = rioc->rdma;
     RDMAControlHeader head = { .len = 0, .repeat = 1 };
     int ret = 0;
 
@@ -3155,7 +3514,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
     if (flags == RAM_CONTROL_SETUP) {
         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
         RDMALocalBlocks *local = &rdma->local_ram_blocks;
-        int reg_result_idx, i, j, nb_remote_blocks;
+        int reg_result_idx, i, nb_dest_blocks;
 
         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
         trace_qemu_rdma_registration_stop_ram();
@@ -3176,7 +3535,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
             return ret;
         }
 
-        nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
+        nb_dest_blocks = resp.len / sizeof(RDMADestBlock);
 
         /*
          * The protocol uses two different sets of rkeys (mutually exclusive):
@@ -3190,43 +3549,33 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
          * and then propagates the remote ram block descriptions to its local copy.
          */
 
-        if (local->nb_blocks != nb_remote_blocks) {
-            ERROR(errp, "ram blocks mismatch #1! "
+        if (local->nb_blocks != nb_dest_blocks) {
+            ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
                         "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
+                        "not identical on both the source and destination.",
+                        local->nb_blocks, nb_dest_blocks);
+            rdma->error_state = -EINVAL;
             return -EINVAL;
         }
 
         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
-        memcpy(rdma->block,
+        memcpy(rdma->dest_blocks,
             rdma->wr_data[reg_result_idx].control_curr, resp.len);
-        for (i = 0; i < nb_remote_blocks; i++) {
-            network_to_remote_block(&rdma->block[i]);
-
-            /* search local ram blocks */
-            for (j = 0; j < local->nb_blocks; j++) {
-                if (rdma->block[i].offset != local->block[j].offset) {
-                    continue;
-                }
-
-                if (rdma->block[i].length != local->block[j].length) {
-                    ERROR(errp, "ram blocks mismatch #2! "
-                        "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
-                    return -EINVAL;
-                }
-                local->block[j].remote_host_addr =
-                        rdma->block[i].remote_host_addr;
-                local->block[j].remote_rkey = rdma->block[i].remote_rkey;
-                break;
-            }
-
-            if (j >= local->nb_blocks) {
-                ERROR(errp, "ram blocks mismatch #3! "
-                        "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
+        for (i = 0; i < nb_dest_blocks; i++) {
+            network_to_dest_block(&rdma->dest_blocks[i]);
+
+            /* We require that the blocks are in the same order */
+            if (rdma->dest_blocks[i].length != local->block[i].length) {
+                ERROR(errp, "Block %s/%d has a different length %" PRIu64
+                            "vs %" PRIu64, local->block[i].block_name, i,
+                            local->block[i].length,
+                            rdma->dest_blocks[i].length);
+                rdma->error_state = -EINVAL;
                 return -EINVAL;
             }
+            local->block[i].remote_host_addr =
+                    rdma->dest_blocks[i].remote_host_addr;
+            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
         }
     }
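Because the destination has already sorted its blocks into the source's order, the source can now match its local blocks against the returned descriptors one-to-one by index instead of searching by offset. A hedged sketch of that verification, using simplified hypothetical descriptors (the real RDMADestBlock and RDMALocalBlock carry more fields such as rkeys and host addresses):

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

typedef struct { uint64_t length; } DemoDestBlock;
typedef struct { const char *block_name; uint64_t length; } DemoLocalBlock;

/* Both sides now use the same ordering, so an index-by-index comparison
 * is enough to detect a mismatched RAM layout. */
static int demo_check_blocks(const DemoLocalBlock *local,
                             const DemoDestBlock *dest, int nb_blocks)
{
    for (int i = 0; i < nb_blocks; i++) {
        if (dest[i].length != local[i].length) {
            fprintf(stderr, "Block %s/%d has a different length %" PRIu64
                    " vs %" PRIu64 "\n", local[i].block_name, i,
                    local[i].length, dest[i].length);
            return -1;
        }
    }
    return 0;
}

int main(void)
{
    /* Illustrative data: the second block's length differs. */
    DemoLocalBlock local[] = { { "pc.ram", 1024 }, { "vga.vram", 256 } };
    DemoDestBlock dest[]   = { { 1024 }, { 512 } };

    return demo_check_blocks(local, dest, 2) ? 1 : 0;
}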
 
@@ -3245,46 +3594,74 @@ err:
     return ret;
 }
 
-static int qemu_rdma_get_fd(void *opaque)
-{
-    QEMUFileRDMA *rfile = opaque;
-    RDMAContext *rdma = rfile->rdma;
-
-    return rdma->comp_channel->fd;
-}
-
-const QEMUFileOps rdma_read_ops = {
-    .get_buffer    = qemu_rdma_get_buffer,
-    .get_fd        = qemu_rdma_get_fd,
-    .close         = qemu_rdma_close,
-    .hook_ram_load = qemu_rdma_registration_handle,
+static const QEMUFileHooks rdma_read_hooks = {
+    .hook_ram_load = rdma_load_hook,
 };
 
-const QEMUFileOps rdma_write_ops = {
-    .put_buffer         = qemu_rdma_put_buffer,
-    .close              = qemu_rdma_close,
+static const QEMUFileHooks rdma_write_hooks = {
     .before_ram_iterate = qemu_rdma_registration_start,
     .after_ram_iterate  = qemu_rdma_registration_stop,
     .save_page          = qemu_rdma_save_page,
 };
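The change above splits I/O from migration hooks: raw reads and writes move to the QIOChannel implementation, while the QEMUFileHooks tables keep only the RAM-migration callbacks, with unset entries skipped by the caller. A minimal sketch of that hook-table pattern, with hypothetical names rather than the real QEMUFileHooks definition:

#include <stdio.h>

typedef struct {
    int (*before_ram_iterate)(void *opaque);
    int (*after_ram_iterate)(void *opaque);
} DemoHooks;

static int demo_before(void *opaque)
{
    printf("flush/prepare before the RAM pass\n");
    return 0;
}

static int demo_after(void *opaque)
{
    printf("drain/finish after the RAM pass\n");
    return 0;
}

static const DemoHooks demo_write_hooks = {
    .before_ram_iterate = demo_before,
    .after_ram_iterate  = demo_after,
};

/* The generic layer calls whichever hooks are present and ignores the rest. */
static void demo_ram_pass(const DemoHooks *hooks, void *opaque)
{
    if (hooks && hooks->before_ram_iterate) {
        hooks->before_ram_iterate(opaque);
    }
    /* ... the RAM migration pass itself would run here ... */
    if (hooks && hooks->after_ram_iterate) {
        hooks->after_ram_iterate(opaque);
    }
}

int main(void)
{
    demo_ram_pass(&demo_write_hooks, NULL);
    return 0;
}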
 
-static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+
+static void qio_channel_rdma_finalize(Object *obj)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
+    if (rioc->rdma) {
+        qemu_rdma_cleanup(rioc->rdma);
+        g_free(rioc->rdma);
+        rioc->rdma = NULL;
+    }
+}
+
+static void qio_channel_rdma_class_init(ObjectClass *klass,
+                                        void *class_data G_GNUC_UNUSED)
 {
-    QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
+    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+    ioc_klass->io_writev = qio_channel_rdma_writev;
+    ioc_klass->io_readv = qio_channel_rdma_readv;
+    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
+    ioc_klass->io_close = qio_channel_rdma_close;
+    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+}
+
+static const TypeInfo qio_channel_rdma_info = {
+    .parent = TYPE_QIO_CHANNEL,
+    .name = TYPE_QIO_CHANNEL_RDMA,
+    .instance_size = sizeof(QIOChannelRDMA),
+    .instance_finalize = qio_channel_rdma_finalize,
+    .class_init = qio_channel_rdma_class_init,
+};
+
+static void qio_channel_rdma_register_types(void)
+{
+    type_register_static(&qio_channel_rdma_info);
+}
+
+type_init(qio_channel_rdma_register_types);
+
+static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+{
+    QIOChannelRDMA *rioc;
 
     if (qemu_file_mode_is_not_valid(mode)) {
         return NULL;
     }
 
-    r->rdma = rdma;
+    rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+    rioc->rdma = rdma;
 
     if (mode[0] == 'w') {
-        r->file = qemu_fopen_ops(r, &rdma_write_ops);
+        rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
     } else {
-        r->file = qemu_fopen_ops(r, &rdma_read_ops);
+        rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+        qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
     }
 
-    return r->file;
+    return rioc->file;
 }
 
 static void rdma_accept_incoming_migration(void *opaque)
@@ -3294,7 +3671,7 @@ static void rdma_accept_incoming_migration(void *opaque)
     QEMUFile *f;
     Error *local_err = NULL, **errp = &local_err;
 
-    trace_qemu_dma_accept_incoming_migration();
+    trace_qemu_rdma_accept_incoming_migration();
     ret = qemu_rdma_accept(rdma);
 
     if (ret) {
@@ -3302,7 +3679,7 @@ static void rdma_accept_incoming_migration(void *opaque)
         return;
     }
 
-    trace_qemu_dma_accept_incoming_migration_accepted();
+    trace_qemu_rdma_accept_incoming_migration_accepted();
 
     f = qemu_fopen_rdma(rdma, "rb");
     if (f == NULL) {
@@ -3312,7 +3689,7 @@ static void rdma_accept_incoming_migration(void *opaque)
     }
 
     rdma->migration_started_on_destination = 1;
-    process_incoming_migration(f);
+    migration_fd_process_incoming(f);
 }
 
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
@@ -3345,9 +3722,8 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
 
     trace_rdma_start_incoming_migration_after_rdma_listen();
 
-    qemu_set_fd_handler2(rdma->channel->fd, NULL,
-                         rdma_accept_incoming_migration, NULL,
-                            (void *)(intptr_t) rdma);
+    qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
+                        NULL, (void *)(intptr_t)rdma);
     return;
 err:
     error_propagate(errp, local_err);
@@ -3358,24 +3734,22 @@ void rdma_start_outgoing_migration(void *opaque,
                             const char *host_port, Error **errp)
 {
     MigrationState *s = opaque;
-    Error *local_err = NULL, **temp = &local_err;
-    RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
+    RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
     int ret = 0;
 
     if (rdma == NULL) {
-        ERROR(temp, "Failed to initialize RDMA data structures! %d", ret);
         goto err;
     }
 
-    ret = qemu_rdma_source_init(rdma, &local_err,
-        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
+    ret = qemu_rdma_source_init(rdma,
+        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
 
     if (ret) {
         goto err;
     }
 
     trace_rdma_start_outgoing_migration_after_rdma_source_init();
-    ret = qemu_rdma_connect(rdma, &local_err);
+    ret = qemu_rdma_connect(rdma, errp);
 
     if (ret) {
         goto err;
@@ -3383,11 +3757,9 @@ void rdma_start_outgoing_migration(void *opaque,
 
     trace_rdma_start_outgoing_migration_after_rdma_connect();
 
-    s->file = qemu_fopen_rdma(rdma, "wb");
-    migrate_fd_connect(s);
+    s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
+    migrate_fd_connect(s, NULL);
     return;
 err:
-    error_propagate(errp, local_err);
     g_free(rdma);
-    migrate_fd_error(s);
 }