#define logout(fmt, ...) ((void)0)
#endif
+/* Upper bound on requests awaiting a reply at once; it is also the
+ * number of entries in the recv_coroutine[] slot table. */
+#define MAX_NBD_REQUESTS 16
+/* A wire handle is the slot index XORed with the state pointer, so the
+ * same XOR converts in both directions. The bs argument is
+ * parenthesized so that the casts apply to the whole expression. */
+#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)(bs)))
+#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)(bs)))
+
typedef struct BDRVNBDState {
int sock;
uint32_t nbdflags;
size_t blocksize;
char *export_name; /* An NBD server may export several devices */
+ CoMutex send_mutex;
+ CoMutex free_sema;
+ Coroutine *send_coroutine;
+ int in_flight;
+
+ Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
+ struct nbd_reply reply;
+
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
* it's a string of the form <hostname|ip4|\[ip6\]>:port
*/
return err;
}
+/* Register the calling coroutine as the owner of a new request: wait
+ * for capacity if the table is nearly full, claim the first free
+ * recv_coroutine[] slot and store the matching handle in
+ * request->handle. */
+static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
+{
+ int slot;
+
+ /* free_sema is used as a crude semaphore: it is held while no
+ * further request can be accepted and released once a reply has
+ * been received. */
+ if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
+ qemu_co_mutex_lock(&s->free_sema);
+ assert(s->in_flight < MAX_NBD_REQUESTS);
+ }
+ s->in_flight++;
+
+ /* Claim the first unused slot for this coroutine. */
+ for (slot = 0; slot < MAX_NBD_REQUESTS; slot++) {
+ if (s->recv_coroutine[slot] != NULL) {
+ continue;
+ }
+ s->recv_coroutine[slot] = qemu_coroutine_self();
+ break;
+ }
+
+ assert(slot < MAX_NBD_REQUESTS);
+ request->handle = INDEX_TO_HANDLE(s, slot);
+}
+
+/* Callback handed to qemu_aio_set_fd_handler(): reports whether at
+ * least one request is currently in flight (1) or none is (0). */
+static int nbd_have_request(void *opaque)
+{
+ BDRVNBDState *state = opaque;
+
+ return state->in_flight > 0 ? 1 : 0;
+}
+
+/* Read handler for the NBD socket. Fetches a reply header unless one is
+ * already pending, then enters the coroutine that owns the reply's
+ * handle. If the header cannot be read, or the handle does not map to
+ * a waiting coroutine, every waiting coroutine is entered instead; each
+ * of them then sees a handle that is not its own. */
+static void nbd_reply_ready(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+ uint64_t index;
+ int i;
+
+ if (s->reply.handle == 0) {
+ /* No reply already in flight. Fetch a header. */
+ if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+ s->reply.handle = 0;
+ goto fail;
+ }
+ }
+
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes. */
+ /* The handle comes from the server: keep the decoded index at its
+ * full 64-bit width and range-check it before using it as an array
+ * subscript, so a bogus handle cannot index outside the table. */
+ index = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (index < MAX_NBD_REQUESTS && s->recv_coroutine[index]) {
+ qemu_coroutine_enter(s->recv_coroutine[index], NULL);
+ return;
+ }
+
+fail:
+ for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+ if (s->recv_coroutine[i]) {
+ qemu_coroutine_enter(s->recv_coroutine[i], NULL);
+ }
+ }
+}
+
+/* Callback passed to qemu_aio_set_fd_handler() by nbd_co_send_request()
+ * for the duration of a send: re-enters the coroutine recorded in
+ * send_coroutine. */
+static void nbd_restart_write(void *opaque)
+{
+ BDRVNBDState *state = opaque;
+ Coroutine *sender = state->send_coroutine;
+
+ qemu_coroutine_enter(sender, NULL);
+}
+
+/* Send one request header, optionally followed by a payload, on the NBD
+ * socket. send_mutex is held for the whole send and send_coroutine
+ * names the sender while nbd_restart_write is installed on the socket.
+ * When iov is non-NULL, it is passed to qemu_co_sendv() together with
+ * request->len and offset.
+ * Returns the result of nbd_send_request(), or -1 with errno set to EIO
+ * when the payload was not sent in full. */
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
+ struct iovec *iov, int offset)
+{
+ int rc, ret;
+
+ qemu_co_mutex_lock(&s->send_mutex);
+ s->send_coroutine = qemu_coroutine_self();
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
+ nbd_have_request, NULL, s);
+ rc = nbd_send_request(s->sock, request);
+ if (rc != -1 && iov) {
+ ret = qemu_co_sendv(s->sock, iov, request->len, offset);
+ if (ret != request->len) {
+ /* errno holds positive error numbers; callers copy it into
+ * reply.error and return the negation. */
+ errno = EIO;
+ rc = -1;
+ }
+ }
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
+ s->send_coroutine = NULL;
+ qemu_co_mutex_unlock(&s->send_mutex);
+ return rc;
+}
+
+/* Yield until the read handler enters this coroutine, then copy the
+ * pending reply header into *reply. If the header carries a different
+ * handle than request, reply->error is set to EIO and the shared header
+ * is left in place. Otherwise the payload (if iov is non-NULL and the
+ * server reported no error) is received, and the shared header is
+ * cleared so that the next one can be fetched. */
+static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
+ struct nbd_reply *reply,
+ struct iovec *iov, int offset)
+{
+ int received;
+
+ /* Wait until we're woken up by the read handler. TODO: perhaps
+ * peek at the next reply and avoid yielding if it's ours? */
+ qemu_coroutine_yield();
+ *reply = s->reply;
+
+ if (reply->handle != request->handle) {
+ reply->error = EIO;
+ return;
+ }
+
+ if (iov && reply->error == 0) {
+ received = qemu_co_recvv(s->sock, iov, request->len, offset);
+ if (received != request->len) {
+ reply->error = EIO;
+ }
+ }
+
+ /* Tell the read handler to read another header. */
+ s->reply.handle = 0;
+}
+
+/* Release the recv_coroutine[] slot owned by request and drop the
+ * in-flight count; free_sema is unlocked when the count was at
+ * MAX_NBD_REQUESTS before the decrement. */
+static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
+{
+ int slot = HANDLE_TO_INDEX(s, request->handle);
+ int was_full = (s->in_flight == MAX_NBD_REQUESTS);
+
+ s->recv_coroutine[slot] = NULL;
+ s->in_flight--;
+ if (was_full) {
+ qemu_co_mutex_unlock(&s->free_sema);
+ }
+}
+
static int nbd_establish_connection(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
return -errno;
}
- /* Now that we're connected, set the socket to be non-blocking */
+ /* Now that we're connected, set the socket to be non-blocking and
+ * kick the reply mechanism. The handler is registered on the local
+ * sock because s->sock is only assigned below. */
socket_set_nonblock(sock);
+ qemu_aio_set_fd_handler(sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
s->sock = sock;
s->size = size;
struct nbd_request request;
request.type = NBD_CMD_DISC;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = 0;
request.len = 0;
nbd_send_request(s->sock, &request);
+ qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
closesocket(s->sock);
}
BDRVNBDState *s = bs->opaque;
int result;
+ qemu_co_mutex_init(&s->send_mutex);
+ qemu_co_mutex_init(&s->free_sema);
+
/* Pop the config into our state object. Exit if invalid. */
result = nbd_config(s, filename, flags);
if (result != 0) {
return result;
}
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
+/* Issue a single NBD_CMD_READ for nb_sectors sectors starting at
+ * sector_num; qiov->iov and offset are handed to the reply receiver.
+ * Returns the negated error field of the reply, or negated errno when
+ * the request could not be sent. */
+static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov,
+ int offset)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_READ;
- request.handle = (uint64_t)(intptr_t)bs;
- request.from = sector_num * 512;;
+ request.from = sector_num * 512;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) != -1) {
+ nbd_co_receive_reply(s, &request, &reply, qiov->iov, offset);
+ } else {
+ reply.error = errno;
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
+}
- if (reply.error !=0)
- return -reply.error;
+/* Issue a single NBD_CMD_WRITE for nb_sectors sectors starting at
+ * sector_num; qiov->iov and offset are handed to the request sender.
+ * The FUA flag is added when the write cache is not enabled and the
+ * server advertises NBD_FLAG_SEND_FUA. Returns the negated error field
+ * of the reply, or negated errno when the request could not be sent. */
+static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov,
+ int offset)
+{
+ BDRVNBDState *s = bs->opaque;
+ struct nbd_request request;
+ struct nbd_reply reply;
- if (reply.handle != request.handle)
- return -EIO;
+ request.type = NBD_CMD_WRITE;
+ if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) {
+ request.type |= NBD_CMD_FLAG_FUA;
+ }
- if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
- return -EIO;
+ request.from = sector_num * 512;
+ request.len = nb_sectors * 512;
- return 0;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, qiov->iov, offset) != -1) {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ } else {
+ reply.error = errno;
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
+}
+
+/* qemu-nbd has a limit of slightly less than 1M per request. Try to
+ * remain aligned to 4K: 2040 sectors * 512 bytes = 1020 KiB. */
+#define NBD_MAX_SECTORS 2040
+
+/* Read nb_sectors sectors starting at sector_num into qiov, issuing
+ * requests of at most NBD_MAX_SECTORS sectors each. Returns the first
+ * negative result from a chunk, otherwise the result of the final
+ * chunk. */
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
+{
+ int done = 0;
+
+ for (; nb_sectors > NBD_MAX_SECTORS; nb_sectors -= NBD_MAX_SECTORS) {
+ int rc = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, done);
+
+ if (rc < 0) {
+ return rc;
+ }
+ done += NBD_MAX_SECTORS * 512;
+ sector_num += NBD_MAX_SECTORS;
+ }
+ return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, done);
+}
+
+/* Write nb_sectors sectors starting at sector_num from qiov, issuing
+ * requests of at most NBD_MAX_SECTORS sectors each. Returns the first
+ * negative result from a chunk, otherwise the result of the final
+ * chunk. */
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
+{
+ int done = 0;
+
+ for (; nb_sectors > NBD_MAX_SECTORS; nb_sectors -= NBD_MAX_SECTORS) {
+ int rc = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, done);
+
+ if (rc < 0) {
+ return rc;
+ }
+ done += NBD_MAX_SECTORS * 512;
+ sector_num += NBD_MAX_SECTORS;
+ }
+ return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, done);
}
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
+/* Send NBD_CMD_FLUSH (with the FUA flag when the server advertises
+ * NBD_FLAG_SEND_FUA) and wait for the reply. Returns 0 without
+ * contacting the server when it does not advertise
+ * NBD_FLAG_SEND_FLUSH; otherwise the negated error field of the reply,
+ * or negated errno when the request could not be sent. */
+static int nbd_co_flush(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
- request.type = NBD_CMD_WRITE;
- request.handle = (uint64_t)(intptr_t)bs;
- request.from = sector_num * 512;;
- request.len = nb_sectors * 512;
+ if ((s->nbdflags & NBD_FLAG_SEND_FLUSH) == 0) {
+ return 0;
+ }
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
+ request.type = NBD_CMD_FLUSH;
+ if ((s->nbdflags & NBD_FLAG_SEND_FUA) != 0) {
+ request.type |= NBD_CMD_FLAG_FUA;
+ }
- if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
- return -EIO;
+ request.from = 0;
+ request.len = 0;
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) != -1) {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ } else {
+ reply.error = errno;
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
+}
- if (reply.error !=0)
- return -reply.error;
+/* Send NBD_CMD_TRIM for nb_sectors sectors starting at sector_num and
+ * wait for the reply. Returns 0 without contacting the server when it
+ * does not advertise NBD_FLAG_SEND_TRIM; otherwise the negated error
+ * field of the reply, or negated errno when the request could not be
+ * sent. */
+static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors)
+{
+ BDRVNBDState *s = bs->opaque;
+ struct nbd_request request;
+ struct nbd_reply reply;
- if (reply.handle != request.handle)
- return -EIO;
+ if (!(s->nbdflags & NBD_FLAG_SEND_TRIM)) {
+ return 0;
+ }
+ request.type = NBD_CMD_TRIM;
+ request.from = sector_num * 512;
+ request.len = nb_sectors * 512;
- return 0;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
}
static void nbd_close(BlockDriverState *bs)
}
static BlockDriver bdrv_nbd = {
- .format_name = "nbd",
- .instance_size = sizeof(BDRVNBDState),
- .bdrv_file_open = nbd_open,
- .bdrv_read = nbd_read,
- .bdrv_write = nbd_write,
- .bdrv_close = nbd_close,
- .bdrv_getlength = nbd_getlength,
- .protocol_name = "nbd",
+ /* Driver identity and per-instance state size. */
+ .format_name = "nbd",
+ .protocol_name = "nbd",
+ .instance_size = sizeof(BDRVNBDState),
+
+ /* Open, close and length query. */
+ .bdrv_file_open = nbd_open,
+ .bdrv_close = nbd_close,
+ .bdrv_getlength = nbd_getlength,
+
+ /* Data path: the nbd_co_* entry points defined in this file. */
+ .bdrv_co_readv = nbd_co_readv,
+ .bdrv_co_writev = nbd_co_writev,
+ .bdrv_co_flush_to_os = nbd_co_flush,
+ .bdrv_co_discard = nbd_co_discard,
};
static void bdrv_nbd_init(void)