mirror: add buf-size argument to drive-mirror
block/mirror.c

/*
 * Image mirroring
 *
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Paolo Bonzini  <[email protected]>
 *
 * This work is licensed under the terms of the GNU LGPL, version 2 or later.
 * See the COPYING.LIB file in the top-level directory.
 *
 */

#include "trace.h"
#include "block/blockjob.h"
#include "block/block_int.h"
#include "qemu/ratelimit.h"
#include "qemu/bitmap.h"

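/* Length of one scheduling slice: 100 ms in nanoseconds.  The main loop
 * yields at least once per slice even without a rate limit, and the same
 * value is the time window passed to ratelimit_set_speed() below.
 */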
#define SLICE_TIME 100000000ULL /* ns */

typedef struct MirrorBlockJob {
    BlockJob common;
    RateLimit limit;
    BlockDriverState *target;
    MirrorSyncMode mode;
    BlockdevOnError on_source_error, on_target_error;
    bool synced;
    bool should_complete;
    int64_t sector_num;
    int64_t granularity;
    size_t buf_size;
    unsigned long *cow_bitmap;
    HBitmapIter hbi;
    uint8_t *buf;

    int in_flight;
    int ret;
} MirrorBlockJob;

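/* Per-request state for one in-flight copy.  The operation reads a chunk
 * from the source into the shared bounce buffer and then writes it to the
 * target; it is freed in mirror_iteration_done() once both halves are done.
 */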
typedef struct MirrorOp {
    MirrorBlockJob *s;
    QEMUIOVector qiov;
    struct iovec iov;
    int64_t sector_num;
    int nb_sectors;
} MirrorOp;

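/* Map an I/O error to the action configured for the side that failed: read
 * errors are handled according to the source's on_source_error policy, write
 * errors according to the target's on_target_error policy.
 */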
static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
                                            int error)
{
    s->synced = false;
    if (read) {
        return block_job_error_action(&s->common, s->common.bs,
                                      s->on_source_error, true, error);
    } else {
        return block_job_error_action(&s->common, s->target,
                                      s->on_target_error, false, error);
    }
}

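/* Completion bookkeeping shared by the read and write callbacks.  On success
 * the chunks covered by the operation are recorded in the COW bitmap, so
 * later writes to them no longer need to be widened to cluster boundaries;
 * the job coroutine is then re-entered so the main loop can make progress.
 */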
static void mirror_iteration_done(MirrorOp *op, int ret)
{
    MirrorBlockJob *s = op->s;
    int64_t chunk_num;
    int nb_chunks, sectors_per_chunk;

    trace_mirror_iteration_done(s, op->sector_num, op->nb_sectors, ret);

    s->in_flight--;
    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;
    chunk_num = op->sector_num / sectors_per_chunk;
    nb_chunks = op->nb_sectors / sectors_per_chunk;
    if (s->cow_bitmap && ret >= 0) {
        bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
    }

    g_slice_free(MirrorOp, op);
    qemu_coroutine_enter(s->common.co, NULL);
}

static void mirror_write_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockDriverState *source = s->common.bs;
        BlockErrorAction action;

        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, false, -ret);
        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }
    }
    mirror_iteration_done(op, ret);
}

static void mirror_read_complete(void *opaque, int ret)
{
    MirrorOp *op = opaque;
    MirrorBlockJob *s = op->s;
    if (ret < 0) {
        BlockDriverState *source = s->common.bs;
        BlockErrorAction action;

        bdrv_set_dirty(source, op->sector_num, op->nb_sectors);
        action = mirror_error_action(s, true, -ret);
        if (action == BDRV_ACTION_REPORT && s->ret >= 0) {
            s->ret = ret;
        }

        mirror_iteration_done(op, ret);
        return;
    }
    bdrv_aio_writev(s->target, op->sector_num, &op->qiov, op->nb_sectors,
                    mirror_write_complete, op);
}

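/* One step of the copy loop: fetch the next dirty sector from the HBitmap
 * iterator (rewinding the iterator when it is exhausted), widen the request
 * to a whole target cluster when we must do COW ourselves, clear the dirty
 * bits, and issue the read.  The matching write to the target is chained
 * from mirror_read_complete() above.
 */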
static void coroutine_fn mirror_iteration(MirrorBlockJob *s)
{
    BlockDriverState *source = s->common.bs;
    int nb_sectors, sectors_per_chunk;
    int64_t end, sector_num, chunk_num;
    MirrorOp *op;

    s->sector_num = hbitmap_iter_next(&s->hbi);
    if (s->sector_num < 0) {
        bdrv_dirty_iter_init(source, &s->hbi);
        s->sector_num = hbitmap_iter_next(&s->hbi);
        trace_mirror_restart_iter(s, bdrv_get_dirty_count(source));
        assert(s->sector_num >= 0);
    }

    /* If we have no backing file yet in the destination, and the cluster size
     * is very large, we need to do COW ourselves.  The first time a cluster is
     * copied, copy it entirely.
     *
     * Because both the granularity and the cluster size are powers of two, the
     * number of sectors to copy cannot exceed one cluster.
     */
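    /* A worked example, assuming the default 64 KiB granularity chosen in
     * mirror_start(): sectors_per_chunk below is 65536 >> BDRV_SECTOR_BITS,
     * i.e. 65536 >> 9 = 128 sectors of 512 bytes each.
     */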
    sector_num = s->sector_num;
    sectors_per_chunk = nb_sectors = s->granularity >> BDRV_SECTOR_BITS;
    chunk_num = sector_num / sectors_per_chunk;
    if (s->cow_bitmap && !test_bit(chunk_num, s->cow_bitmap)) {
        trace_mirror_cow(s, sector_num);
        bdrv_round_to_clusters(s->target,
                               sector_num, sectors_per_chunk,
                               &sector_num, &nb_sectors);
    }

    end = s->common.len >> BDRV_SECTOR_BITS;
    nb_sectors = MIN(nb_sectors, end - sector_num);

    /* Allocate a MirrorOp that is used as an AIO callback.  */
    op = g_slice_new(MirrorOp);
    op->s = s;
    op->iov.iov_base = s->buf;
    op->iov.iov_len  = nb_sectors * 512;
    op->sector_num = sector_num;
    op->nb_sectors = nb_sectors;
    qemu_iovec_init_external(&op->qiov, &op->iov, 1);

    bdrv_reset_dirty(source, sector_num, nb_sectors);

    /* Copy the dirty cluster.  */
    s->in_flight++;
    trace_mirror_one_iteration(s, sector_num, nb_sectors);
    bdrv_aio_readv(source, sector_num, &op->qiov, nb_sectors,
                   mirror_read_complete, op);
}

static void mirror_drain(MirrorBlockJob *s)
{
    while (s->in_flight > 0) {
        qemu_coroutine_yield();
    }
}

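/* The mirror job coroutine.  Phase one seeds the dirty bitmap according to
 * the sync mode (the whole device for "full", only sectors above the backing
 * file for "top", nothing for "none").  Phase two copies dirty chunks until
 * source and target converge, signals readiness exactly once, and then keeps
 * the target in sync until the job is completed or cancelled.
 */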
static void coroutine_fn mirror_run(void *opaque)
{
    MirrorBlockJob *s = opaque;
    BlockDriverState *bs = s->common.bs;
    int64_t sector_num, end, sectors_per_chunk, length;
    uint64_t last_pause_ns;
    BlockDriverInfo bdi;
    char backing_filename[1024];
    int ret = 0;
    int n;

    if (block_job_is_cancelled(&s->common)) {
        goto immediate_exit;
    }

    s->common.len = bdrv_getlength(bs);
    if (s->common.len < 0) {
        block_job_completed(&s->common, s->common.len);
        return;
    }

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(s->target, backing_filename,
                              sizeof(backing_filename));
    if (backing_filename[0] && !s->target->backing_hd) {
        bdrv_get_info(s->target, &bdi);
        if (s->granularity < bdi.cluster_size) {
            s->buf_size = MAX(s->buf_size, bdi.cluster_size);
            length = (bdrv_getlength(bs) + s->granularity - 1) / s->granularity;
            s->cow_bitmap = bitmap_new(length);
        }
    }

    end = s->common.len >> BDRV_SECTOR_BITS;
    s->buf = qemu_blockalign(bs, s->buf_size);
    sectors_per_chunk = s->granularity >> BDRV_SECTOR_BITS;

    if (s->mode != MIRROR_SYNC_MODE_NONE) {
        /* First part, loop on the sectors and initialize the dirty bitmap.  */
        BlockDriverState *base;
        base = s->mode == MIRROR_SYNC_MODE_FULL ? NULL : bs->backing_hd;
        for (sector_num = 0; sector_num < end; ) {
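            /* Round sector_num up to the next chunk boundary.  This works
             * because sectors_per_chunk is a power of two: OR-ing in the low
             * bits and adding one lands exactly on the next multiple.
             */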
            int64_t next = (sector_num | (sectors_per_chunk - 1)) + 1;
            ret = bdrv_co_is_allocated_above(bs, base,
                                             sector_num, next - sector_num, &n);

            if (ret < 0) {
                goto immediate_exit;
            }

            assert(n > 0);
            if (ret == 1) {
                bdrv_set_dirty(bs, sector_num, n);
                sector_num = next;
            } else {
                sector_num += n;
            }
        }
    }

    bdrv_dirty_iter_init(bs, &s->hbi);
    last_pause_ns = qemu_get_clock_ns(rt_clock);
    for (;;) {
        uint64_t delay_ns;
        int64_t cnt;
        bool should_complete;

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        cnt = bdrv_get_dirty_count(bs);

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that qemu_aio_flush() returns.
         * We do so every SLICE_TIME nanoseconds, or when there is an error,
         * or when the source is clean, whichever comes first.
         */
        if (qemu_get_clock_ns(rt_clock) - last_pause_ns < SLICE_TIME &&
            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight > 0) {
                trace_mirror_yield(s, s->in_flight, cnt);
                qemu_coroutine_yield();
                continue;
            } else if (cnt != 0) {
                mirror_iteration(s);
                continue;
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            ret = bdrv_flush(s->target);
            if (ret < 0) {
                if (mirror_error_action(s, false, -ret) == BDRV_ACTION_REPORT) {
                    goto immediate_exit;
                }
            } else {
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                s->common.offset = end * BDRV_SECTOR_SIZE;
                if (!s->synced) {
                    block_job_ready(&s->common);
                    s->synced = true;
                }

                should_complete = s->should_complete ||
                    block_job_is_cancelled(&s->common);
                cnt = bdrv_get_dirty_count(bs);
            }
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(bs), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * mirror_iteration runs.
             */
            trace_mirror_before_drain(s, cnt);
            bdrv_drain_all();
            cnt = bdrv_get_dirty_count(bs);
        }

        ret = 0;
        trace_mirror_before_sleep(s, cnt, s->synced);
        if (!s->synced) {
            /* Publish progress */
            s->common.offset = (end - cnt) * BDRV_SECTOR_SIZE;

            if (s->common.speed) {
                delay_ns = ratelimit_calculate_delay(&s->limit, sectors_per_chunk);
            } else {
                delay_ns = 0;
            }

            block_job_sleep_ns(&s->common, rt_clock, delay_ns);
            if (block_job_is_cancelled(&s->common)) {
                break;
            }
        } else if (!should_complete) {
            delay_ns = (s->in_flight == 0 && cnt == 0 ? SLICE_TIME : 0);
            block_job_sleep_ns(&s->common, rt_clock, delay_ns);
        } else if (cnt == 0) {
            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            s->common.cancelled = false;
            break;
        }
        last_pause_ns = qemu_get_clock_ns(rt_clock);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
        mirror_drain(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    bdrv_set_dirty_tracking(bs, 0);
    bdrv_iostatus_disable(s->target);
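    /* On a successful, explicitly requested completion, give the target the
     * same open flags as the source and swap the two BlockDriverStates, so
     * that the guest device transparently switches to the mirrored image.
     */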
    if (s->should_complete && ret == 0) {
        if (bdrv_get_flags(s->target) != bdrv_get_flags(s->common.bs)) {
            bdrv_reopen(s->target, bdrv_get_flags(s->common.bs), NULL);
        }
        bdrv_swap(s->target, s->common.bs);
    }
    bdrv_close(s->target);
    bdrv_delete(s->target);
    block_job_completed(&s->common, ret);
}

static void mirror_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    if (speed < 0) {
        error_set(errp, QERR_INVALID_PARAMETER, "speed");
        return;
    }
    ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
}

static void mirror_iostatus_reset(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    bdrv_iostatus_reset(s->target);
}

static void mirror_complete(BlockJob *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
    int ret;

    ret = bdrv_open_backing_file(s->target);
    if (ret < 0) {
        char backing_filename[PATH_MAX];
        bdrv_get_full_backing_filename(s->target, backing_filename,
                                       sizeof(backing_filename));
        error_set(errp, QERR_OPEN_FILE_FAILED, backing_filename);
        return;
    }
    if (!s->synced) {
        error_set(errp, QERR_BLOCK_JOB_NOT_READY, job->bs->device_name);
        return;
    }

    s->should_complete = true;
    block_job_resume(job);
}

static BlockJobType mirror_job_type = {
    .instance_size = sizeof(MirrorBlockJob),
    .job_type      = "mirror",
    .set_speed     = mirror_set_speed,
    .iostatus_reset = mirror_iostatus_reset,
    .complete      = mirror_complete,
};

void mirror_start(BlockDriverState *bs, BlockDriverState *target,
                  int64_t speed, int64_t granularity, int64_t buf_size,
                  MirrorSyncMode mode, BlockdevOnError on_source_error,
                  BlockdevOnError on_target_error,
                  BlockDriverCompletionFunc *cb,
                  void *opaque, Error **errp)
{
    MirrorBlockJob *s;

    if (granularity == 0) {
        /* Choose the default granularity based on the target file's cluster
         * size, clamped between 4k and 64k.  */
        BlockDriverInfo bdi;
        if (bdrv_get_info(target, &bdi) >= 0 && bdi.cluster_size != 0) {
            granularity = MAX(4096, bdi.cluster_size);
            granularity = MIN(65536, granularity);
        } else {
            granularity = 65536;
        }
    }

    assert((granularity & (granularity - 1)) == 0);

    if ((on_source_error == BLOCKDEV_ON_ERROR_STOP ||
         on_source_error == BLOCKDEV_ON_ERROR_ENOSPC) &&
        !bdrv_iostatus_is_enabled(bs)) {
        error_set(errp, QERR_INVALID_PARAMETER, "on-source-error");
        return;
    }

    s = block_job_create(&mirror_job_type, bs, speed, cb, opaque, errp);
    if (!s) {
        return;
    }

    s->on_source_error = on_source_error;
    s->on_target_error = on_target_error;
    s->target = target;
    s->mode = mode;
    s->granularity = granularity;
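    /* buf_size is the new buf-size argument from drive-mirror.  Clamp it to
     * at least one granularity chunk so that a whole chunk always fits in
     * the bounce buffer; mirror_run() may raise it further to the target's
     * cluster size.
     */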
    s->buf_size = MAX(buf_size, granularity);

    bdrv_set_dirty_tracking(bs, granularity);
    bdrv_set_enable_write_cache(s->target, true);
    bdrv_set_on_error(s->target, on_target_error, on_target_error);
    bdrv_iostatus_enable(s->target);
    s->common.co = qemu_coroutine_create(mirror_run);
    trace_mirror_start(bs, s, s->common.co, opaque);
    qemu_coroutine_enter(s->common.co, s);
}
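
/* A sketch of how this entry point is typically reached from QMP.  The exact
 * argument schema is defined in qapi-schema.json, not here, so treat the
 * names below as illustrative:
 *
 *   { "execute": "drive-mirror",
 *     "arguments": { "device": "drive0",
 *                    "target": "/tmp/mirror.qcow2",
 *                    "sync": "full",
 *                    "buf-size": 1048576 } }
 */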