/*
* QEMU Block driver for NBD
*
+ * Copyright (C) 2016 Red Hat, Inc.
* Copyright (C) 2008 Bull S.A.S.
*
/* Convert between an NBD handle and an index into recv_coroutine[] by
 * XOR-ing with the address passed as @bs.  The two macros are inverses of
 * each other when given the same pointer.
 *
 * @bs is parenthesized so that an expression argument (for example
 * "base + 1") is cast as a whole instead of only its first operand.
 */
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)(bs)))
#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)(bs)))
/* Enter every coroutine currently registered in the session's
 * recv_coroutine[] slots, then run BDRV_POLL_WHILE() on s->read_reply_co
 * (presumably until the reply-reading coroutine has terminated and reset
 * that field to NULL -- confirm against the macro's definition).
 */
-static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i;
for (i = 0; i < MAX_NBD_REQUESTS; i++) {
if (s->recv_coroutine[i]) {
- qemu_coroutine_enter(s->recv_coroutine[i], NULL);
+ qemu_coroutine_enter(s->recv_coroutine[i]);
}
}
+ BDRV_POLL_WHILE(bs, s->read_reply_co);
}
/* Tear down the connection of @bs: shut down both directions of the
 * channel, enter the pending receive coroutines, detach from the
 * AioContext, drop the reference on the socket channel and clear
 * client->ioc.  Returns immediately when client->ioc is already NULL.
 *
 * NOTE(review): some lines after the early return are not visible in this
 * view -- confirm against the full file.
 */
static void nbd_teardown_connection(BlockDriverState *bs)
{
- NbdClientSession *client = nbd_get_client_session(bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
if (!client->ioc) { /* Already closed */
return;
qio_channel_shutdown(client->ioc,
QIO_CHANNEL_SHUTDOWN_BOTH,
NULL);
- nbd_recv_coroutines_enter_all(client);
+ nbd_recv_coroutines_enter_all(bs);
nbd_client_detach_aio_context(bs);
object_unref(OBJECT(client->sioc));
client->ioc = NULL;
}
/* Coroutine entry point for reading replies; @opaque is the
 * NBDClientSession.
 *
 * Loops forever: receives one reply into s->reply, maps its handle to a
 * recv_coroutine[] slot, wakes the coroutine in that slot and yields until
 * it is woken again.  The loop ends when receiving fails or when the
 * handle does not map to a slot with a waiting coroutine; in either case
 * s->read_reply_co is reset to NULL before returning.
 */
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
- BlockDriverState *bs = opaque;
- NbdClientSession *s = nbd_get_client_session(bs);
+ NBDClientSession *s = opaque;
uint64_t i;
int ret;
- if (!s->ioc) { /* Already closed */
- return;
- }
-
- if (s->reply.handle == 0) {
- /* No reply already in flight. Fetch a header. It is possible
- * that another thread has done the same thing in parallel, so
- * the socket is not readable anymore.
- */
+ for (;;) {
+ assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->ioc, &s->reply);
- if (ret == -EAGAIN) {
- return;
- }
if (ret < 0) {
- s->reply.handle = 0;
- goto fail;
+ break;
}
- }
- /* There's no need for a mutex on the receive side, because the
- * handler acts as a synchronization point and ensures that only
- * one coroutine is called until the reply finishes. */
- i = HANDLE_TO_INDEX(s, s->reply.handle);
- if (i >= MAX_NBD_REQUESTS) {
- goto fail;
- }
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes.
+ */
+ i = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+ break;
+ }
- if (s->recv_coroutine[i]) {
- qemu_coroutine_enter(s->recv_coroutine[i], NULL);
- return;
+ /* We're woken up by the recv_coroutine itself. Note that there
+ * is no race between yielding and reentering read_reply_co. This
+ * is because:
+ *
+ * - if recv_coroutine[i] runs on the same AioContext, it is only
+ * entered after we yield
+ *
+ * - if recv_coroutine[i] runs on a different AioContext, reentering
+ * read_reply_co happens through a bottom half, which can only
+ * run after we yield.
+ */
+ aio_co_wake(s->recv_coroutine[i]);
+ qemu_coroutine_yield();
}
-
-fail:
- nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
- BlockDriverState *bs = opaque;
-
- qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine, NULL);
+ s->read_reply_co = NULL;
}
/* Send @request to the server while holding s->send_mutex.
 *
 * When @qiov is non-NULL the channel is corked, the request is sent and,
 * if that succeeded, request->len bytes of payload from @qiov follow; a
 * short payload transfer turns the result into -EIO.  When @qiov is NULL
 * only the request is sent.
 *
 * Returns the result of nbd_send_request() or -EIO as described above.
 *
 * NOTE(review): part of the body (between taking the mutex and the
 * "return -EPIPE", and around the end of the payload branch) is not
 * visible in this view -- confirm against the full file.
 */
static int nbd_co_send_request(BlockDriverState *bs,
- struct nbd_request *request,
- QEMUIOVector *qiov, int offset)
+ NBDRequest *request,
+ QEMUIOVector *qiov)
{
- NbdClientSession *s = nbd_get_client_session(bs);
- AioContext *aio_context;
+ NBDClientSession *s = nbd_get_client_session(bs);
int rc, ret, i;
qemu_co_mutex_lock(&s->send_mutex);
return -EPIPE;
}
- s->send_coroutine = qemu_coroutine_self();
- aio_context = bdrv_get_aio_context(bs);
-
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, nbd_restart_write, bs);
if (qiov) {
qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request);
if (rc >= 0) {
- ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
- offset, request->len, 0);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
+ false);
if (ret != request->len) {
rc = -EIO;
}
} else {
rc = nbd_send_request(s->ioc, request);
}
- aio_set_fd_handler(aio_context, s->sioc->fd, false,
- nbd_reply_ready, NULL, bs);
- s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
/* Yield until woken, then copy s->reply into @reply.
 *
 * If the reply does not match @request, reply->error is set to EIO.
 * Otherwise, when @qiov is non-NULL and the reply carries no error,
 * request->len bytes are transferred between the channel and @qiov
 * (presumably a read of the payload, given the "true" argument -- confirm
 * against nbd_wr_syncv()); a short transfer sets reply->error to EIO.
 *
 * NOTE(review): part of the mismatch condition and the end of the
 * function are not visible in this view.
 */
-static void nbd_co_receive_reply(NbdClientSession *s,
- struct nbd_request *request, struct nbd_reply *reply,
- QEMUIOVector *qiov, int offset)
+static void nbd_co_receive_reply(NBDClientSession *s,
+ NBDRequest *request,
+ NBDReply *reply,
+ QEMUIOVector *qiov)
{
int ret;
- /* Wait until we're woken up by the read handler. TODO: perhaps
- * peek at the next reply and avoid yielding if it's ours? */
+ /* Wait until we're woken up by nbd_read_reply_entry. */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle ||
reply->error = EIO;
} else {
if (qiov && reply->error == 0) {
- ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
- offset, request->len, 1);
+ ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
+ true);
if (ret != request->len) {
reply->error = EIO;
}
}
}
/* Account for a new in-flight request on session @s, first waiting for a
 * free slot when MAX_NBD_REQUESTS requests are already in flight.
 * @request is not used in the visible body.
 */
-static void nbd_coroutine_start(NbdClientSession *s,
- struct nbd_request *request)
+static void nbd_coroutine_start(NBDClientSession *s,
+ NBDRequest *request)
{
/* Poor man's semaphore: when MAX_NBD_REQUESTS requests are already in
 * flight, wait on the free_sema queue until a slot is released. */
- if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
- qemu_co_mutex_lock(&s->free_sema);
+ if (s->in_flight == MAX_NBD_REQUESTS) {
+ qemu_co_queue_wait(&s->free_sema, NULL);
assert(s->in_flight < MAX_NBD_REQUESTS);
}
s->in_flight++;
/* s->recv_coroutine[i] is set as soon as we get the send_lock. */
}
/* Finish @request: clear its recv_coroutine[] slot, decrement the
 * in-flight counter, wake the next waiter queued on free_sema and, if the
 * reply-reading coroutine exists, wake it so that it fetches the next
 * reply.
 */
-static void nbd_coroutine_end(NbdClientSession *s,
- struct nbd_request *request)
+static void nbd_coroutine_end(BlockDriverState *bs,
+ NBDRequest *request)
{
+ NBDClientSession *s = nbd_get_client_session(bs);
int i = HANDLE_TO_INDEX(s, request->handle);
+
s->recv_coroutine[i] = NULL;
- if (s->in_flight-- == MAX_NBD_REQUESTS) {
- qemu_co_mutex_unlock(&s->free_sema);
+ s->in_flight--;
+ qemu_co_queue_next(&s->free_sema);
+
+ /* Kick the read_reply_co to get the next reply. */
+ if (s->read_reply_co) {
+ aio_co_wake(s->read_reply_co);
}
}
/* Read @bytes bytes starting at byte @offset into @qiov using a single
 * NBD_CMD_READ request.
 *
 * @bytes must not exceed NBD_MAX_BUFFER_SIZE and @flags must be 0; both
 * are asserted.
 *
 * Returns the negated error value: the send error if sending failed,
 * otherwise the error field of the reply (0 on success).
 */
-static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, QEMUIOVector *qiov,
- int offset)
+int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov, int flags)
{
- NbdClientSession *client = nbd_get_client_session(bs);
- struct nbd_request request = { .type = NBD_CMD_READ };
- struct nbd_reply reply;
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = {
+ .type = NBD_CMD_READ,
+ .from = offset,
+ .len = bytes,
+ };
+ NBDReply reply;
ssize_t ret;
- request.from = sector_num * 512;
- request.len = nb_sectors * 512;
+ assert(bytes <= NBD_MAX_BUFFER_SIZE);
+ assert(!flags);
nbd_coroutine_start(client, &request);
- ret = nbd_co_send_request(bs, &request, NULL, 0);
+ ret = nbd_co_send_request(bs, &request, NULL);
if (ret < 0) {
reply.error = -ret;
} else {
- nbd_co_receive_reply(client, &request, &reply, qiov, offset);
+ nbd_co_receive_reply(client, &request, &reply, qiov);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
-
}
/* Write @bytes bytes from @qiov starting at byte @offset using a single
 * NBD_CMD_WRITE request.
 *
 * When @flags contains BDRV_REQ_FUA the request carries NBD_CMD_FLAG_FUA;
 * the server must have advertised NBD_FLAG_SEND_FUA (asserted).  @bytes
 * must not exceed NBD_MAX_BUFFER_SIZE (asserted).
 *
 * Returns the negated error value: the send error if sending failed,
 * otherwise the error field of the reply (0 on success).
 */
-static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, QEMUIOVector *qiov,
- int offset)
+int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
+ uint64_t bytes, QEMUIOVector *qiov, int flags)
{
- NbdClientSession *client = nbd_get_client_session(bs);
- struct nbd_request request = { .type = NBD_CMD_WRITE };
- struct nbd_reply reply;
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = {
+ .type = NBD_CMD_WRITE,
+ .from = offset,
+ .len = bytes,
+ };
+ NBDReply reply;
ssize_t ret;
- if (!bdrv_enable_write_cache(bs) &&
- (client->nbdflags & NBD_FLAG_SEND_FUA)) {
- request.type |= NBD_CMD_FLAG_FUA;
+ if (flags & BDRV_REQ_FUA) {
+ assert(client->nbdflags & NBD_FLAG_SEND_FUA);
+ request.flags |= NBD_CMD_FLAG_FUA;
}
- request.from = sector_num * 512;
- request.len = nb_sectors * 512;
+ assert(bytes <= NBD_MAX_BUFFER_SIZE);
nbd_coroutine_start(client, &request);
- ret = nbd_co_send_request(bs, &request, qiov, offset);
+ ret = nbd_co_send_request(bs, &request, qiov);
if (ret < 0) {
reply.error = -ret;
} else {
- nbd_co_receive_reply(client, &request, &reply, NULL, 0);
+ nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
/* Zero @count bytes starting at byte @offset using a single
 * NBD_CMD_WRITE_ZEROES request.
 *
 * Returns -ENOTSUP without contacting the server when it has not
 * advertised NBD_FLAG_SEND_WRITE_ZEROES.  BDRV_REQ_FUA in @flags adds
 * NBD_CMD_FLAG_FUA (the server must advertise NBD_FLAG_SEND_FUA,
 * asserted); NBD_CMD_FLAG_NO_HOLE is added unless @flags contains
 * BDRV_REQ_MAY_UNMAP.
 *
 * Otherwise returns the negated error value: the send error if sending
 * failed, or the error field of the reply (0 on success).
 */
-/* qemu-nbd has a limit of slightly less than 1M per request. Try to
- * remain aligned to 4K. */
-#define NBD_MAX_SECTORS 2040
-
-int nbd_client_co_readv(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, QEMUIOVector *qiov)
+int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
+ int count, BdrvRequestFlags flags)
{
- int offset = 0;
- int ret;
- while (nb_sectors > NBD_MAX_SECTORS) {
- ret = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
- if (ret < 0) {
- return ret;
- }
- offset += NBD_MAX_SECTORS * 512;
- sector_num += NBD_MAX_SECTORS;
- nb_sectors -= NBD_MAX_SECTORS;
+ ssize_t ret;
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = {
+ .type = NBD_CMD_WRITE_ZEROES,
+ .from = offset,
+ .len = count,
+ };
+ NBDReply reply;
+
+ if (!(client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES)) {
+ return -ENOTSUP;
}
- return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, offset);
-}
-int nbd_client_co_writev(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors, QEMUIOVector *qiov)
-{
- int offset = 0;
- int ret;
- while (nb_sectors > NBD_MAX_SECTORS) {
- ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
- if (ret < 0) {
- return ret;
- }
- offset += NBD_MAX_SECTORS * 512;
- sector_num += NBD_MAX_SECTORS;
- nb_sectors -= NBD_MAX_SECTORS;
+ if (flags & BDRV_REQ_FUA) {
+ assert(client->nbdflags & NBD_FLAG_SEND_FUA);
+ request.flags |= NBD_CMD_FLAG_FUA;
+ }
+ if (!(flags & BDRV_REQ_MAY_UNMAP)) {
+ request.flags |= NBD_CMD_FLAG_NO_HOLE;
}
- return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset);
+
+ nbd_coroutine_start(client, &request);
+ ret = nbd_co_send_request(bs, &request, NULL);
+ if (ret < 0) {
+ reply.error = -ret;
+ } else {
+ nbd_co_receive_reply(client, &request, &reply, NULL);
+ }
+ nbd_coroutine_end(bs, &request);
+ return -reply.error;
}
/* Send an NBD_CMD_FLUSH request (offset 0, length 0) and wait for its
 * reply.
 *
 * Returns 0 without contacting the server when it has not advertised
 * NBD_FLAG_SEND_FLUSH.  Otherwise returns the negated error value: the
 * send error if sending failed, or the error field of the reply (0 on
 * success).
 */
int nbd_client_co_flush(BlockDriverState *bs)
{
- NbdClientSession *client = nbd_get_client_session(bs);
- struct nbd_request request = { .type = NBD_CMD_FLUSH };
- struct nbd_reply reply;
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = { .type = NBD_CMD_FLUSH };
+ NBDReply reply;
ssize_t ret;
if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
return 0;
}
- if (client->nbdflags & NBD_FLAG_SEND_FUA) {
- request.type |= NBD_CMD_FLAG_FUA;
- }
-
request.from = 0;
request.len = 0;
nbd_coroutine_start(client, &request);
- ret = nbd_co_send_request(bs, &request, NULL, 0);
+ ret = nbd_co_send_request(bs, &request, NULL);
if (ret < 0) {
reply.error = -ret;
} else {
- nbd_co_receive_reply(client, &request, &reply, NULL, 0);
+ nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
/* Discard @count bytes starting at byte @offset using a single
 * NBD_CMD_TRIM request.
 *
 * Returns 0 without contacting the server when it has not advertised
 * NBD_FLAG_SEND_TRIM.  Otherwise returns the negated error value: the
 * send error if sending failed, or the error field of the reply (0 on
 * success).
 */
-int nbd_client_co_discard(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors)
+int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
{
- NbdClientSession *client = nbd_get_client_session(bs);
- struct nbd_request request = { .type = NBD_CMD_TRIM };
- struct nbd_reply reply;
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = {
+ .type = NBD_CMD_TRIM,
+ .from = offset,
+ .len = count,
+ };
+ NBDReply reply;
ssize_t ret;
if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
return 0;
}
- request.from = sector_num * 512;
- request.len = nb_sectors * 512;
nbd_coroutine_start(client, &request);
- ret = nbd_co_send_request(bs, &request, NULL, 0);
+ ret = nbd_co_send_request(bs, &request, NULL);
if (ret < 0) {
reply.error = -ret;
} else {
- nbd_co_receive_reply(client, &request, &reply, NULL, 0);
+ nbd_co_receive_reply(client, &request, &reply, NULL);
}
- nbd_coroutine_end(client, &request);
+ nbd_coroutine_end(bs, &request);
return -reply.error;
}
/* Detach the client's socket channel of @bs from its AioContext by
 * calling qio_channel_detach_aio_context() on it.
 */
void nbd_client_detach_aio_context(BlockDriverState *bs)
{
- aio_set_fd_handler(bdrv_get_aio_context(bs),
- nbd_get_client_session(bs)->sioc->fd,
- false, NULL, NULL, NULL);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
}
/* Attach the client's socket channel of @bs to @new_context and schedule
 * the reply-reading coroutine (client->read_reply_co) in that context.
 */
void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context)
{
- aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
- false, nbd_reply_ready, NULL, bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
+ qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+ aio_co_schedule(new_context, client->read_reply_co);
}
void nbd_client_close(BlockDriverState *bs)
{
- NbdClientSession *client = nbd_get_client_session(bs);
- struct nbd_request request = {
- .type = NBD_CMD_DISC,
- .from = 0,
- .len = 0
- };
+ NBDClientSession *client = nbd_get_client_session(bs);
+ NBDRequest request = { .type = NBD_CMD_DISC };
if (client->ioc == NULL) {
return;
const char *hostname,
Error **errp)
{
- NbdClientSession *client = nbd_get_client_session(bs);
+ NBDClientSession *client = nbd_get_client_session(bs);
int ret;
/* NBD handshake */
logout("Failed to negotiate with the NBD server\n");
return ret;
}
+ if (client->nbdflags & NBD_FLAG_SEND_FUA) {
+ bs->supported_write_flags = BDRV_REQ_FUA;
+ bs->supported_zero_flags |= BDRV_REQ_FUA;
+ }
+ if (client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES) {
+ bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
+ }
qemu_co_mutex_init(&client->send_mutex);
- qemu_co_mutex_init(&client->free_sema);
+ qemu_co_queue_init(&client->free_sema);
client->sioc = sioc;
object_ref(OBJECT(client->sioc));
/* Now that we're connected, set the socket to be non-blocking and
* kick the reply mechanism. */
qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+ client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
logout("Established connection with NBD server\n");