#include "qapi/error.h"
#include "qemu-common.h"
#include "qemu/cutils.h"
-#include "migration/migration.h"
-#include "migration/qemu-file.h"
-#include "exec/cpu-common.h"
+#include "rdma.h"
+#include "migration.h"
+#include "qemu-file.h"
+#include "ram.h"
+#include "qemu-file-channel.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "qemu/sockets.h"
} \
return rdma->error_state; \
} \
- } while (0);
+ } while (0)
/*
* A work request ID is 64-bits and we split up these bits
RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
-static const char *control_desc[] = {
- [RDMA_CONTROL_NONE] = "NONE",
- [RDMA_CONTROL_ERROR] = "ERROR",
- [RDMA_CONTROL_READY] = "READY",
- [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
- [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
- [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
- [RDMA_CONTROL_COMPRESS] = "COMPRESS",
- [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
- [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
- [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
- [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
- [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
-};
/*
* Memory and MR structures used to represent an IB Send/Recv work request.
uint32_t padding;
} RDMADestBlock;
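+/* Return a human-readable name for an RDMA control message type */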
+static const char *control_desc(unsigned int rdma_control)
+{
+ static const char *strs[] = {
+ [RDMA_CONTROL_NONE] = "NONE",
+ [RDMA_CONTROL_ERROR] = "ERROR",
+ [RDMA_CONTROL_READY] = "READY",
+ [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+ [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+ [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+ [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+ [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+ [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+ [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+ [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+ [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+ };
+
+ if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
+ return "??BAD CONTROL VALUE??";
+ }
+
+ return strs[rdma_control];
+}
+
static uint64_t htonll(uint64_t v)
{
union { uint32_t lv[2]; uint64_t llv; } u;
*/
int error_state;
int error_reported;
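+    /* Set when the peer sends an RDMA_CONTROL_ERROR message */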
+ int received_error;
/*
* Description of ram blocks used throughout the code.
*
* Patches are being reviewed on linux-rdma.
*/
-static int qemu_rdma_broken_ipv6_kernel(Error **errp, struct ibv_context *verbs)
+static int qemu_rdma_broken_ipv6_kernel(struct ibv_context *verbs, Error **errp)
{
struct ibv_port_attr port_attr;
RDMA_RESOLVE_TIMEOUT_MS);
if (!ret) {
if (e->ai_family == AF_INET6) {
- ret = qemu_rdma_broken_ipv6_kernel(errp, rdma->cm_id->verbs);
+ ret = qemu_rdma_broken_ipv6_kernel(rdma->cm_id->verbs, errp);
if (ret) {
continue;
}
return 0;
}
+/* Wait for activity on the completion channel.
+ * Returns 0 on success, non-zero on error.
+ */
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+{
+    /*
+     * The coroutine doesn't start until migration_fd_process_incoming(),
+     * so don't yield unless we know we're running inside a coroutine.
+     */
+ if (rdma->migration_started_on_destination) {
+ yield_until_fd_readable(rdma->comp_channel->fd);
+ } else {
+        /* This is the source side (we run in a separate thread), or the
+         * destination prior to migration_fd_process_incoming(), so we
+         * can't yield and have to poll the fd instead.
+         * But we still need to be able to handle 'cancel' or an error
+         * without hanging forever.
+         */
+ while (!rdma->error_state && !rdma->received_error) {
+ GPollFD pfds[1];
+ pfds[0].fd = rdma->comp_channel->fd;
+ pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ /* 0.1s timeout, should be fine for a 'cancel' */
+ switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
+ case 1: /* fd active */
+ return 0;
+
+ case 0: /* Timeout, go around again */
+ break;
+
+ default: /* Error of some type -
+ * I don't trust errno from qemu_poll_ns
+ */
+ error_report("%s: poll failed", __func__);
+ return -EPIPE;
+ }
+
+ if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
+ /* Bail out and let the cancellation happen */
+ return -EPIPE;
+ }
+ }
+ }
+
+ if (rdma->received_error) {
+ return -EPIPE;
+ }
+ return rdma->error_state;
+}
+
/*
* Block until the next work request has completed.
*
}
while (1) {
- /*
- * Coroutine doesn't start until migration_fd_process_incoming()
- * so don't yield unless we know we're running inside of a coroutine.
- */
- if (rdma->migration_started_on_destination) {
- yield_until_fd_readable(rdma->comp_channel->fd);
+ ret = qemu_rdma_wait_comp_channel(rdma);
+ if (ret) {
+ goto err_block_for_wrid;
}
- if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
+ ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
+ if (ret) {
perror("ibv_get_cq_event");
goto err_block_for_wrid;
}
num_cq_events++;
- if (ibv_req_notify_cq(cq, 0)) {
+ ret = -ibv_req_notify_cq(cq, 0);
+ if (ret) {
goto err_block_for_wrid;
}
if (num_cq_events) {
ibv_ack_cq_events(cq, num_cq_events);
}
+
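+    /* Record the failure so later RDMA operations see the error state */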
+ rdma->error_state = ret;
return ret;
}
.num_sge = 1,
};
- trace_qemu_rdma_post_send_control(control_desc[head->type]);
+ trace_qemu_rdma_post_send_control(control_desc(head->type));
/*
* We don't actually need to do a memcpy() in here if we used
network_to_control((void *) rdma->wr_data[idx].control);
memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
- trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
+ trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
if (expecting == RDMA_CONTROL_NONE) {
- trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
+ trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
head->type);
} else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
error_report("Was expecting a %s (%d) control message"
", but got: %s (%d), length: %d",
- control_desc[expecting], expecting,
- control_desc[head->type], head->type, head->len);
+ control_desc(expecting), expecting,
+ control_desc(head->type), head->type, head->len);
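+        /* Remember the peer's error so cleanup doesn't send one back */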
+ if (head->type == RDMA_CONTROL_ERROR) {
+ rdma->received_error = true;
+ }
return -EIO;
}
if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
}
}
- trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
+ trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
ret = qemu_rdma_exchange_get_response(rdma, resp,
resp->type, RDMA_WRID_DATA);
if (resp_idx) {
*resp_idx = RDMA_WRID_DATA;
}
- trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
+ trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
}
rdma->control_ready_expected = 1;
* memset() + madvise() the entire chunk without RDMA.
*/
- if (can_use_buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
- length)
- && buffer_find_nonzero_offset((void *)(uintptr_t)sge.addr,
- length) == length) {
+ if (buffer_is_zero((void *)(uintptr_t)sge.addr, length)) {
RDMACompress comp = {
.offset = current_addr,
.value = 0,
int ret, idx;
if (rdma->cm_id && rdma->connected) {
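+        /* Report a local error or cancellation to the peer, unless
+         * the error we have was received from the peer itself.
+         */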
- if (rdma->error_state) {
+ if ((rdma->error_state ||
+ migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+ !rdma->received_error) {
RDMAControlHeader head = { .len = 0,
.type = RDMA_CONTROL_ERROR,
.repeat = 1,
}
-static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
+static int qemu_rdma_source_init(RDMAContext *rdma, bool pin_all, Error **errp)
{
int ret, idx;
Error *local_err = NULL, **temp = &local_err;
caps_to_network(&cap);
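+    /* Post a control recv before connecting so a receive buffer is ready */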
+ ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
+ if (ret) {
+ ERROR(errp, "posting second control recv");
+ goto err_rdma_source_connect;
+ }
+
ret = rdma_connect(rdma->cm_id, &conn_param);
if (ret) {
perror("rdma_connect");
rdma_ack_cm_event(cm_event);
- ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
- if (ret) {
- ERROR(errp, "posting second control recv!");
- goto err_rdma_source_connect;
- }
-
rdma->control_ready_expected = 1;
rdma->nb_sent = 0;
return 0;
continue;
}
if (e->ai_family == AF_INET6) {
- ret = qemu_rdma_broken_ipv6_kernel(errp, listen_id->verbs);
+ ret = qemu_rdma_broken_ipv6_kernel(listen_id->verbs, errp);
if (ret) {
continue;
}
rdma->current_index = -1;
rdma->current_chunk = -1;
- addr = inet_parse(host_port, NULL);
- if (addr != NULL) {
+ addr = g_new(InetSocketAddress, 1);
+ if (!inet_parse(addr, host_port, NULL)) {
rdma->port = atoi(addr->port);
rdma->host = g_strdup(addr->host);
} else {
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
trace_qemu_rdma_close();
if (rioc->rdma) {
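+        /* Propagate any QEMUFile error into the RDMA error state */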
+ if (!rioc->rdma->error_state) {
+ rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+ }
qemu_rdma_cleanup(rioc->rdma);
g_free(rioc->rdma);
rioc->rdma = NULL;
ret = -EIO;
goto out;
default:
- error_report("Unknown control message %s", control_desc[head.type]);
+ error_report("Unknown control message %s", control_desc(head.type));
ret = -EIO;
goto out;
}
goto err;
}
- ret = qemu_rdma_source_init(rdma, errp,
- s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL]);
+ ret = qemu_rdma_source_init(rdma,
+ s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
if (ret) {
goto err;
trace_rdma_start_outgoing_migration_after_rdma_connect();
s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
- migrate_fd_connect(s);
+ migrate_fd_connect(s, NULL);
return;
err:
g_free(rdma);