]> Git Repo - qemu.git/blobdiff - migration/qemu-file.c
migration: Postcopy recover with preempt enabled
[qemu.git] / migration / qemu-file.c
index e33c46764fc40d2d262d1720f511881033a331b9..2f266b25cdf63ed55c1f27ce10fd08d39e8ddd35 100644 (file)
@@ -23,6 +23,7 @@
  */
 #include "qemu/osdep.h"
 #include <zlib.h>
+#include "qemu/madvise.h"
 #include "qemu/error-report.h"
 #include "qemu/iov.h"
 #include "migration.h"
 #include "qapi/error.h"
 
 #define IO_BUF_SIZE 32768
-#define MAX_IOV_SIZE MIN(IOV_MAX, 64)
+#define MAX_IOV_SIZE MIN_CONST(IOV_MAX, 64)
 
 struct QEMUFile {
-    const QEMUFileOps *ops;
     const QEMUFileHooks *hooks;
-    void *opaque;
+    QIOChannel *ioc;
+    bool is_writable;
 
-    int64_t bytes_xfer;
-    int64_t xfer_limit;
+    /*
+     * Maximum amount of data in bytes to transfer during one
+     * rate limiting time window
+     */
+    int64_t rate_limit_max;
+    /*
+     * Total amount of data in bytes queued for transfer
+     * during this rate limiting time window
+     */
+    int64_t rate_limit_used;
+
+    /* The sum of bytes transferred on the wire */
+    int64_t total_transferred;
 
-    int64_t pos; /* start of buffer when writing, end of buffer
-                    when reading */
     int buf_index;
     int buf_size; /* 0 when writing */
     uint8_t buf[IO_BUF_SIZE];
@@ -53,30 +63,35 @@ struct QEMUFile {
 
     int last_error;
     Error *last_error_obj;
+    /* has the file has been shutdown */
+    bool shutdown;
 };
 
 /*
  * Stop a file from being read/written - not all backing files can do this
  * typically only sockets can.
+ *
+ * TODO: convert to propagate Error objects instead of squashing
+ * to a fixed errno value
  */
 int qemu_file_shutdown(QEMUFile *f)
 {
-    if (!f->ops->shut_down) {
+    int ret = 0;
+
+    f->shutdown = true;
+    if (!qio_channel_has_feature(f->ioc,
+                                 QIO_CHANNEL_FEATURE_SHUTDOWN)) {
         return -ENOSYS;
     }
-    return f->ops->shut_down(f->opaque, true, true, NULL);
-}
 
-/*
- * Result: QEMUFile* for a 'return path' for comms in the opposite direction
- *         NULL if not available
- */
-QEMUFile *qemu_file_get_return_path(QEMUFile *f)
-{
-    if (!f->ops->get_return_path) {
-        return NULL;
+    if (qio_channel_shutdown(f->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL) < 0) {
+        ret = -EIO;
+    }
+
+    if (!f->last_error) {
+        qemu_file_set_error(f, -EIO);
     }
-    return f->ops->get_return_path(f->opaque);
+    return ret;
 }
 
 bool qemu_file_mode_is_not_valid(const char *mode)
@@ -91,17 +106,37 @@ bool qemu_file_mode_is_not_valid(const char *mode)
     return false;
 }
 
-QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
+static QEMUFile *qemu_file_new_impl(QIOChannel *ioc, bool is_writable)
 {
     QEMUFile *f;
 
     f = g_new0(QEMUFile, 1);
 
-    f->opaque = opaque;
-    f->ops = ops;
+    object_ref(ioc);
+    f->ioc = ioc;
+    f->is_writable = is_writable;
+
     return f;
 }
 
+/*
+ * Result: QEMUFile* for a 'return path' for comms in the opposite direction
+ *         NULL if not available
+ */
+QEMUFile *qemu_file_get_return_path(QEMUFile *f)
+{
+    return qemu_file_new_impl(f->ioc, !f->is_writable);
+}
+
+QEMUFile *qemu_file_new_output(QIOChannel *ioc)
+{
+    return qemu_file_new_impl(ioc, true);
+}
+
+QEMUFile *qemu_file_new_input(QIOChannel *ioc)
+{
+    return qemu_file_new_impl(ioc, false);
+}
 
 void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks)
 {
@@ -125,6 +160,33 @@ int qemu_file_get_error_obj(QEMUFile *f, Error **errp)
     return f->last_error;
 }
 
+/*
+ * Get last error for either stream f1 or f2 with optional Error*.
+ * The error returned (non-zero) can be either from f1 or f2.
+ *
+ * If any of the qemufile* is NULL, then skip the check on that file.
+ *
+ * When there is no error on both qemufile, zero is returned.
+ */
+int qemu_file_get_error_obj_any(QEMUFile *f1, QEMUFile *f2, Error **errp)
+{
+    int ret = 0;
+
+    if (f1) {
+        ret = qemu_file_get_error_obj(f1, errp);
+        /* If there's already error detected, return */
+        if (ret) {
+            return ret;
+        }
+    }
+
+    if (f2) {
+        ret = qemu_file_get_error_obj(f2, errp);
+    }
+
+    return ret;
+}
+
 /*
  * Set the last error for stream f with optional Error*
  */
@@ -160,7 +222,7 @@ void qemu_file_set_error(QEMUFile *f, int ret)
 
 bool qemu_file_is_writable(QEMUFile *f)
 {
-    return f->ops->writev_buffer;
+    return f->is_writable;
 }
 
 static void qemu_iovec_release_ram(QEMUFile *f)
@@ -198,40 +260,35 @@ static void qemu_iovec_release_ram(QEMUFile *f)
     memset(f->may_free, 0, sizeof(f->may_free));
 }
 
+
 /**
  * Flushes QEMUFile buffer
  *
- * If there is writev_buffer QEMUFileOps it uses it otherwise uses
- * put_buffer ops. This will flush all pending data. If data was
- * only partially flushed, it will set an error state.
+ * This will flush all pending data. If data was only partially flushed, it
+ * will set an error state.
  */
 void qemu_fflush(QEMUFile *f)
 {
-    ssize_t ret = 0;
-    ssize_t expect = 0;
-    Error *local_error = NULL;
-
     if (!qemu_file_is_writable(f)) {
         return;
     }
 
+    if (f->shutdown) {
+        return;
+    }
     if (f->iovcnt > 0) {
-        expect = iov_size(f->iov, f->iovcnt);
-        ret = f->ops->writev_buffer(f->opaque, f->iov, f->iovcnt, f->pos,
-                                    &local_error);
+        Error *local_error = NULL;
+        if (qio_channel_writev_all(f->ioc,
+                                   f->iov, f->iovcnt,
+                                   &local_error) < 0) {
+            qemu_file_set_error_obj(f, -EIO, local_error);
+        } else {
+            f->total_transferred += iov_size(f->iov, f->iovcnt);
+        }
 
         qemu_iovec_release_ram(f);
     }
 
-    if (ret >= 0) {
-        f->pos += ret;
-    }
-    /* We expect the QEMUFile write impl to send the full
-     * data set we requested, so sanity check that.
-     */
-    if (ret != expect) {
-        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
-    }
     f->buf_index = 0;
     f->iovcnt = 0;
 }
@@ -241,7 +298,7 @@ void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
     int ret = 0;
 
     if (f->hooks && f->hooks->before_ram_iterate) {
-        ret = f->hooks->before_ram_iterate(f, f->opaque, flags, NULL);
+        ret = f->hooks->before_ram_iterate(f, flags, NULL);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
@@ -253,7 +310,7 @@ void ram_control_after_iterate(QEMUFile *f, uint64_t flags)
     int ret = 0;
 
     if (f->hooks && f->hooks->after_ram_iterate) {
-        ret = f->hooks->after_ram_iterate(f, f->opaque, flags, NULL);
+        ret = f->hooks->after_ram_iterate(f, flags, NULL);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
@@ -265,7 +322,7 @@ void ram_control_load_hook(QEMUFile *f, uint64_t flags, void *data)
     int ret = -EINVAL;
 
     if (f->hooks && f->hooks->hook_ram_load) {
-        ret = f->hooks->hook_ram_load(f, f->opaque, flags, data);
+        ret = f->hooks->hook_ram_load(f, flags, data);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
         }
@@ -285,16 +342,16 @@ size_t ram_control_save_page(QEMUFile *f, ram_addr_t block_offset,
                              uint64_t *bytes_sent)
 {
     if (f->hooks && f->hooks->save_page) {
-        int ret = f->hooks->save_page(f, f->opaque, block_offset,
+        int ret = f->hooks->save_page(f, block_offset,
                                       offset, size, bytes_sent);
         if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
-            f->bytes_xfer += size;
+            f->rate_limit_used += size;
         }
 
         if (ret != RAM_SAVE_CONTROL_DELAYED &&
             ret != RAM_SAVE_CONTROL_NOT_SUPP) {
             if (bytes_sent && *bytes_sent > 0) {
-                qemu_update_position(f, *bytes_sent);
+                qemu_file_credit_transfer(f, *bytes_sent);
             } else if (ret < 0) {
                 qemu_file_set_error(f, ret);
             }
@@ -329,11 +386,29 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
     f->buf_index = 0;
     f->buf_size = pending;
 
-    len = f->ops->get_buffer(f->opaque, f->buf + pending, f->pos,
-                             IO_BUF_SIZE - pending, &local_error);
+    if (f->shutdown) {
+        return 0;
+    }
+
+    do {
+        len = qio_channel_read(f->ioc,
+                               (char *)f->buf + pending,
+                               IO_BUF_SIZE - pending,
+                               &local_error);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                qio_channel_yield(f->ioc, G_IO_IN);
+            } else {
+                qio_channel_wait(f->ioc, G_IO_IN);
+            }
+        } else if (len < 0) {
+            len = -EIO;
+        }
+    } while (len == QIO_CHANNEL_ERR_BLOCK);
+
     if (len > 0) {
         f->buf_size += len;
-        f->pos += len;
+        f->total_transferred += len;
     } else if (len == 0) {
         qemu_file_set_error_obj(f, -EIO, local_error);
     } else if (len != -EAGAIN) {
@@ -345,9 +420,9 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
     return len;
 }
 
-void qemu_update_position(QEMUFile *f, size_t size)
+void qemu_file_credit_transfer(QEMUFile *f, size_t size)
 {
-    f->pos += size;
+    f->total_transferred += size;
 }
 
 /** Closes the file
@@ -360,16 +435,16 @@ void qemu_update_position(QEMUFile *f, size_t size)
  */
 int qemu_fclose(QEMUFile *f)
 {
-    int ret;
+    int ret, ret2;
     qemu_fflush(f);
     ret = qemu_file_get_error(f);
 
-    if (f->ops->close) {
-        int ret2 = f->ops->close(f->opaque, NULL);
-        if (ret >= 0) {
-            ret = ret2;
-        }
+    ret2 = qio_channel_close(f->ioc, NULL);
+    if (ret >= 0) {
+        ret = ret2;
     }
+    g_clear_pointer(&f->ioc, object_unref);
+
     /* If any error was spotted before closing, we should report it
      * instead of the close() return value.
      */
@@ -382,8 +457,16 @@ int qemu_fclose(QEMUFile *f)
     return ret;
 }
 
-static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
-                         bool may_free)
+/*
+ * Add buf to iovec. Do flush if iovec is full.
+ *
+ * Return values:
+ * 1 iovec is full and flushed
+ * 0 iovec is not flushed
+ *
+ */
+static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
+                        bool may_free)
 {
     /* check for adjacent buffer and coalesce them */
     if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base +
@@ -392,6 +475,11 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
     {
         f->iov[f->iovcnt - 1].iov_len += size;
     } else {
+        if (f->iovcnt >= MAX_IOV_SIZE) {
+            /* Should only happen if a previous fflush failed */
+            assert(f->shutdown || !qemu_file_is_writable(f));
+            return 1;
+        }
         if (may_free) {
             set_bit(f->iovcnt, f->may_free);
         }
@@ -401,6 +489,19 @@ static void add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
 
     if (f->iovcnt >= MAX_IOV_SIZE) {
         qemu_fflush(f);
+        return 1;
+    }
+
+    return 0;
+}
+
+static void add_buf_to_iovec(QEMUFile *f, size_t len)
+{
+    if (!add_to_iovec(f, f->buf + f->buf_index, len, false)) {
+        f->buf_index += len;
+        if (f->buf_index == IO_BUF_SIZE) {
+            qemu_fflush(f);
+        }
     }
 }
 
@@ -411,7 +512,7 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
         return;
     }
 
-    f->bytes_xfer += size;
+    f->rate_limit_used += size;
     add_to_iovec(f, buf, size, may_free);
 }
 
@@ -429,12 +530,8 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
             l = size;
         }
         memcpy(f->buf + f->buf_index, buf, l);
-        f->bytes_xfer += l;
-        add_to_iovec(f, f->buf + f->buf_index, l, false);
-        f->buf_index += l;
-        if (f->buf_index == IO_BUF_SIZE) {
-            qemu_fflush(f);
-        }
+        f->rate_limit_used += l;
+        add_buf_to_iovec(f, l);
         if (qemu_file_get_error(f)) {
             break;
         }
@@ -450,12 +547,8 @@ void qemu_put_byte(QEMUFile *f, int v)
     }
 
     f->buf[f->buf_index] = v;
-    f->bytes_xfer++;
-    add_to_iovec(f, f->buf + f->buf_index, 1, false);
-    f->buf_index++;
-    if (f->buf_index == IO_BUF_SIZE) {
-        qemu_fflush(f);
-    }
+    f->rate_limit_used++;
+    add_buf_to_iovec(f, 1);
 }
 
 void qemu_file_skip(QEMUFile *f, int size)
@@ -566,7 +659,7 @@ size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size)
 {
     if (size < IO_BUF_SIZE) {
         size_t res;
-        uint8_t *src;
+        uint8_t *src = NULL;
 
         res = qemu_peek_buffer(f, &src, size, 0);
 
@@ -610,9 +703,9 @@ int qemu_get_byte(QEMUFile *f)
     return result;
 }
 
-int64_t qemu_ftell_fast(QEMUFile *f)
+int64_t qemu_file_total_transferred_fast(QEMUFile *f)
 {
-    int64_t ret = f->pos;
+    int64_t ret = f->total_transferred;
     int i;
 
     for (i = 0; i < f->iovcnt; i++) {
@@ -622,18 +715,21 @@ int64_t qemu_ftell_fast(QEMUFile *f)
     return ret;
 }
 
-int64_t qemu_ftell(QEMUFile *f)
+int64_t qemu_file_total_transferred(QEMUFile *f)
 {
     qemu_fflush(f);
-    return f->pos;
+    return f->total_transferred;
 }
 
 int qemu_file_rate_limit(QEMUFile *f)
 {
+    if (f->shutdown) {
+        return 1;
+    }
     if (qemu_file_get_error(f)) {
         return 1;
     }
-    if (f->xfer_limit > 0 && f->bytes_xfer > f->xfer_limit) {
+    if (f->rate_limit_max > 0 && f->rate_limit_used > f->rate_limit_max) {
         return 1;
     }
     return 0;
@@ -641,22 +737,22 @@ int qemu_file_rate_limit(QEMUFile *f)
 
 int64_t qemu_file_get_rate_limit(QEMUFile *f)
 {
-    return f->xfer_limit;
+    return f->rate_limit_max;
 }
 
 void qemu_file_set_rate_limit(QEMUFile *f, int64_t limit)
 {
-    f->xfer_limit = limit;
+    f->rate_limit_max = limit;
 }
 
 void qemu_file_reset_rate_limit(QEMUFile *f)
 {
-    f->bytes_xfer = 0;
+    f->rate_limit_used = 0;
 }
 
-void qemu_file_update_transfer(QEMUFile *f, int64_t len)
+void qemu_file_acct_rate_limit(QEMUFile *f, int64_t len)
 {
-    f->bytes_xfer += len;
+    f->rate_limit_used += len;
 }
 
 void qemu_put_be16(QEMUFile *f, unsigned int v)
@@ -732,11 +828,8 @@ static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
 /* Compress size bytes of data start at p and store the compressed
  * data to the buffer of f.
  *
- * When f is not writable, return -1 if f has no space to save the
- * compressed data.
- * When f is wirtable and it has no space to save the compressed data,
- * do fflush first, if f still has no space to save the compressed
- * data, return -1.
+ * Since the file is dummy file with empty_ops, return -1 if f has no space to
+ * save the compressed data.
  */
 ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
                                   const uint8_t *p, size_t size)
@@ -744,14 +837,7 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
 
     if (blen < compressBound(size)) {
-        if (!qemu_file_is_writable(f)) {
-            return -1;
-        }
-        qemu_fflush(f);
-        blen = IO_BUF_SIZE - sizeof(int32_t);
-        if (blen < compressBound(size)) {
-            return -1;
-        }
+        return -1;
     }
 
     blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
@@ -761,13 +847,7 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     }
 
     qemu_put_be32(f, blen);
-    if (f->ops->writev_buffer) {
-        add_to_iovec(f, f->buf + f->buf_index, blen, false);
-    }
-    f->buf_index += blen;
-    if (f->buf_index == IO_BUF_SIZE) {
-        qemu_fflush(f);
-    }
+    add_buf_to_iovec(f, blen);
     return blen + sizeof(int32_t);
 }
 
@@ -826,7 +906,18 @@ void qemu_put_counted_string(QEMUFile *f, const char *str)
  */
 void qemu_file_set_blocking(QEMUFile *f, bool block)
 {
-    if (f->ops->set_blocking) {
-        f->ops->set_blocking(f->opaque, block, NULL);
-    }
+    qio_channel_set_blocking(f->ioc, block, NULL);
+}
+
+/*
+ * qemu_file_get_ioc:
+ *
+ * Get the ioc object for the file, without incrementing
+ * the reference count.
+ *
+ * Returns: the ioc object
+ */
+QIOChannel *qemu_file_get_ioc(QEMUFile *file)
+{
+    return file->ioc;
 }
This page took 0.04009 seconds and 4 git commands to generate.