/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "sysemu/block-backend.h"
#include "qapi/qmp/qerror.h"
#include "qapi/qmp/qjson.h"
#include "qemu/coroutine.h"
#include "qemu/id.h"
#include "qmp-commands.h"
#include "qemu/timer.h"
#include "qapi-event.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter. */
static QemuMutex block_job_mutex;

static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};

static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed).  The consistency is achieved with
 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer.  These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */
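
/* For instance, a monitor-style caller pairs the lookup with the actual
 * operation under the job's AioContext lock, much like
 * block_job_cancel_sync_all() below does (illustrative sketch only; the
 * job ID "job0" and the speed value are hypothetical):
 *
 *     BlockJob *job = block_job_get("job0");
 *     if (job) {
 *         AioContext *ctx = blk_get_aio_context(job->blk);
 *         aio_context_acquire(ctx);
 *         block_job_set_speed(job, 1 * 1024 * 1024, errp);
 *         aio_context_release(ctx);
 *     }
 */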

BlockJob *block_job_next(BlockJob *job)
{
    if (!job) {
        return QLIST_FIRST(&block_jobs);
    }
    return QLIST_NEXT(job, job_list);
}

BlockJob *block_job_get(const char *id)
{
    BlockJob *job;

    QLIST_FOREACH(job, &block_jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}
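
/* Typical lifecycle of a transaction (illustrative sketch; job1 and job2
 * are hypothetical jobs created elsewhere): the creator holds the initial
 * reference and each added job takes another, so the txn stays alive until
 * its last member completes:
 *
 *     BlockJobTxn *txn = block_job_txn_new();
 *     block_job_txn_add_job(txn, job1);
 *     block_job_txn_add_job(txn, job2);
 *     block_job_txn_unref(txn);       // drop the creator's reference
 */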

static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }
    block_job_enter(job);
}

void block_job_ref(BlockJob *job)
{
    ++job->refcnt;
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_unref(BlockJob *job)
{
    if (--job->refcnt == 0) {
        BlockDriverState *bs = blk_bs(job->blk);
        QLIST_REMOVE(job, job_list);
        bs->job = NULL;
        block_job_remove_all_bdrv(job);
        blk_remove_aio_context_notifier(job->blk,
                                        block_job_attached_aio_context,
                                        block_job_detach_aio_context, job);
        blk_unref(job->blk);
        error_free(job->blocker);
        g_free(job->id);
        assert(!timer_pending(&job->sleep_timer));
        g_free(job);
    }
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}

static void block_job_drain(BlockJob *job)
{
    /* If the job is not busy (!job->busy), this kicks it into the
     * next pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    block_job_ref(job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    block_job_unref(job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'",
                           BlockJobType_str(job->driver->job_type),
                           job->id);
}

static const BdrvChildRole child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .stay_at_node       = true,
};

static void block_job_drained_begin(void *opaque)
{
    BlockJob *job = opaque;
    block_job_pause(job);
}

static void block_job_drained_end(void *opaque)
{
    BlockJob *job = opaque;
    block_job_resume(job);
}

static const BlockDevOps block_job_dev_ops = {
    .drained_begin = block_job_drained_begin,
    .drained_end = block_job_drained_end,
};

void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->id == NULL);
}

static bool block_job_started(BlockJob *job)
{
    return job->co;
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}
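
/* The driver's start function runs as the job coroutine entered above. Its
 * main loop typically alternates work with pause points (illustrative
 * sketch; example_run, the work step and delay_ns are hypothetical):
 *
 *     static void coroutine_fn example_run(BlockJob *job)
 *     {
 *         while (!block_job_is_cancelled(job)) {
 *             // ...do one unit of I/O, update job->offset...
 *             block_job_sleep_ns(job, delay_ns);  // yields; also a pause point
 *         }
 *         // finish via block_job_defer_to_main_loop(), see the sketch at
 *         // the end of this file
 *     }
 */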

static void block_job_completed_single(BlockJob *job)
{
    assert(job->completed);

    if (!job->ret) {
        if (job->driver->commit) {
            job->driver->commit(job);
        }
    } else {
        if (job->driver->abort) {
            job->driver->abort(job);
        }
    }
    if (job->driver->clean) {
        job->driver->clean(job);
    }

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (block_job_is_cancelled(job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
    }
    block_job_unref(job);
}

static void block_job_cancel_async(BlockJob *job)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it.  */
        job->user_paused = false;
        job->pause_count--;
    }
    job->cancelled = true;
}

static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    block_job_ref(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
    return ret;
}

static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(other_job->cancelled);
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_completed_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static void block_job_completed_txn_success(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job, *next;
    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
    }
    /* We are the last completed job, commit the transaction. */
    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
        assert(other_job->ret == 0);
        block_job_completed_single(other_job);
        aio_context_release(ctx);
    }
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    Error *local_err = NULL;

    if (!job->driver->set_speed) {
        error_setg(errp, QERR_UNSUPPORTED);
        return;
    }
    job->driver->set_speed(job, speed, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    job->speed = speed;
}

void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    if (job->pause_count || job->cancelled ||
        !block_job_started(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_user_pause(BlockJob *job)
{
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

void block_job_user_resume(BlockJob *job)
{
    if (job && job->user_paused && job->pause_count > 0) {
        block_job_iostatus_reset(job);
        job->user_paused = false;
        block_job_resume(job);
    }
}

void block_job_cancel(BlockJob *job)
{
    if (block_job_started(job)) {
        block_job_cancel_async(job);
        block_job_enter(job);
    } else {
        block_job_completed(job, -ECANCELED);
    }
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = QLIST_FIRST(&block_jobs))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(BlockJobType_str(job->driver->job_type));
    info->device    = g_strdup(job->id);
    info->len       = job->len;
    info->busy      = atomic_read(&job->busy);
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

/*
 * API for block job drivers and the block layer.  These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }

        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }

        if (block_job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->refcnt        = 1;
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               BlockJobType_str(driver->job_type));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    blk_set_dev_ops(blk, &block_job_dev_ops, job);
    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    QLIST_INSERT_HEAD(&block_jobs, job, job_list);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_unref(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }
    return job;
}
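
/* A driver typically allocates and launches its job like this (illustrative
 * sketch; ExampleJob is a hypothetical struct embedding BlockJob as its
 * first member "common", and example_job_driver is a hypothetical
 * BlockJobDriver). The job is created paused and only runs once
 * block_job_start() is called:
 *
 *     ExampleJob *s = block_job_create(job_id, &example_job_driver, bs,
 *                                      BLK_PERM_CONSISTENT_READ,
 *                                      BLK_PERM_ALL, speed, 0,
 *                                      cb, opaque, errp);
 *     if (!s) {
 *         return;
 *     }
 *     block_job_start(&s->common);
 */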

void block_job_pause_all(void)
{
    BlockJob *job = NULL;
    while ((job = block_job_next(job))) {
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
        block_job_ref(job);
        block_job_pause(job);
        aio_context_release(aio_context);
    }
}

void block_job_early_fail(BlockJob *job)
{
    block_job_unref(job);
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(blk_bs(job->blk)->job == job);
    assert(!job->completed);
    job->completed = true;
    job->ret = ret;
    if (!job->txn) {
        block_job_completed_single(job);
    } else if (ret < 0 || block_job_is_cancelled(job)) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine.  */
    assert(job->busy);
}

void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

void block_job_resume_all(void)
{
    BlockJob *job, *next;

    QLIST_FOREACH_SAFE(job, &block_jobs, job_list, next) {
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
        block_job_resume(job);
        block_job_unref(job);
        aio_context_release(aio_context);
    }
}

void block_job_enter(BlockJob *job)
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        block_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

bool block_job_is_cancelled(BlockJob *job)
{
    return job->cancelled;
}

void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}
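
/* Drivers call block_job_sleep_ns() both for rate limiting and for
 * cooperative scheduling; even a zero delay yields so that cancel and
 * pause requests can be serviced (illustrative sketch; the delay
 * computation is hypothetical):
 *
 *     uint64_t delay_ns = 0;
 *     while (!block_job_is_cancelled(job)) {
 *         block_job_sleep_ns(job, delay_ns);
 *         // ...do bounded work, then derive delay_ns from job->speed...
 *     }
 */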

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

void block_job_event_ready(BlockJob *job)
{
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job->driver->job_type,
                                    job->id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        /* Make the pause user-visible; the job will be resumed from QMP. */
        block_job_user_pause(job);
        block_job_iostatus_set_err(job, error);
    }
    return action;
}
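
/* A driver consults the returned action to decide whether to fail the job
 * or keep going (illustrative sketch; ret is a negative errno from a
 * failed request):
 *
 *     BlockErrorAction action = block_job_error_action(job, on_err,
 *                                                      is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT) {
 *         return ret;                  // abort the job with this error
 *     }
 *     // STOP has paused the job above; IGNORE means retry the operation
 */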

typedef struct {
    BlockJob *job;
    AioContext *aio_context;
    BlockJobDeferToMainLoopFn *fn;
    void *opaque;
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
    aio_context = blk_get_aio_context(data->job->blk);
    if (aio_context != data->aio_context) {
        aio_context_acquire(aio_context);
    }

    data->fn(data->job, data->opaque);

    if (aio_context != data->aio_context) {
        aio_context_release(aio_context);
    }

    aio_context_release(data->aio_context);

    g_free(data);
}

void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}
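
/* The usual caller is the job coroutine itself, right before it returns:
 * completion has to run in the main loop because block_job_completed() may
 * reconfigure the block graph (illustrative sketch; example_complete and
 * ExampleCompleteData are hypothetical):
 *
 *     static void example_complete(BlockJob *job, void *opaque)
 *     {
 *         ExampleCompleteData *data = opaque;
 *         block_job_completed(job, data->ret);
 *         g_free(data);
 *     }
 *
 *     ...then, at the end of the driver's start function:
 *     block_job_defer_to_main_loop(job, example_complete, data);
 */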