#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"
-#define SLICE_TIME 100000000ULL /* ns */
+#define SLICE_TIME    100000000ULL /* ns */
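+/* Cap on the number of simultaneously in-flight mirroring operations. */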
+#define MAX_IN_FLIGHT 16
+
+/* The mirroring buffer is divided into granularity-sized chunks.
+ * Free chunks are kept in a singly-linked list.
+ */
+typedef struct MirrorBuffer {
+ QSIMPLEQ_ENTRY(MirrorBuffer) next;
+} MirrorBuffer;
typedef struct MirrorBlockJob {
BlockJob common;
unsigned long *cow_bitmap;
HBitmapIter hbi;
uint8_t *buf;
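+ /* Free granularity-sized chunks of buf, and how many there are. */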
+ QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
+ int buf_free_count;
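+ /* One bit per granularity-sized chunk; set while a copy of that chunk
+ * is in flight, so that operations never overlap.
+ */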
+ unsigned long *in_flight_bitmap;
int in_flight;
int ret;
} MirrorBlockJob;
typedef struct MirrorOp {
MirrorBlockJob *s;
QEMUIOVector qiov;
- struct iovec iov;
int64_t sector_num;
int nb_sectors;
} MirrorOp;
static void mirror_iteration_done(MirrorOp *op, int ret)
{
MirrorBlockJob *s = op->s;
+ struct iovec *iov;
int64_t chunk_num;
- int nb_chunks, sectors_per_chunk;
+ int i, nb_chunks, sectors_per_chunk;
trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors, ret);
s->in_flight--;
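+ /* Return the operation's granularity-sized chunks to the free list. */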
+ iov = op->qiov.iov;
+ for (i = 0; i < op->qiov.niov; i++) {
+ MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
+ QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
+ s->buf_free_count++;
+ }
+
sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
chunk_num = op->sector_num / sectors_per_chunk;
nb_chunks = op->nb_sectors / sectors_per_chunk;
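+ /* These chunks are no longer in flight; new operations may cover them. */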
+ bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
if (s->cow_bitmap && ret >= 0) {
bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
}
static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
BlockDriverState *source = s->common.bs;
- int nb_sectors, sectors_per_chunk;
- int64_t end, sector_num, chunk_num;
+ int nb_sectors, sectors_per_chunk, nb_chunks;
+ int64_t end, sector_num, next_chunk, next_sector, hbitmap_next_sector;
MirrorOp *op;
s->sector_num = hbitmap_iter_next(&s->hbi);
assert(s->sector_num >= 0);
}
- /* If we have no backing file yet in the destination, and the cluster size
- * is very large, we need to do COW ourselves. The first time a cluster is
- * copied, copy it entirely.
+ hbitmap_next_sector = s->sector_num;
+ sector_num = s->sector_num;
+ sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
+ end = s->common.len >> BDRV_SECTOR_BITS;
+
+ /* Extend the QEMUIOVector to include all adjacent blocks that will
+ * be copied in this operation.
*
- * Because both the granularity and the cluster size are powers of two, the
- * number of sectors to copy cannot exceed one cluster.
+ * We have to do this when the destination has no backing file yet and
+ * the cluster size is very large, because then we must do COW ourselves.
+ * The first time a cluster is copied, copy it entirely. Note that,
+ * because both the granularity and the cluster size are powers of two,
+ * the number of sectors to copy cannot exceed one cluster.
+ *
+ * We also want to extend the QEMUIOVector to include more adjacent
+ * dirty blocks if possible, to limit the number of I/O operations and
+ * run efficiently even with a small granularity.
*/
- sector_num = s->sector_num;
- sectors_per_chunk = nb_sectors = s->granularity >> BDRV_SECTOR_BITS;
- chunk_num = sector_num / sectors_per_chunk;
- if (s->cow_bitmap && !test_bit(chunk_num, s->cow_bitmap)) {
- trace_mirror_cow(s, sector_num);
- bdrv_round_to_clusters(s->target,
- sector_num, sectors_per_chunk,
- &sector_num, &nb_sectors);
+ nb_chunks = 0;
+ nb_sectors = 0;
+ next_sector = sector_num;
+ next_chunk = sector_num / sectors_per_chunk;
+
+ /* Wait for I/O to this cluster (from a previous iteration) to be done. */
+ while (test_bit(next_chunk, s->in_flight_bitmap)) {
+ trace_mirror_yield_in_flight(s, sector_num, s->in_flight);
+ qemu_coroutine_yield();
}
- end = s->common.len >> BDRV_SECTOR_BITS;
- nb_sectors = MIN(nb_sectors, end - sector_num);
+ do {
+ int added_sectors, added_chunks;
+
+ if (!bdrv_get_dirty(source, next_sector) ||
+ test_bit(next_chunk, s->in_flight_bitmap)) {
+ assert(nb_sectors > 0);
+ break;
+ }
+
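+ /* Default to copying a single chunk; the COW case below may round
+ * this up to a whole cluster.
+ */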
+ added_sectors = sectors_per_chunk;
+ if (s->cow_bitmap && !test_bit(next_chunk, s->cow_bitmap)) {
+ bdrv_round_to_clusters(s->target,
+ next_sector, added_sectors,
+ &next_sector, &added_sectors);
+
+ /* On the first iteration, the rounding may make us copy
+ * sectors before the first dirty one.
+ */
+ if (next_sector < sector_num) {
+ assert(nb_sectors == 0);
+ sector_num = next_sector;
+ next_chunk = next_sector / sectors_per_chunk;
+ }
+ }
+
+ added_sectors = MIN(added_sectors, end - (sector_num + nb_sectors));
+ added_chunks = (added_sectors + sectors_per_chunk - 1) / sectors_per_chunk;
+
+ /* When doing COW, it may happen that the buffer does not have enough
+ * free chunks for a full cluster. Wait if that is the case.
+ */
+ while (nb_chunks == 0 && s->buf_free_count < added_chunks) {
+ trace_mirror_yield_buf_busy(s, nb_chunks, s->in_flight);
+ qemu_coroutine_yield();
+ }
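+ /* If this operation already gathered some chunks, submit them rather
+ * than wait for more buffer space to free up.
+ */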
+ if (s->buf_free_count < nb_chunks + added_chunks) {
+ trace_mirror_break_buf_busy(s, nb_chunks, s->in_flight);
+ break;
+ }
+
+ /* We have enough free space to copy these sectors. */
+ bitmap_set(s->in_flight_bitmap, next_chunk, added_chunks);
+
+ nb_sectors += added_sectors;
+ nb_chunks += added_chunks;
+ next_sector += added_sectors;
+ next_chunk += added_chunks;
+ } while (next_sector < end);
/* Allocate a MirrorOp that is used as an AIO callback. */
op = g_slice_new(MirrorOp);
op->s = s;
- op->iov.iov_base = s->buf;
- op->iov.iov_len = nb_sectors * 512;
op->sector_num = sector_num;
op->nb_sectors = nb_sectors;
- qemu_iovec_init_external(&op->qiov, &op->iov, 1);
+
+ /* Now make a QEMUIOVector taking enough granularity-sized chunks
+ * from s->buf_free.
+ */
+ qemu_iovec_init(&op->qiov, nb_chunks);
+ next_sector = sector_num;
+ while (nb_chunks-- > 0) {
+ MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
+ QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
+ s->buf_free_count--;
+ qemu_iovec_add(&op->qiov, buf, s->granularity);
+
+ /* Advance the HBitmapIter in parallel, so that we do not examine
+ * the same sector twice.
+ */
+ if (next_sector > hbitmap_next_sector && bdrv_get_dirty(source, next_sector)) {
+ hbitmap_next_sector = hbitmap_iter_next(&s->hbi);
+ }
+
+ next_sector += sectors_per_chunk;
+ }
bdrv_reset_dirty(source, sector_num, nb_sectors);
mirror_read_complete, op);
}
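+/* Carve s->buf into granularity-sized chunks and string them all onto
+ * the free list.
+ */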
+static void mirror_free_init(MirrorBlockJob *s)
+{
+ int granularity = s->granularity;
+ size_t buf_size = s->buf_size;
+ uint8_t *buf = s->buf;
+
+ assert(s->buf_free_count == 0);
+ QSIMPLEQ_INIT(&s->buf_free);
+ while (buf_size != 0) {
+ MirrorBuffer *cur = (MirrorBuffer *)buf;
+ QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
+ s->buf_free_count++;
+ buf_size -= granularity;
+ buf += granularity;
+ }
+}
+
static void mirror_drain(MirrorBlockJob *s)
{
while (s->in_flight > 0) {
}
s->common.len = bdrv_getlength(bs);
- if (s->common.len < 0) {
+ if (s->common.len <= 0) {
block_job_completed(&s->common, s->common.len);
return;
}
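+ /* Both in_flight_bitmap and cow_bitmap hold one bit per
+ * granularity-sized chunk.
+ */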
+ length = (bdrv_getlength(bs) + s->granularity - 1) / s->granularity;
+ s->in_flight_bitmap = bitmap_new(length);
+
/* If we have no backing file yet in the destination, we cannot let
* the destination do COW. Instead, we copy sectors around the
* dirty data if needed. We need a bitmap to do that.
if (backing_filename[0] && !s->target->backing_hd) {
bdrv_get_info(s->target, &bdi);
if (s->granularity < bdi.cluster_size) {
- s->buf_size = bdi.cluster_size;
- length = (bdrv_getlength(bs) + s->granularity - 1) / s->granularity;
+ s->buf_size = MAX(s->buf_size, bdi.cluster_size);
s->cow_bitmap = bitmap_new(length);
}
}
end = s->common.len >> BDRV_SECTOR_BITS;
s->buf = qemu_blockalign(bs, s->buf_size);
sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
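+ /* Put every chunk of the freshly allocated buffer on the free list. */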
+ mirror_free_init(s);
if (s->mode != MIRROR_SYNC_MODE_NONE) {
/* First part, loop on the sectors and initialize the dirty bitmap. */
*/
if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
- if (s->in_flight > 0) {
- trace_mirror_yield(s, s->in_flight, cnt);
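+ /* Yield if the pipeline is full, the buffer is exhausted, or there
+ * is nothing to do except wait for in-flight operations to finish.
+ */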
+ if (s->in_flight == MAX_IN_FLIGHT || s->buf_free_count == 0 ||
+ (cnt == 0 && s->in_flight > 0)) {
+ trace_mirror_yield(s, s->in_flight, s->buf_free_count, cnt);
qemu_coroutine_yield();
continue;
} else if (cnt != 0) {
assert(s->in_flight == 0);
qemu_vfree(s->buf);
g_free(s->cow_bitmap);
+ g_free(s->in_flight_bitmap);
bdrv_set_dirty_tracking(bs, 0);
bdrv_iostatus_disable(s->target);
if (s->should_complete && ret == 0) {
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
int ret;
- ret = bdrv_open_backing_file(s->target);
+ ret = bdrv_open_backing_file(s->target, NULL);
if (ret < 0) {
char backing_filename[PATH_MAX];
bdrv_get_full_backing_filename(s->target, backing_filename,
};
void mirror_start(BlockDriverState *bs, BlockDriverState *target,
- int64_t speed, int64_t granularity, MirrorSyncMode mode,
- BlockdevOnError on_source_error,
+ int64_t speed, int64_t granularity, int64_t buf_size,
+ MirrorSyncMode mode, BlockdevOnError on_source_error,
BlockdevOnError on_target_error,
BlockDriverCompletionFunc *cb,
void *opaque, Error **errp)
s->target = target;
s->mode = mode;
s->granularity = granularity;
- s->buf_size = granularity;
+ s->buf_size = MAX(buf_size, granularity);
bdrv_set_dirty_tracking(bs, granularity);
bdrv_set_enable_write_cache(s->target, true);