]> Git Repo - qemu.git/blob - blockjob.c
job: Move state transitions to Job
[qemu.git] / blockjob.c
1 /*
2  * QEMU System Emulator block driver
3  *
4  * Copyright (c) 2011 IBM Corp.
5  * Copyright (c) 2012 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25
26 #include "qemu/osdep.h"
27 #include "qemu-common.h"
28 #include "block/block.h"
29 #include "block/blockjob_int.h"
30 #include "block/block_int.h"
31 #include "block/trace.h"
32 #include "sysemu/block-backend.h"
33 #include "qapi/error.h"
34 #include "qapi/qapi-events-block-core.h"
35 #include "qapi/qmp/qerror.h"
36 #include "qemu/coroutine.h"
37 #include "qemu/timer.h"
38
39 /* Right now, this mutex is only needed to synchronize accesses to job->busy
40  * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
41  * block_job_enter. */
42 static QemuMutex block_job_mutex;
43
44 static void block_job_lock(void)
45 {
46     qemu_mutex_lock(&block_job_mutex);
47 }
48
49 static void block_job_unlock(void)
50 {
51     qemu_mutex_unlock(&block_job_mutex);
52 }
53
54 static void __attribute__((__constructor__)) block_job_init(void)
55 {
56     qemu_mutex_init(&block_job_mutex);
57 }
58
59 static void block_job_event_cancelled(BlockJob *job);
60 static void block_job_event_completed(BlockJob *job, const char *msg);
61 static int block_job_event_pending(BlockJob *job);
62 static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
63
64 /* Transactional group of block jobs */
65 struct BlockJobTxn {
66
67     /* Is this txn being cancelled? */
68     bool aborting;
69
70     /* List of jobs */
71     QLIST_HEAD(, BlockJob) jobs;
72
73     /* Reference count */
74     int refcnt;
75 };
76
77 /*
78  * The block job API is composed of two categories of functions.
79  *
80  * The first includes functions used by the monitor.  The monitor is
81  * peculiar in that it accesses the block job list with block_job_get, and
82  * therefore needs consistency across block_job_get and the actual operation
83  * (e.g. block_job_set_speed).  The consistency is achieved with
84  * aio_context_acquire/release.  These functions are declared in blockjob.h.
85  *
86  * The second includes functions used by the block job drivers and sometimes
87  * by the core block layer.  These do not care about locking, because the
88  * whole coroutine runs under the AioContext lock, and are declared in
89  * blockjob_int.h.
90  */
91
92 static bool is_block_job(Job *job)
93 {
94     return job_type(job) == JOB_TYPE_BACKUP ||
95            job_type(job) == JOB_TYPE_COMMIT ||
96            job_type(job) == JOB_TYPE_MIRROR ||
97            job_type(job) == JOB_TYPE_STREAM;
98 }
99
100 BlockJob *block_job_next(BlockJob *bjob)
101 {
102     Job *job = bjob ? &bjob->job : NULL;
103
104     do {
105         job = job_next(job);
106     } while (job && !is_block_job(job));
107
108     return job ? container_of(job, BlockJob, job) : NULL;
109 }
110
111 BlockJob *block_job_get(const char *id)
112 {
113     Job *job = job_get(id);
114
115     if (job && is_block_job(job)) {
116         return container_of(job, BlockJob, job);
117     } else {
118         return NULL;
119     }
120 }
121
122 BlockJobTxn *block_job_txn_new(void)
123 {
124     BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
125     QLIST_INIT(&txn->jobs);
126     txn->refcnt = 1;
127     return txn;
128 }
129
130 static void block_job_txn_ref(BlockJobTxn *txn)
131 {
132     txn->refcnt++;
133 }
134
135 void block_job_txn_unref(BlockJobTxn *txn)
136 {
137     if (txn && --txn->refcnt == 0) {
138         g_free(txn);
139     }
140 }
141
142 void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
143 {
144     if (!txn) {
145         return;
146     }
147
148     assert(!job->txn);
149     job->txn = txn;
150
151     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
152     block_job_txn_ref(txn);
153 }
154
155 static void block_job_txn_del_job(BlockJob *job)
156 {
157     if (job->txn) {
158         QLIST_REMOVE(job, txn_list);
159         block_job_txn_unref(job->txn);
160         job->txn = NULL;
161     }
162 }
163
164 /* Assumes the block_job_mutex is held */
165 static bool block_job_timer_pending(BlockJob *job)
166 {
167     return timer_pending(&job->sleep_timer);
168 }
169
170 /* Assumes the block_job_mutex is held */
171 static bool block_job_timer_not_pending(BlockJob *job)
172 {
173     return !block_job_timer_pending(job);
174 }
175
176 static void block_job_pause(BlockJob *job)
177 {
178     job->pause_count++;
179 }
180
181 static void block_job_resume(BlockJob *job)
182 {
183     assert(job->pause_count > 0);
184     job->pause_count--;
185     if (job->pause_count) {
186         return;
187     }
188
189     /* kick only if no timer is pending */
190     block_job_enter_cond(job, block_job_timer_not_pending);
191 }
192
193 void block_job_ref(BlockJob *job)
194 {
195     ++job->refcnt;
196 }
197
198 static void block_job_attached_aio_context(AioContext *new_context,
199                                            void *opaque);
200 static void block_job_detach_aio_context(void *opaque);
201
202 void block_job_unref(BlockJob *job)
203 {
204     if (--job->refcnt == 0) {
205         assert(job->job.status == JOB_STATUS_NULL);
206         assert(!job->txn);
207         BlockDriverState *bs = blk_bs(job->blk);
208         bs->job = NULL;
209         block_job_remove_all_bdrv(job);
210         blk_remove_aio_context_notifier(job->blk,
211                                         block_job_attached_aio_context,
212                                         block_job_detach_aio_context, job);
213         blk_unref(job->blk);
214         error_free(job->blocker);
215         assert(!timer_pending(&job->sleep_timer));
216         job_delete(&job->job);
217     }
218 }
219
220 static void block_job_attached_aio_context(AioContext *new_context,
221                                            void *opaque)
222 {
223     BlockJob *job = opaque;
224
225     if (job->driver->attached_aio_context) {
226         job->driver->attached_aio_context(job, new_context);
227     }
228
229     block_job_resume(job);
230 }
231
232 static void block_job_drain(BlockJob *job)
233 {
234     /* If job is !job->busy this kicks it into the next pause point. */
235     block_job_enter(job);
236
237     blk_drain(job->blk);
238     if (job->driver->drain) {
239         job->driver->drain(job);
240     }
241 }
242
243 static void block_job_detach_aio_context(void *opaque)
244 {
245     BlockJob *job = opaque;
246
247     /* In case the job terminates during aio_poll()... */
248     block_job_ref(job);
249
250     block_job_pause(job);
251
252     while (!job->paused && !job->completed) {
253         block_job_drain(job);
254     }
255
256     block_job_unref(job);
257 }
258
259 static char *child_job_get_parent_desc(BdrvChild *c)
260 {
261     BlockJob *job = c->opaque;
262     return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
263 }
264
265 static void child_job_drained_begin(BdrvChild *c)
266 {
267     BlockJob *job = c->opaque;
268     block_job_pause(job);
269 }
270
271 static void child_job_drained_end(BdrvChild *c)
272 {
273     BlockJob *job = c->opaque;
274     block_job_resume(job);
275 }
276
277 static const BdrvChildRole child_job = {
278     .get_parent_desc    = child_job_get_parent_desc,
279     .drained_begin      = child_job_drained_begin,
280     .drained_end        = child_job_drained_end,
281     .stay_at_node       = true,
282 };
283
284 void block_job_remove_all_bdrv(BlockJob *job)
285 {
286     GSList *l;
287     for (l = job->nodes; l; l = l->next) {
288         BdrvChild *c = l->data;
289         bdrv_op_unblock_all(c->bs, job->blocker);
290         bdrv_root_unref_child(c);
291     }
292     g_slist_free(job->nodes);
293     job->nodes = NULL;
294 }
295
296 int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
297                        uint64_t perm, uint64_t shared_perm, Error **errp)
298 {
299     BdrvChild *c;
300
301     c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
302                                job, errp);
303     if (c == NULL) {
304         return -EPERM;
305     }
306
307     job->nodes = g_slist_prepend(job->nodes, c);
308     bdrv_ref(bs);
309     bdrv_op_block_all(bs, job->blocker);
310
311     return 0;
312 }
313
314 bool block_job_is_internal(BlockJob *job)
315 {
316     return (job->job.id == NULL);
317 }
318
319 static bool block_job_started(BlockJob *job)
320 {
321     return job->co;
322 }
323
324 const BlockJobDriver *block_job_driver(BlockJob *job)
325 {
326     return job->driver;
327 }
328
329 /**
330  * All jobs must allow a pause point before entering their job proper. This
331  * ensures that jobs can be paused prior to being started, then resumed later.
332  */
333 static void coroutine_fn block_job_co_entry(void *opaque)
334 {
335     BlockJob *job = opaque;
336
337     assert(job && job->driver && job->driver->start);
338     block_job_pause_point(job);
339     job->driver->start(job);
340 }
341
342 static void block_job_sleep_timer_cb(void *opaque)
343 {
344     BlockJob *job = opaque;
345
346     block_job_enter(job);
347 }
348
349 void block_job_start(BlockJob *job)
350 {
351     assert(job && !block_job_started(job) && job->paused &&
352            job->driver && job->driver->start);
353     job->co = qemu_coroutine_create(block_job_co_entry, job);
354     job->pause_count--;
355     job->busy = true;
356     job->paused = false;
357     job_state_transition(&job->job, JOB_STATUS_RUNNING);
358     bdrv_coroutine_enter(blk_bs(job->blk), job->co);
359 }
360
361 static void block_job_decommission(BlockJob *job)
362 {
363     assert(job);
364     job->completed = true;
365     job->busy = false;
366     job->paused = false;
367     job->deferred_to_main_loop = true;
368     block_job_txn_del_job(job);
369     job_state_transition(&job->job, JOB_STATUS_NULL);
370     block_job_unref(job);
371 }
372
373 static void block_job_do_dismiss(BlockJob *job)
374 {
375     block_job_decommission(job);
376 }
377
378 static void block_job_conclude(BlockJob *job)
379 {
380     job_state_transition(&job->job, JOB_STATUS_CONCLUDED);
381     if (job->auto_dismiss || !block_job_started(job)) {
382         block_job_do_dismiss(job);
383     }
384 }
385
386 static void block_job_update_rc(BlockJob *job)
387 {
388     if (!job->ret && block_job_is_cancelled(job)) {
389         job->ret = -ECANCELED;
390     }
391     if (job->ret) {
392         job_state_transition(&job->job, JOB_STATUS_ABORTING);
393     }
394 }
395
396 static int block_job_prepare(BlockJob *job)
397 {
398     if (job->ret == 0 && job->driver->prepare) {
399         job->ret = job->driver->prepare(job);
400     }
401     return job->ret;
402 }
403
404 static void block_job_commit(BlockJob *job)
405 {
406     assert(!job->ret);
407     if (job->driver->commit) {
408         job->driver->commit(job);
409     }
410 }
411
412 static void block_job_abort(BlockJob *job)
413 {
414     assert(job->ret);
415     if (job->driver->abort) {
416         job->driver->abort(job);
417     }
418 }
419
420 static void block_job_clean(BlockJob *job)
421 {
422     if (job->driver->clean) {
423         job->driver->clean(job);
424     }
425 }
426
427 static int block_job_finalize_single(BlockJob *job)
428 {
429     assert(job->completed);
430
431     /* Ensure abort is called for late-transactional failures */
432     block_job_update_rc(job);
433
434     if (!job->ret) {
435         block_job_commit(job);
436     } else {
437         block_job_abort(job);
438     }
439     block_job_clean(job);
440
441     if (job->cb) {
442         job->cb(job->opaque, job->ret);
443     }
444
445     /* Emit events only if we actually started */
446     if (block_job_started(job)) {
447         if (block_job_is_cancelled(job)) {
448             block_job_event_cancelled(job);
449         } else {
450             const char *msg = NULL;
451             if (job->ret < 0) {
452                 msg = strerror(-job->ret);
453             }
454             block_job_event_completed(job, msg);
455         }
456     }
457
458     block_job_txn_del_job(job);
459     block_job_conclude(job);
460     return 0;
461 }
462
463 static void block_job_cancel_async(BlockJob *job, bool force)
464 {
465     if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
466         block_job_iostatus_reset(job);
467     }
468     if (job->user_paused) {
469         /* Do not call block_job_enter here, the caller will handle it.  */
470         job->user_paused = false;
471         job->pause_count--;
472     }
473     job->cancelled = true;
474     /* To prevent 'force == false' overriding a previous 'force == true' */
475     job->force |= force;
476 }
477
478 static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
479 {
480     AioContext *ctx;
481     BlockJob *job, *next;
482     int rc = 0;
483
484     QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
485         if (lock) {
486             ctx = blk_get_aio_context(job->blk);
487             aio_context_acquire(ctx);
488         }
489         rc = fn(job);
490         if (lock) {
491             aio_context_release(ctx);
492         }
493         if (rc) {
494             break;
495         }
496     }
497     return rc;
498 }
499
500 static int block_job_finish_sync(BlockJob *job,
501                                  void (*finish)(BlockJob *, Error **errp),
502                                  Error **errp)
503 {
504     Error *local_err = NULL;
505     int ret;
506
507     assert(blk_bs(job->blk)->job == job);
508
509     block_job_ref(job);
510
511     if (finish) {
512         finish(job, &local_err);
513     }
514     if (local_err) {
515         error_propagate(errp, local_err);
516         block_job_unref(job);
517         return -EBUSY;
518     }
519     /* block_job_drain calls block_job_enter, and it should be enough to
520      * induce progress until the job completes or moves to the main thread.
521     */
522     while (!job->deferred_to_main_loop && !job->completed) {
523         block_job_drain(job);
524     }
525     while (!job->completed) {
526         aio_poll(qemu_get_aio_context(), true);
527     }
528     ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
529     block_job_unref(job);
530     return ret;
531 }
532
533 static void block_job_completed_txn_abort(BlockJob *job)
534 {
535     AioContext *ctx;
536     BlockJobTxn *txn = job->txn;
537     BlockJob *other_job;
538
539     if (txn->aborting) {
540         /*
541          * We are cancelled by another job, which will handle everything.
542          */
543         return;
544     }
545     txn->aborting = true;
546     block_job_txn_ref(txn);
547
548     /* We are the first failed job. Cancel other jobs. */
549     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
550         ctx = blk_get_aio_context(other_job->blk);
551         aio_context_acquire(ctx);
552     }
553
554     /* Other jobs are effectively cancelled by us, set the status for
555      * them; this job, however, may or may not be cancelled, depending
556      * on the caller, so leave it. */
557     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
558         if (other_job != job) {
559             block_job_cancel_async(other_job, false);
560         }
561     }
562     while (!QLIST_EMPTY(&txn->jobs)) {
563         other_job = QLIST_FIRST(&txn->jobs);
564         ctx = blk_get_aio_context(other_job->blk);
565         if (!other_job->completed) {
566             assert(other_job->cancelled);
567             block_job_finish_sync(other_job, NULL, NULL);
568         }
569         block_job_finalize_single(other_job);
570         aio_context_release(ctx);
571     }
572
573     block_job_txn_unref(txn);
574 }
575
576 static int block_job_needs_finalize(BlockJob *job)
577 {
578     return !job->auto_finalize;
579 }
580
581 static void block_job_do_finalize(BlockJob *job)
582 {
583     int rc;
584     assert(job && job->txn);
585
586     /* prepare the transaction to complete */
587     rc = block_job_txn_apply(job->txn, block_job_prepare, true);
588     if (rc) {
589         block_job_completed_txn_abort(job);
590     } else {
591         block_job_txn_apply(job->txn, block_job_finalize_single, true);
592     }
593 }
594
595 static void block_job_completed_txn_success(BlockJob *job)
596 {
597     BlockJobTxn *txn = job->txn;
598     BlockJob *other_job;
599
600     job_state_transition(&job->job, JOB_STATUS_WAITING);
601
602     /*
603      * Successful completion, see if there are other running jobs in this
604      * txn.
605      */
606     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
607         if (!other_job->completed) {
608             return;
609         }
610         assert(other_job->ret == 0);
611     }
612
613     block_job_txn_apply(txn, block_job_event_pending, false);
614
615     /* If no jobs need manual finalization, automatically do so */
616     if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
617         block_job_do_finalize(job);
618     }
619 }
620
621 void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
622 {
623     int64_t old_speed = job->speed;
624
625     if (job_apply_verb(&job->job, JOB_VERB_SET_SPEED, errp)) {
626         return;
627     }
628     if (speed < 0) {
629         error_setg(errp, QERR_INVALID_PARAMETER, "speed");
630         return;
631     }
632
633     ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);
634
635     job->speed = speed;
636     if (speed && speed <= old_speed) {
637         return;
638     }
639
640     /* kick only if a timer is pending */
641     block_job_enter_cond(job, block_job_timer_pending);
642 }
643
644 int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
645 {
646     if (!job->speed) {
647         return 0;
648     }
649
650     return ratelimit_calculate_delay(&job->limit, n);
651 }
652
653 void block_job_complete(BlockJob *job, Error **errp)
654 {
655     /* Should not be reachable via external interface for internal jobs */
656     assert(job->job.id);
657     if (job_apply_verb(&job->job, JOB_VERB_COMPLETE, errp)) {
658         return;
659     }
660     if (job->pause_count || job->cancelled || !job->driver->complete) {
661         error_setg(errp, "The active block job '%s' cannot be completed",
662                    job->job.id);
663         return;
664     }
665
666     job->driver->complete(job, errp);
667 }
668
669 void block_job_finalize(BlockJob *job, Error **errp)
670 {
671     assert(job && job->job.id);
672     if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
673         return;
674     }
675     block_job_do_finalize(job);
676 }
677
678 void block_job_dismiss(BlockJob **jobptr, Error **errp)
679 {
680     BlockJob *job = *jobptr;
681     /* similarly to _complete, this is QMP-interface only. */
682     assert(job->job.id);
683     if (job_apply_verb(&job->job, JOB_VERB_DISMISS, errp)) {
684         return;
685     }
686
687     block_job_do_dismiss(job);
688     *jobptr = NULL;
689 }
690
691 void block_job_user_pause(BlockJob *job, Error **errp)
692 {
693     if (job_apply_verb(&job->job, JOB_VERB_PAUSE, errp)) {
694         return;
695     }
696     if (job->user_paused) {
697         error_setg(errp, "Job is already paused");
698         return;
699     }
700     job->user_paused = true;
701     block_job_pause(job);
702 }
703
704 bool block_job_user_paused(BlockJob *job)
705 {
706     return job->user_paused;
707 }
708
709 void block_job_user_resume(BlockJob *job, Error **errp)
710 {
711     assert(job);
712     if (!job->user_paused || job->pause_count <= 0) {
713         error_setg(errp, "Can't resume a job that was not paused");
714         return;
715     }
716     if (job_apply_verb(&job->job, JOB_VERB_RESUME, errp)) {
717         return;
718     }
719     block_job_iostatus_reset(job);
720     job->user_paused = false;
721     block_job_resume(job);
722 }
723
724 void block_job_cancel(BlockJob *job, bool force)
725 {
726     if (job->job.status == JOB_STATUS_CONCLUDED) {
727         block_job_do_dismiss(job);
728         return;
729     }
730     block_job_cancel_async(job, force);
731     if (!block_job_started(job)) {
732         block_job_completed(job, -ECANCELED);
733     } else if (job->deferred_to_main_loop) {
734         block_job_completed_txn_abort(job);
735     } else {
736         block_job_enter(job);
737     }
738 }
739
740 void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
741 {
742     if (job_apply_verb(&job->job, JOB_VERB_CANCEL, errp)) {
743         return;
744     }
745     block_job_cancel(job, force);
746 }
747
748 /* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
749  * used with block_job_finish_sync() without the need for (rather nasty)
750  * function pointer casts there. */
751 static void block_job_cancel_err(BlockJob *job, Error **errp)
752 {
753     block_job_cancel(job, false);
754 }
755
756 int block_job_cancel_sync(BlockJob *job)
757 {
758     return block_job_finish_sync(job, &block_job_cancel_err, NULL);
759 }
760
761 void block_job_cancel_sync_all(void)
762 {
763     BlockJob *job;
764     AioContext *aio_context;
765
766     while ((job = block_job_next(NULL))) {
767         aio_context = blk_get_aio_context(job->blk);
768         aio_context_acquire(aio_context);
769         block_job_cancel_sync(job);
770         aio_context_release(aio_context);
771     }
772 }
773
774 int block_job_complete_sync(BlockJob *job, Error **errp)
775 {
776     return block_job_finish_sync(job, &block_job_complete, errp);
777 }
778
779 void block_job_progress_update(BlockJob *job, uint64_t done)
780 {
781     job->offset += done;
782 }
783
784 void block_job_progress_set_remaining(BlockJob *job, uint64_t remaining)
785 {
786     job->len = job->offset + remaining;
787 }
788
789 BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
790 {
791     BlockJobInfo *info;
792
793     if (block_job_is_internal(job)) {
794         error_setg(errp, "Cannot query QEMU internal jobs");
795         return NULL;
796     }
797     info = g_new0(BlockJobInfo, 1);
798     info->type      = g_strdup(job_type_str(&job->job));
799     info->device    = g_strdup(job->job.id);
800     info->len       = job->len;
801     info->busy      = atomic_read(&job->busy);
802     info->paused    = job->pause_count > 0;
803     info->offset    = job->offset;
804     info->speed     = job->speed;
805     info->io_status = job->iostatus;
806     info->ready     = job->ready;
807     info->status    = job->job.status;
808     info->auto_finalize = job->auto_finalize;
809     info->auto_dismiss  = job->auto_dismiss;
810     info->has_error = job->ret != 0;
811     info->error     = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
812     return info;
813 }
814
815 static void block_job_iostatus_set_err(BlockJob *job, int error)
816 {
817     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
818         job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
819                                           BLOCK_DEVICE_IO_STATUS_FAILED;
820     }
821 }
822
823 static void block_job_event_cancelled(BlockJob *job)
824 {
825     if (block_job_is_internal(job)) {
826         return;
827     }
828
829     qapi_event_send_block_job_cancelled(job_type(&job->job),
830                                         job->job.id,
831                                         job->len,
832                                         job->offset,
833                                         job->speed,
834                                         &error_abort);
835 }
836
837 static void block_job_event_completed(BlockJob *job, const char *msg)
838 {
839     if (block_job_is_internal(job)) {
840         return;
841     }
842
843     qapi_event_send_block_job_completed(job_type(&job->job),
844                                         job->job.id,
845                                         job->len,
846                                         job->offset,
847                                         job->speed,
848                                         !!msg,
849                                         msg,
850                                         &error_abort);
851 }
852
853 static int block_job_event_pending(BlockJob *job)
854 {
855     job_state_transition(&job->job, JOB_STATUS_PENDING);
856     if (!job->auto_finalize && !block_job_is_internal(job)) {
857         qapi_event_send_block_job_pending(job_type(&job->job),
858                                           job->job.id,
859                                           &error_abort);
860     }
861     return 0;
862 }
863
864 /*
865  * API for block job drivers and the block layer.  These functions are
866  * declared in blockjob_int.h.
867  */
868
869 void *block_job_create(const char *job_id, const BlockJobDriver *driver,
870                        BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
871                        uint64_t shared_perm, int64_t speed, int flags,
872                        BlockCompletionFunc *cb, void *opaque, Error **errp)
873 {
874     BlockBackend *blk;
875     BlockJob *job;
876     int ret;
877
878     if (bs->job) {
879         error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
880         return NULL;
881     }
882
883     if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
884         job_id = bdrv_get_device_name(bs);
885         if (!*job_id) {
886             error_setg(errp, "An explicit job ID is required for this node");
887             return NULL;
888         }
889     }
890
891     if (job_id) {
892         if (flags & BLOCK_JOB_INTERNAL) {
893             error_setg(errp, "Cannot specify job ID for internal block job");
894             return NULL;
895         }
896     }
897
898     blk = blk_new(perm, shared_perm);
899     ret = blk_insert_bs(blk, bs, errp);
900     if (ret < 0) {
901         blk_unref(blk);
902         return NULL;
903     }
904
905     job = job_create(job_id, &driver->job_driver, errp);
906     if (job == NULL) {
907         blk_unref(blk);
908         return NULL;
909     }
910
911     assert(is_block_job(&job->job));
912
913     job->driver        = driver;
914     job->blk           = blk;
915     job->cb            = cb;
916     job->opaque        = opaque;
917     job->busy          = false;
918     job->paused        = true;
919     job->pause_count   = 1;
920     job->refcnt        = 1;
921     job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
922     job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
923     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
924                    QEMU_CLOCK_REALTIME, SCALE_NS,
925                    block_job_sleep_timer_cb, job);
926
927     error_setg(&job->blocker, "block device is in use by block job: %s",
928                job_type_str(&job->job));
929     block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
930     bs->job = job;
931
932     bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
933
934     blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
935                                  block_job_detach_aio_context, job);
936
937     /* Only set speed when necessary to avoid NotSupported error */
938     if (speed != 0) {
939         Error *local_err = NULL;
940
941         block_job_set_speed(job, speed, &local_err);
942         if (local_err) {
943             block_job_early_fail(job);
944             error_propagate(errp, local_err);
945             return NULL;
946         }
947     }
948
949     /* Single jobs are modeled as single-job transactions for sake of
950      * consolidating the job management logic */
951     if (!txn) {
952         txn = block_job_txn_new();
953         block_job_txn_add_job(txn, job);
954         block_job_txn_unref(txn);
955     } else {
956         block_job_txn_add_job(txn, job);
957     }
958
959     return job;
960 }
961
962 void block_job_early_fail(BlockJob *job)
963 {
964     assert(job->job.status == JOB_STATUS_CREATED);
965     block_job_decommission(job);
966 }
967
968 void block_job_completed(BlockJob *job, int ret)
969 {
970     assert(job && job->txn && !job->completed);
971     assert(blk_bs(job->blk)->job == job);
972     job->completed = true;
973     job->ret = ret;
974     block_job_update_rc(job);
975     trace_block_job_completed(job, ret, job->ret);
976     if (job->ret) {
977         block_job_completed_txn_abort(job);
978     } else {
979         block_job_completed_txn_success(job);
980     }
981 }
982
983 static bool block_job_should_pause(BlockJob *job)
984 {
985     return job->pause_count > 0;
986 }
987
988 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
989  * Reentering the job coroutine with block_job_enter() before the timer has
990  * expired is allowed and cancels the timer.
991  *
992  * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
993  * called explicitly. */
994 static void block_job_do_yield(BlockJob *job, uint64_t ns)
995 {
996     block_job_lock();
997     if (ns != -1) {
998         timer_mod(&job->sleep_timer, ns);
999     }
1000     job->busy = false;
1001     block_job_unlock();
1002     qemu_coroutine_yield();
1003
1004     /* Set by block_job_enter before re-entering the coroutine.  */
1005     assert(job->busy);
1006 }
1007
1008 void coroutine_fn block_job_pause_point(BlockJob *job)
1009 {
1010     assert(job && block_job_started(job));
1011
1012     if (!block_job_should_pause(job)) {
1013         return;
1014     }
1015     if (block_job_is_cancelled(job)) {
1016         return;
1017     }
1018
1019     if (job->driver->pause) {
1020         job->driver->pause(job);
1021     }
1022
1023     if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
1024         JobStatus status = job->job.status;
1025         job_state_transition(&job->job, status == JOB_STATUS_READY
1026                                         ? JOB_STATUS_STANDBY
1027                                         : JOB_STATUS_PAUSED);
1028         job->paused = true;
1029         block_job_do_yield(job, -1);
1030         job->paused = false;
1031         job_state_transition(&job->job, status);
1032     }
1033
1034     if (job->driver->resume) {
1035         job->driver->resume(job);
1036     }
1037 }
1038
1039 /*
1040  * Conditionally enter a block_job pending a call to fn() while
1041  * under the block_job_lock critical section.
1042  */
1043 static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
1044 {
1045     if (!block_job_started(job)) {
1046         return;
1047     }
1048     if (job->deferred_to_main_loop) {
1049         return;
1050     }
1051
1052     block_job_lock();
1053     if (job->busy) {
1054         block_job_unlock();
1055         return;
1056     }
1057
1058     if (fn && !fn(job)) {
1059         block_job_unlock();
1060         return;
1061     }
1062
1063     assert(!job->deferred_to_main_loop);
1064     timer_del(&job->sleep_timer);
1065     job->busy = true;
1066     block_job_unlock();
1067     aio_co_wake(job->co);
1068 }
1069
1070 void block_job_enter(BlockJob *job)
1071 {
1072     block_job_enter_cond(job, NULL);
1073 }
1074
1075 bool block_job_is_cancelled(BlockJob *job)
1076 {
1077     return job->cancelled;
1078 }
1079
1080 void block_job_sleep_ns(BlockJob *job, int64_t ns)
1081 {
1082     assert(job->busy);
1083
1084     /* Check cancellation *before* setting busy = false, too!  */
1085     if (block_job_is_cancelled(job)) {
1086         return;
1087     }
1088
1089     if (!block_job_should_pause(job)) {
1090         block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
1091     }
1092
1093     block_job_pause_point(job);
1094 }
1095
1096 void block_job_yield(BlockJob *job)
1097 {
1098     assert(job->busy);
1099
1100     /* Check cancellation *before* setting busy = false, too!  */
1101     if (block_job_is_cancelled(job)) {
1102         return;
1103     }
1104
1105     if (!block_job_should_pause(job)) {
1106         block_job_do_yield(job, -1);
1107     }
1108
1109     block_job_pause_point(job);
1110 }
1111
1112 void block_job_iostatus_reset(BlockJob *job)
1113 {
1114     if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1115         return;
1116     }
1117     assert(job->user_paused && job->pause_count > 0);
1118     job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
1119 }
1120
1121 void block_job_event_ready(BlockJob *job)
1122 {
1123     job_state_transition(&job->job, JOB_STATUS_READY);
1124     job->ready = true;
1125
1126     if (block_job_is_internal(job)) {
1127         return;
1128     }
1129
1130     qapi_event_send_block_job_ready(job_type(&job->job),
1131                                     job->job.id,
1132                                     job->len,
1133                                     job->offset,
1134                                     job->speed, &error_abort);
1135 }
1136
1137 BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
1138                                         int is_read, int error)
1139 {
1140     BlockErrorAction action;
1141
1142     switch (on_err) {
1143     case BLOCKDEV_ON_ERROR_ENOSPC:
1144     case BLOCKDEV_ON_ERROR_AUTO:
1145         action = (error == ENOSPC) ?
1146                  BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
1147         break;
1148     case BLOCKDEV_ON_ERROR_STOP:
1149         action = BLOCK_ERROR_ACTION_STOP;
1150         break;
1151     case BLOCKDEV_ON_ERROR_REPORT:
1152         action = BLOCK_ERROR_ACTION_REPORT;
1153         break;
1154     case BLOCKDEV_ON_ERROR_IGNORE:
1155         action = BLOCK_ERROR_ACTION_IGNORE;
1156         break;
1157     default:
1158         abort();
1159     }
1160     if (!block_job_is_internal(job)) {
1161         qapi_event_send_block_job_error(job->job.id,
1162                                         is_read ? IO_OPERATION_TYPE_READ :
1163                                         IO_OPERATION_TYPE_WRITE,
1164                                         action, &error_abort);
1165     }
1166     if (action == BLOCK_ERROR_ACTION_STOP) {
1167         block_job_pause(job);
1168         /* make the pause user visible, which will be resumed from QMP. */
1169         job->user_paused = true;
1170         block_job_iostatus_set_err(job, error);
1171     }
1172     return action;
1173 }
1174
1175 typedef struct {
1176     BlockJob *job;
1177     AioContext *aio_context;
1178     BlockJobDeferToMainLoopFn *fn;
1179     void *opaque;
1180 } BlockJobDeferToMainLoopData;
1181
1182 static void block_job_defer_to_main_loop_bh(void *opaque)
1183 {
1184     BlockJobDeferToMainLoopData *data = opaque;
1185     AioContext *aio_context;
1186
1187     /* Prevent race with block_job_defer_to_main_loop() */
1188     aio_context_acquire(data->aio_context);
1189
1190     /* Fetch BDS AioContext again, in case it has changed */
1191     aio_context = blk_get_aio_context(data->job->blk);
1192     if (aio_context != data->aio_context) {
1193         aio_context_acquire(aio_context);
1194     }
1195
1196     data->fn(data->job, data->opaque);
1197
1198     if (aio_context != data->aio_context) {
1199         aio_context_release(aio_context);
1200     }
1201
1202     aio_context_release(data->aio_context);
1203
1204     g_free(data);
1205 }
1206
1207 void block_job_defer_to_main_loop(BlockJob *job,
1208                                   BlockJobDeferToMainLoopFn *fn,
1209                                   void *opaque)
1210 {
1211     BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
1212     data->job = job;
1213     data->aio_context = blk_get_aio_context(job->blk);
1214     data->fn = fn;
1215     data->opaque = opaque;
1216     job->deferred_to_main_loop = true;
1217
1218     aio_bh_schedule_oneshot(qemu_get_aio_context(),
1219                             block_job_defer_to_main_loop_bh, data);
1220 }
This page took 0.0864279999999999 seconds and 4 git commands to generate.