migration: stop decompression to allocate and free memory frequently
diff --git a/migration/rdma.c b/migration/rdma.c
index 5110ec828d26fb6da710216f7618276290b6ab91..da474fc19f81cd33b1ce4eb4eb94a2f000976912 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
 #include "qapi/error.h"
 #include "qemu-common.h"
 #include "qemu/cutils.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "exec/cpu-common.h"
+#include "rdma.h"
+#include "migration.h"
+#include "qemu-file.h"
+#include "ram.h"
+#include "qemu-file-channel.h"
 #include "qemu/error-report.h"
 #include "qemu/main-loop.h"
 #include "qemu/sockets.h"
@@ -86,7 +88,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
             } \
             return rdma->error_state; \
         } \
-    } while (0);
+    } while (0)
 
 /*
  * A work request ID is 64-bits and we split up these bits
@@ -163,20 +165,6 @@ enum {
     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
 };
 
-static const char *control_desc[] = {
-    [RDMA_CONTROL_NONE] = "NONE",
-    [RDMA_CONTROL_ERROR] = "ERROR",
-    [RDMA_CONTROL_READY] = "READY",
-    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
-    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
-    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
-    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
-    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
-    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
-    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
-    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
-    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
-};
 
 /*
  * Memory and MR structures used to represent an IB Send/Recv work request.
@@ -249,6 +237,30 @@ typedef struct QEMU_PACKED RDMADestBlock {
     uint32_t padding;
 } RDMADestBlock;
 
+static const char *control_desc(unsigned int rdma_control)
+{
+    static const char *strs[] = {
+        [RDMA_CONTROL_NONE] = "NONE",
+        [RDMA_CONTROL_ERROR] = "ERROR",
+        [RDMA_CONTROL_READY] = "READY",
+        [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+        [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+        [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+        [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+        [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+        [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+        [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+        [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+        [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+    };
+
+    if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
+        return "??BAD CONTROL VALUE??";
+    }
+
+    return strs[rdma_control];
+}
+
 static uint64_t htonll(uint64_t v)
 {
     union { uint32_t lv[2]; uint64_t llv; } u;
@@ -350,6 +362,7 @@ typedef struct RDMAContext {
      */
     int error_state;
     int error_reported;
+    int received_error;
 
     /*
      * Description of ram blocks used throughout the code.
@@ -808,7 +821,7 @@ static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
  *
  * Patches are being reviewed on linux-rdma.
  */
-static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
+static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
 {
     struct ibv_port_attr port_attr;
 
@@ -949,7 +962,7 @@ static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
                 RDMA_RESOLVE_TIMEOUT_MS);
         if (!ret) {
             if (e->ai_family == AF_INET6) {
-                ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
+                ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
                 if (ret) {
                     continue;
                 }
@@ -1463,6 +1476,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
     return  0;
 }
 
+/* Wait for activity on the completion channel.
+ * Returns 0 on success, non-0 on error.
+ */
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+{
+    /*
+     * Coroutine doesn't start until migration_fd_process_incoming()
+     * so don't yield unless we know we're running inside of a coroutine.
+     */
+    if (rdma->migration_started_on_destination) {
+        yield_until_fd_readable(rdma->comp_channel->fd);
+    } else {
+        /* This is either the source side (which runs in a separate
+         * thread) or the destination prior to
+         * migration_fd_process_incoming(); in both cases we can't
+         * yield, so we have to poll the fd.  But we need to be able
+         * to handle 'cancel' or an error without hanging forever.
+         */
+        while (!rdma->error_state  && !rdma->received_error) {
+            GPollFD pfds[1];
+            pfds[0].fd = rdma->comp_channel->fd;
+            pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+            /* 0.1s timeout, should be fine for a 'cancel' */
+            switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+            case 1: /* fd active */
+                return 0;
+
+            case 0: /* Timeout, go around again */
+                break;
+
+            default: /* Error of some type -
+                      * I don't trust errno from qemu_poll_ns
+                      */
+                error_report("%s: poll failed", __func__);
+                return -EPIPE;
+            }
+
+            if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
+                /* Bail out and let the cancellation happen */
+                return -EPIPE;
+            }
+        }
+    }
+
+    if (rdma->received_error) {
+        return -EPIPE;
+    }
+    return rdma->error_state;
+}
+
 /*
  * Block until the next work request has completed.
  *
@@ -1510,22 +1573,21 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
     }
 
     while (1) {
-        /*
-         * Coroutine doesn't start until migration_fd_process_incoming()
-         * so don't yield unless we know we're running inside of a coroutine.
-         */
-        if (rdma->migration_started_on_destination) {
-            yield_until_fd_readable(rdma->comp_channel->fd);
+        ret = qemu_rdma_wait_comp_channel(rdma);
+        if (ret) {
+            goto err_block_for_wrid;
         }
 
-        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+        ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
+        if (ret) {
             perror("ibv_get_cq_event");
             goto err_block_for_wrid;
         }
 
         num_cq_events++;
 
-        if (ibv_req_notify_cq(cq, 0)) {
+        ret = -ibv_req_notify_cq(cq, 0);
+        if (ret) {
             goto err_block_for_wrid;
         }
 
@@ -1561,6 +1623,8 @@ err_block_for_wrid:
     if (num_cq_events) {
         ibv_ack_cq_events(cq, num_cq_events);
     }
+
+    rdma->error_state = ret;
     return ret;
 }
 
@@ -1587,7 +1651,7 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
                                    .num_sge = 1,
                                 };
 
-    trace_qemu_rdma_post_send_control(control_desc[head->type]);
+    trace_qemu_rdma_post_send_control(control_desc(head->type));
 
     /*
      * We don't actually need to do a memcpy() in here if we used
@@ -1666,16 +1730,19 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
     network_to_control((void *) rdma->wr_data[idx].control);
     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
 
-    trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
+    trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
 
     if (expecting == RDMA_CONTROL_NONE) {
-        trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
+        trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
                                              head->type);
     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
         error_report("Was expecting a %s (%d) control message"
                 ", but got: %s (%d), length: %d",
-                control_desc[expecting], expecting,
-                control_desc[head->type], head->type, head->len);
+                control_desc(expecting), expecting,
+                control_desc(head->type), head->type, head->len);
+        if (head->type == RDMA_CONTROL_ERROR) {
+            rdma->received_error = true;
+        }
         return -EIO;
     }
     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
@@ -1782,7 +1849,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
             }
         }
 
-        trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
         ret = qemu_rdma_exchange_get_response(rdma, resp,
                                               resp->type, RDMA_WRID_DATA);
 
@@ -1794,7 +1861,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
         if (resp_idx) {
             *resp_idx = RDMA_WRID_DATA;
         }
-        trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
+        trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
     }
 
     rdma->control_ready_expected = 1;
@@ -1934,10 +2001,7 @@ retry:
              * memset() + madvise() the entire chunk without RDMA.
              */
 
-            if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
-                                                   length)
-                   && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
-                                                    length) == length) {
+            if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
                 RDMACompress comp = {
                                         .offset = current_addr,
                                         .value = 0,
@@ -2205,7 +2269,9 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
     int ret, idx;
 
     if (rdma->cm_id && rdma->connected) {
-        if (rdma->error_state) {
+        if ((rdma->error_state ||
+             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+            !rdma->received_error) {
             RDMAControlHeader head = { .len = 0,
                                        .type = RDMA_CONTROL_ERROR,
                                        .repeat = 1,
@@ -2276,7 +2342,7 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
 }
 
 
-static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
 {
     int ret, idx;
     Error *local_err = NULL, **temp = &local_err;
@@ -2362,6 +2428,12 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     caps_to_network(&cap);
 
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+    if (ret) {
+        ERROR(errp, "posting second control recv");
+        goto err_rdma_source_connect;
+    }
+
     ret = rdma_connect(rdma->cm_id, &conn_param);
     if (ret) {
         perror("rdma_connect");
@@ -2402,12 +2474,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
 
     rdma_ack_cm_event(cm_event);
 
-    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
-    if (ret) {
-        ERROR(errp, "posting second control recv!");
-        goto err_rdma_source_connect;
-    }
-
     rdma->control_ready_expected = 1;
     rdma->nb_sent = 0;
     return 0;
@@ -2468,7 +2534,7 @@ static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
             continue;
         }
         if (e->ai_family == AF_INET6) {
-            ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
+            ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
             if (ret) {
                 continue;
             }
@@ -2505,8 +2571,8 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
         rdma->current_index = -1;
         rdma->current_chunk = -1;
 
-        addr = inet_parse(host_port, NULL);
-        if (addr != NULL) {
+        addr = g_new(InetSocketAddress, 1);
+        if (!inet_parse(addr, host_port, NULL)) {
             rdma->port = atoi(addr->port);
             rdma->host = g_strdup(addr->host);
         } else {
@@ -2807,6 +2873,9 @@ static int qio_channel_rdma_close(QIOChannel *ioc,
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
     trace_qemu_rdma_close();
     if (rioc->rdma) {
+        if (!rioc->rdma->error_state) {
+            rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+        }
         qemu_rdma_cleanup(rioc->rdma);
         g_free(rioc->rdma);
         rioc->rdma = NULL;
@@ -3344,7 +3413,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
             ret = -EIO;
             goto out;
         default:
-            error_report("Unknown control message %s", control_desc[head.type]);
+            error_report("Unknown control message %s", control_desc(head.type));
             ret = -EIO;
             goto out;
         }
@@ -3672,8 +3741,8 @@ void rdma_start_outgoing_migration(void *opaque,
         goto err;
     }
 
-    ret = qemu_rdma_source_init(rdma, errp,
-        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
+    ret = qemu_rdma_source_init(rdma,
+        s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
 
     if (ret) {
         goto err;
@@ -3689,7 +3758,7 @@ void rdma_start_outgoing_migration(void *opaque,
     trace_rdma_start_outgoing_migration_after_rdma_connect();
 
     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
-    migrate_fd_connect(s);
+    migrate_fd_connect(s, NULL);
     return;
 err:
     g_free(rdma);