" to abort!"); \
rdma->error_reported = 1; \
} \
+ rcu_read_unlock(); \
return rdma->error_state; \
} \
- } while (0);
+ } while (0)
/*
* A work request ID is 64-bits and we split up these bits
RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
-static const char *control_desc[] = {
- [RDMA_CONTROL_NONE] = "NONE",
- [RDMA_CONTROL_ERROR] = "ERROR",
- [RDMA_CONTROL_READY] = "READY",
- [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
- [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
- [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
- [RDMA_CONTROL_COMPRESS] = "COMPRESS",
- [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
- [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
- [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
- [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
- [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
-};
/*
* Memory and MR structures used to represent an IB Send/Recv work request.
uint32_t padding;
} RDMADestBlock;
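+
+/*
+ * Map an RDMA control message type to a printable name, guarding
+ * against out-of-range values arriving off the wire.
+ */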
+static const char *control_desc(unsigned int rdma_control)
+{
+ static const char *strs[] = {
+ [RDMA_CONTROL_NONE] = "NONE",
+ [RDMA_CONTROL_ERROR] = "ERROR",
+ [RDMA_CONTROL_READY] = "READY",
+ [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
+ [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
+ [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
+ [RDMA_CONTROL_COMPRESS] = "COMPRESS",
+ [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
+ [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
+ [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
+ [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
+ [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
+ };
+
+ if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
+ return "??BAD CONTROL VALUE??";
+ }
+
+ return strs[rdma_control];
+}
+
static uint64_t htonll(uint64_t v)
{
union { uint32_t lv[2]; uint64_t llv; } u;
uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
GHashTable *blockmap;
+
+    /* the RDMAContext for the return path */
+ struct RDMAContext *return_path;
+ bool is_return_path;
} RDMAContext;
#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
struct QIOChannelRDMA {
QIOChannel parent;
- RDMAContext *rdma;
+ RDMAContext *rdmain;
+ RDMAContext *rdmaout;
QEMUFile *file;
- size_t len;
bool blocking; /* XXX we don't actually honour this yet */
};
 * in advance before the migration starts. This tells us where the RAM blocks
* are so that we can register them individually.
*/
-static int qemu_rdma_init_one_block(const char *block_name, void *host_addr,
- ram_addr_t block_offset, ram_addr_t length, void *opaque)
+static int qemu_rdma_init_one_block(RAMBlock *rb, void *opaque)
{
+ const char *block_name = qemu_ram_get_idstr(rb);
+ void *host_addr = qemu_ram_get_host_addr(rb);
+ ram_addr_t block_offset = qemu_ram_get_offset(rb);
+ ram_addr_t length = qemu_ram_get_used_length(rb);
return rdma_add_block(opaque, block_name, host_addr, block_offset, length);
}
static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
{
RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ int ret;
assert(rdma->blockmap == NULL);
memset(local, 0, sizeof *local);
- qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
+ ret = foreach_not_ignored_block(qemu_rdma_init_one_block, rdma);
+ if (ret) {
+ return ret;
+ }
trace_qemu_rdma_init_ram_blocks(local->nb_blocks);
rdma->dest_blocks = g_new0(RDMADestBlock,
rdma->local_ram_blocks.nb_blocks);
memcpy(local->block + block->index, old + (block->index + 1),
sizeof(RDMALocalBlock) *
(local->nb_blocks - (block->index + 1)));
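+        /*
+         * The blocks following the deleted one just moved down one
+         * slot; keep their index fields in sync with their new
+         * positions in the array.
+         */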
+ for (x = block->index; x < local->nb_blocks - 1; x++) {
+ local->block[x].index--;
+ }
}
} else {
assert(block == local->block);
return 0;
}
+/*
+ * Wait for activity on the completion channel.
+ * Returns 0 on success, non-zero on error.
+ */
+static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
+{
+ struct rdma_cm_event *cm_event;
+ int ret = -1;
+
+ /*
+ * Coroutine doesn't start until migration_fd_process_incoming()
+ * so don't yield unless we know we're running inside of a coroutine.
+ */
+ if (rdma->migration_started_on_destination &&
+ migration_incoming_get_current()->state == MIGRATION_STATUS_ACTIVE) {
+ yield_until_fd_readable(rdma->comp_channel->fd);
+ } else {
+        /* This is the source side; we're in a separate thread, or we
+         * are on the destination prior to migration_fd_process_incoming().
+         * After postcopy starts, the destination also runs in a separate
+         * thread, so we can't yield and have to poll the fd instead.
+         * But we need to be able to handle 'cancel' or an error without
+         * hanging forever.
+         */
+ while (!rdma->error_state && !rdma->received_error) {
+ GPollFD pfds[2];
+ pfds[0].fd = rdma->comp_channel->fd;
+ pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[0].revents = 0;
+
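+            /* Also watch the CM event channel so a disconnect or
+             * device removal can break us out of the wait. */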
+ pfds[1].fd = rdma->channel->fd;
+ pfds[1].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
+ pfds[1].revents = 0;
+
+ /* 0.1s timeout, should be fine for a 'cancel' */
+ switch (qemu_poll_ns(pfds, 2, 100 * 1000 * 1000)) {
+ case 2:
+ case 1: /* fd active */
+ if (pfds[0].revents) {
+ return 0;
+ }
+
+ if (pfds[1].revents) {
+                    ret = rdma_get_cm_event(rdma->channel, &cm_event);
+                    if (ret) {
+                        error_report("failed to get cm event while "
+                                     "waiting on comp channel");
+                        return -EPIPE;
+                    }
+
+                    error_report("received cm event %d while waiting "
+                                 "on comp channel", cm_event->event);
+                    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+                        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+                        rdma_ack_cm_event(cm_event);
+                        return -EPIPE;
+                    }
+                    rdma_ack_cm_event(cm_event);
+                }
+ break;
+
+ case 0: /* Timeout, go around again */
+ break;
+
+ default: /* Error of some type -
+ * I don't trust errno from qemu_poll_ns
+ */
+ error_report("%s: poll failed", __func__);
+ return -EPIPE;
+ }
+
+ if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
+ /* Bail out and let the cancellation happen */
+ return -EPIPE;
+ }
+ }
+ }
+
+ if (rdma->received_error) {
+ return -EPIPE;
+ }
+ return rdma->error_state;
+}
+
/*
* Block until the next work request has completed.
*
}
while (1) {
- /*
- * Coroutine doesn't start until migration_fd_process_incoming()
- * so don't yield unless we know we're running inside of a coroutine.
- */
- if (rdma->migration_started_on_destination) {
- yield_until_fd_readable(rdma->comp_channel->fd);
+ ret = qemu_rdma_wait_comp_channel(rdma);
+ if (ret) {
+ goto err_block_for_wrid;
}
ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
.num_sge = 1,
};
- trace_qemu_rdma_post_send_control(control_desc[head->type]);
+ trace_qemu_rdma_post_send_control(control_desc(head->type));
/*
* We don't actually need to do a memcpy() in here if we used
network_to_control((void *) rdma->wr_data[idx].control);
memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
- trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
+ trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
if (expecting == RDMA_CONTROL_NONE) {
- trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
+ trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
head->type);
} else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
error_report("Was expecting a %s (%d) control message"
", but got: %s (%d), length: %d",
- control_desc[expecting], expecting,
- control_desc[head->type], head->type, head->len);
+ control_desc(expecting), expecting,
+ control_desc(head->type), head->type, head->len);
if (head->type == RDMA_CONTROL_ERROR) {
rdma->received_error = true;
}
}
}
- trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
+ trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
ret = qemu_rdma_exchange_get_response(rdma, resp,
resp->type, RDMA_WRID_DATA);
if (resp_idx) {
*resp_idx = RDMA_WRID_DATA;
}
- trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
+ trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
}
rdma->control_ready_expected = 1;
static void qemu_rdma_cleanup(RDMAContext *rdma)
{
- struct rdma_cm_event *cm_event;
- int ret, idx;
+ int idx;
if (rdma->cm_id && rdma->connected) {
- if (rdma->error_state && !rdma->received_error) {
+ if ((rdma->error_state ||
+ migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+ !rdma->received_error) {
RDMAControlHeader head = { .len = 0,
.type = RDMA_CONTROL_ERROR,
.repeat = 1,
qemu_rdma_post_send_control(rdma, NULL, &head);
}
- ret = rdma_disconnect(rdma->cm_id);
- if (!ret) {
- trace_qemu_rdma_cleanup_waiting_for_disconnect();
- ret = rdma_get_cm_event(rdma->channel, &cm_event);
- if (!ret) {
- rdma_ack_cm_event(cm_event);
- }
- }
+ rdma_disconnect(rdma->cm_id);
trace_qemu_rdma_cleanup_disconnect();
rdma->connected = false;
}
+ if (rdma->channel) {
+ qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
+ }
g_free(rdma->dest_blocks);
rdma->dest_blocks = NULL;
rdma_destroy_id(rdma->cm_id);
rdma->cm_id = NULL;
}
+
+    /* On the destination side, listen_id and channel are shared */
if (rdma->listen_id) {
- rdma_destroy_id(rdma->listen_id);
+ if (!rdma->is_return_path) {
+ rdma_destroy_id(rdma->listen_id);
+ }
rdma->listen_id = NULL;
+
+ if (rdma->channel) {
+ if (!rdma->is_return_path) {
+ rdma_destroy_event_channel(rdma->channel);
+ }
+ rdma->channel = NULL;
+ }
}
+
if (rdma->channel) {
rdma_destroy_event_channel(rdma->channel);
rdma->channel = NULL;
}
+static void qemu_rdma_return_path_dest_init(RDMAContext *rdma_return_path,
+ RDMAContext *rdma)
+{
+ int idx;
+
+ for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+ rdma_return_path->wr_data[idx].control_len = 0;
+ rdma_return_path->wr_data[idx].control_curr = NULL;
+ }
+
+    /* the CM channel and CM id are shared */
+ rdma_return_path->channel = rdma->channel;
+ rdma_return_path->listen_id = rdma->listen_id;
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+}
+
static void *qemu_rdma_data_init(const char *host_port, Error **errp)
{
RDMAContext *rdma = NULL;
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
QEMUFile *f = rioc->file;
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
ssize_t done = 0;
size_t i;
+ size_t len = 0;
+
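+    /* rioc->rdmaout can be cleared concurrently by
+     * qio_channel_rdma_close(); hold the RCU read lock across its use. */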
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
CHECK_ERROR_STATE();
ret = qemu_rdma_write_flush(f, rdma);
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
while (remaining) {
RDMAControlHeader head;
- rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
- remaining -= rioc->len;
+ len = MIN(remaining, RDMA_SEND_INCREMENT);
+ remaining -= len;
- head.len = rioc->len;
+ head.len = len;
head.type = RDMA_CONTROL_QEMU_FILE;
ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
- data += rioc->len;
- done += rioc->len;
+ data += len;
+ done += len;
}
}
+ rcu_read_unlock();
return done;
}
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head;
int ret = 0;
ssize_t i;
size_t done = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
for (i = 0; i < niov; i++) {
* were given and dish out the bytes until we run
* out of bytes.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Got what we needed, so go to next iovec */
if (ret < 0) {
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
/*
* SEND was received with new bytes, now try again.
*/
- ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ ret = qemu_rdma_fill(rdma, data, want, 0);
done += ret;
want -= ret;
/* Still didn't get enough, so lets just return */
if (want) {
if (done == 0) {
+ rcu_read_unlock();
return QIO_CHANNEL_ERR_BLOCK;
} else {
break;
}
}
}
- rioc->len = done;
- return rioc->len;
+ rcu_read_unlock();
+ return done;
}
/*
gint *timeout)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
*timeout = -1;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when prepare Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
qio_channel_rdma_source_check(GSource *source)
{
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when check Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return cond & rsource->condition;
}
{
QIOChannelFunc func = (QIOChannelFunc)callback;
QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
- RDMAContext *rdma = rsource->rioc->rdma;
+ RDMAContext *rdma;
GIOCondition cond = 0;
+ rcu_read_lock();
+ if (rsource->condition == G_IO_IN) {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmain);
+ } else {
+ rdma = atomic_rcu_read(&rsource->rioc->rdmaout);
+ }
+
+ if (!rdma) {
+ error_report("RDMAContext is NULL when dispatch Gsource");
+ rcu_read_unlock();
+ return FALSE;
+ }
+
if (rdma->wr_data[0].control_len) {
cond |= G_IO_IN;
}
cond |= G_IO_OUT;
+ rcu_read_unlock();
return (*func)(QIO_CHANNEL(rsource->rioc),
(cond & rsource->condition),
user_data);
return source;
}
+static void qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc,
+ AioContext *ctx,
+ IOHandler *io_read,
+ IOHandler *io_write,
+ void *opaque)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
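+
+    /* A read handler watches the inbound context's completion channel;
+     * a write handler watches the outbound one. */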
+ if (io_read) {
+ aio_set_fd_handler(ctx, rioc->rdmain->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ } else {
+ aio_set_fd_handler(ctx, rioc->rdmaout->comp_channel->fd,
+ false, io_read, io_write, NULL, opaque);
+ }
+}
static int qio_channel_rdma_close(QIOChannel *ioc,
Error **errp)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
trace_qemu_rdma_close();
- if (rioc->rdma) {
- if (!rioc->rdma->error_state) {
- rioc->rdma->error_state = qemu_file_get_error(rioc->file);
+
+ rdmain = rioc->rdmain;
+ if (rdmain) {
+ atomic_rcu_set(&rioc->rdmain, NULL);
+ }
+
+ rdmaout = rioc->rdmaout;
+ if (rdmaout) {
+ atomic_rcu_set(&rioc->rdmaout, NULL);
+ }
+
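+    /* Wait for any RCU reader still holding the old rdmain/rdmaout
+     * pointers before tearing the contexts down. */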
+ synchronize_rcu();
+
+ if (rdmain) {
+ qemu_rdma_cleanup(rdmain);
+ }
+
+ if (rdmaout) {
+ qemu_rdma_cleanup(rdmaout);
+ }
+
+ g_free(rdmain);
+ g_free(rdmaout);
+
+ return 0;
+}
+
+static int
+qio_channel_rdma_shutdown(QIOChannel *ioc,
+ QIOChannelShutdown how,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdmain, *rdmaout;
+
+ rcu_read_lock();
+
+ rdmain = atomic_rcu_read(&rioc->rdmain);
+    rdmaout = atomic_rcu_read(&rioc->rdmaout);
+
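+    /* Shutdown only poisons the affected direction's error_state so
+     * further I/O fails; the contexts themselves are freed in
+     * qio_channel_rdma_close(). */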
+ switch (how) {
+ case QIO_CHANNEL_SHUTDOWN_READ:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ break;
+ case QIO_CHANNEL_SHUTDOWN_WRITE:
+ if (rdmaout) {
+ rdmaout->error_state = -1;
+ }
+ break;
+ case QIO_CHANNEL_SHUTDOWN_BOTH:
+ default:
+ if (rdmain) {
+ rdmain->error_state = -1;
+ }
+ if (rdmaout) {
+ rdmaout->error_state = -1;
}
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ break;
}
+
+ rcu_read_unlock();
return 0;
}
size_t size, uint64_t *bytes_sent)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int ret;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
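+
+    /* Once postcopy is active, pages are sent over the normal stream
+     * rather than by RDMA writes. */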
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return RAM_SAVE_CONTROL_NOT_SUPP;
+ }
+
qemu_fflush(f);
if (size > 0) {
}
}
+ rcu_read_unlock();
return RAM_SAVE_CONTROL_DELAYED;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
+static void rdma_accept_incoming_migration(void *opaque);
+
+static void rdma_cm_poll_handler(void *opaque)
+{
+ RDMAContext *rdma = opaque;
+ int ret;
+ struct rdma_cm_event *cm_event;
+ MigrationIncomingState *mis = migration_incoming_get_current();
+
+ ret = rdma_get_cm_event(rdma->channel, &cm_event);
+ if (ret) {
+ error_report("get_cm_event failed %d", errno);
+ return;
+ }
+
+    if (cm_event->event == RDMA_CM_EVENT_DISCONNECTED ||
+        cm_event->event == RDMA_CM_EVENT_DEVICE_REMOVAL) {
+        error_report("received cm event %d on migration connection",
+                     cm_event->event);
+        rdma->error_state = -EPIPE;
+        if (rdma->return_path) {
+            rdma->return_path->error_state = -EPIPE;
+        }
+        rdma_ack_cm_event(cm_event);
+
+        if (mis->migration_incoming_co) {
+            qemu_coroutine_enter(mis->migration_incoming_co);
+        }
+        return;
+    }
+
+    rdma_ack_cm_event(cm_event);
+}
+
static int qemu_rdma_accept(RDMAContext *rdma)
{
RDMACapabilities cap;
}
}
- qemu_set_fd_handler(rdma->channel->fd, NULL, NULL, NULL);
+    /* Accept the second connection request for the return path */
+ if (migrate_postcopy() && !rdma->is_return_path) {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
+ NULL,
+ (void *)(intptr_t)rdma->return_path);
+ } else {
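+        /* The connection is established; watch the CM channel for
+         * disconnect or device-removal events. */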
+ qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+ NULL, rdma);
+ }
ret = rdma_accept(rdma->cm_id, &conn_param);
if (ret) {
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
- RDMALocalBlocks *local = &rdma->local_ram_blocks;
+ RDMAContext *rdma;
+ RDMALocalBlocks *local;
RDMAControlHeader head;
RDMARegister *reg, *registers;
RDMACompress *comp;
int count = 0;
int i = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ local = &rdma->local_ram_blocks;
do {
trace_qemu_rdma_registration_handle_wait();
qsort(rdma->local_ram_blocks.block,
rdma->local_ram_blocks.nb_blocks,
sizeof(RDMALocalBlock), dest_ram_sort_func);
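+    /* Sorting may have reordered the blocks; refresh their indices. */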
+ for (i = 0; i < local->nb_blocks; i++) {
+ local->block[i].index = i;
+ }
+
if (rdma->pin_all) {
ret = qemu_rdma_reg_whole_ram_blocks(rdma);
if (ret) {
}
chunk_start = ram_chunk_start(block, chunk);
chunk_end = ram_chunk_end(block, chunk + reg->chunks);
+ /* avoid "-Waddress-of-packed-member" warning */
+ uint32_t tmp_rkey = 0;
if (qemu_rdma_register_and_get_keys(rdma, block,
- (uintptr_t)host_addr, NULL, ®_result->rkey,
+ (uintptr_t)host_addr, NULL, &tmp_rkey,
chunk, chunk_start, chunk_end)) {
error_report("cannot get rkey");
ret = -EINVAL;
goto out;
}
+ reg_result->rkey = tmp_rkey;
reg_result->host_addr = (uintptr_t)block->local_host_addr;
ret = -EIO;
goto out;
default:
- error_report("Unknown control message %s", control_desc[head.type]);
+ error_report("Unknown control message %s", control_desc(head.type));
ret = -EIO;
goto out;
}
if (ret < 0) {
rdma->error_state = ret;
}
+ rcu_read_unlock();
return ret;
}
static int
rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
{
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
int curr;
int found = -1;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmain);
+
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
/* Find the matching RAMBlock in our local list */
for (curr = 0; curr < rdma->local_ram_blocks.nb_blocks; curr++) {
if (!strcmp(rdma->local_ram_blocks.block[curr].block_name, name)) {
if (found == -1) {
error_report("RAMBlock '%s' not found on destination", name);
+ rcu_read_unlock();
return -ENOENT;
}
trace_rdma_block_notification_handle(name, rdma->next_src_index);
rdma->next_src_index++;
+ rcu_read_unlock();
return 0;
}
uint64_t flags, void *data)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
+
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
trace_qemu_rdma_registration_start(flags);
qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
qemu_fflush(f);
+ rcu_read_unlock();
return 0;
}
{
Error *local_err = NULL, **errp = &local_err;
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
- RDMAContext *rdma = rioc->rdma;
+ RDMAContext *rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
+ rcu_read_lock();
+ rdma = atomic_rcu_read(&rioc->rdmaout);
+ if (!rdma) {
+ rcu_read_unlock();
+ return -EIO;
+ }
+
CHECK_ERROR_STATE();
+ if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
+ rcu_read_unlock();
+ return 0;
+ }
+
qemu_fflush(f);
ret = qemu_rdma_drain_cq(f, rdma);
qemu_rdma_reg_whole_ram_blocks : NULL);
if (ret < 0) {
ERROR(errp, "receiving remote info!");
+ rcu_read_unlock();
return ret;
}
"not identical on both the source and destination.",
local->nb_blocks, nb_dest_blocks);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
local->block[i].length,
rdma->dest_blocks[i].length);
rdma->error_state = -EINVAL;
+ rcu_read_unlock();
return -EINVAL;
}
local->block[i].remote_host_addr =
goto err;
}
+ rcu_read_unlock();
return 0;
err:
rdma->error_state = ret;
+ rcu_read_unlock();
return ret;
}
static void qio_channel_rdma_finalize(Object *obj)
{
QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
- if (rioc->rdma) {
- qemu_rdma_cleanup(rioc->rdma);
- g_free(rioc->rdma);
- rioc->rdma = NULL;
+ if (rioc->rdmain) {
+ qemu_rdma_cleanup(rioc->rdmain);
+ g_free(rioc->rdmain);
+ rioc->rdmain = NULL;
+ }
+ if (rioc->rdmaout) {
+ qemu_rdma_cleanup(rioc->rdmaout);
+ g_free(rioc->rdmaout);
+ rioc->rdmaout = NULL;
}
}
ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
ioc_klass->io_close = qio_channel_rdma_close;
ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+ ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
+ ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
}
static const TypeInfo qio_channel_rdma_info = {
}
rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
- rioc->rdma = rdma;
if (mode[0] == 'w') {
rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
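+        /* A write channel sends on this context and reads replies on
+         * its return path; a read channel is wired the other way. */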
+ rioc->rdmaout = rdma;
+ rioc->rdmain = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ rioc->rdmain = rdma;
+ rioc->rdmaout = rdma->return_path;
qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
}
trace_qemu_rdma_accept_incoming_migration_accepted();
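+    /* The return-path connection doesn't start a migration of its own;
+     * it is wired into the main connection's QEMUFile instead. */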
+ if (rdma->is_return_path) {
+ return;
+ }
+
f = qemu_fopen_rdma(rdma, "rb");
if (f == NULL) {
ERROR(errp, "could not qemu_fopen_rdma!");
void rdma_start_incoming_migration(const char *host_port, Error **errp)
{
int ret;
- RDMAContext *rdma;
+ RDMAContext *rdma, *rdma_return_path = NULL;
Error *local_err = NULL;
trace_rdma_start_incoming_migration();
trace_rdma_start_incoming_migration_after_rdma_listen();
+    /* initialize the RDMAContext for the return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, &local_err);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
+ }
+
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
err:
error_propagate(errp, local_err);
g_free(rdma);
+ g_free(rdma_return_path);
}
void rdma_start_outgoing_migration(void *opaque,
{
MigrationState *s = opaque;
RDMAContext *rdma = qemu_rdma_data_init(host_port, errp);
+ RDMAContext *rdma_return_path = NULL;
int ret = 0;
if (rdma == NULL) {
goto err;
}
+    /* RDMA postcopy needs a separate queue pair for the return path */
+ if (migrate_postcopy()) {
+ rdma_return_path = qemu_rdma_data_init(host_port, errp);
+
+ if (rdma_return_path == NULL) {
+ goto err;
+ }
+
+ ret = qemu_rdma_source_init(rdma_return_path,
+ s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ ret = qemu_rdma_connect(rdma_return_path, errp);
+
+ if (ret) {
+ goto err;
+ }
+
+ rdma->return_path = rdma_return_path;
+ rdma_return_path->return_path = rdma;
+ rdma_return_path->is_return_path = true;
+ }
+
trace_rdma_start_outgoing_migration_after_rdma_connect();
s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
- migrate_fd_connect(s);
+ migrate_fd_connect(s, NULL);
return;
err:
g_free(rdma);
+ g_free(rdma_return_path);
}