xref: /openbmc/qemu/blockjob.c (revision 0dacec87)
/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/id.h"
#include "qemu/timer.h"

/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter. */
static QemuMutex block_job_mutex;

/* BlockJob State Transition Table */
bool BlockJobSTT[BLOCK_JOB_STATUS__MAX][BLOCK_JOB_STATUS__MAX] = {
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [BLOCK_JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [BLOCK_JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [BLOCK_JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [BLOCK_JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [BLOCK_JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [BLOCK_JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [BLOCK_JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [BLOCK_JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [BLOCK_JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [BLOCK_JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [BLOCK_JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
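
/* Read as: a 1 in row s0, column s1 allows the transition s0 -> s1.  For
 * example, BlockJobSTT[BLOCK_JOB_STATUS_CREATED][BLOCK_JOB_STATUS_RUNNING]
 * is 1, so a freshly created job may start running, while a 0 entry such as
 * CONCLUDED -> RUNNING makes block_job_state_transition() assert. */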

bool BlockJobVerbTable[BLOCK_JOB_VERB__MAX][BLOCK_JOB_STATUS__MAX] = {
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    [BLOCK_JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [BLOCK_JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [BLOCK_JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
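
/* Likewise, a verb is only honoured in states whose column holds a 1: e.g.
 * BLOCK_JOB_VERB_COMPLETE is accepted only in READY, and any other state
 * makes block_job_apply_verb() fail with -EPERM. */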

static void block_job_state_transition(BlockJob *job, BlockJobStatus s1)
{
    BlockJobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < BLOCK_JOB_STATUS__MAX);
    trace_block_job_state_transition(job, job->ret, BlockJobSTT[s0][s1] ?
                                     "allowed" : "disallowed",
                                     qapi_enum_lookup(&BlockJobStatus_lookup,
                                                      s0),
                                     qapi_enum_lookup(&BlockJobStatus_lookup,
                                                      s1));
    assert(BlockJobSTT[s0][s1]);
    job->status = s1;
}

static int block_job_apply_verb(BlockJob *job, BlockJobVerb bv, Error **errp)
{
    assert(bv >= 0 && bv < BLOCK_JOB_VERB__MAX);
    trace_block_job_apply_verb(job, qapi_enum_lookup(&BlockJobStatus_lookup,
                                                     job->status),
                               qapi_enum_lookup(&BlockJobVerb_lookup, bv),
                               BlockJobVerbTable[bv][job->status] ?
                               "allowed" : "prohibited");
    if (BlockJobVerbTable[bv][job->status]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, qapi_enum_lookup(&BlockJobStatus_lookup, job->status),
               qapi_enum_lookup(&BlockJobVerb_lookup, bv));
    return -EPERM;
}

static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}

static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}

static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));

/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};

static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);

/*
 * The block job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the block job list with block_job_get, and
 * therefore needs consistency across block_job_get and the actual operation
 * (e.g. block_job_set_speed).  The consistency is achieved with
 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 *
 * The second includes functions used by the block job drivers and sometimes
 * by the core block layer.  These do not care about locking, because the
 * whole coroutine runs under the AioContext lock, and are declared in
 * blockjob_int.h.
 */
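
/*
 * A typical monitor-side call sequence is therefore (sketch only; the real
 * callers live in blockdev.c):
 *
 *     BlockJob *job = block_job_get(id);
 *     if (job) {
 *         AioContext *ctx = blk_get_aio_context(job->blk);
 *         aio_context_acquire(ctx);
 *         block_job_set_speed(job, speed, errp);
 *         aio_context_release(ctx);
 *     }
 */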

BlockJob *block_job_next(BlockJob *job)
{
    if (!job) {
        return QLIST_FIRST(&block_jobs);
    }
    return QLIST_NEXT(job, job_list);
}

BlockJob *block_job_get(const char *id)
{
    BlockJob *job;

    QLIST_FOREACH(job, &block_jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}

static void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

static void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }
    block_job_enter(job);
}

void block_job_ref(BlockJob *job)
{
    ++job->refcnt;
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);

void block_job_unref(BlockJob *job)
{
    if (--job->refcnt == 0) {
        assert(job->status == BLOCK_JOB_STATUS_NULL);
        BlockDriverState *bs = blk_bs(job->blk);
        QLIST_REMOVE(job, job_list);
        bs->job = NULL;
        block_job_remove_all_bdrv(job);
        blk_remove_aio_context_notifier(job->blk,
                                        block_job_attached_aio_context,
                                        block_job_detach_aio_context, job);
        blk_unref(job->blk);
        error_free(job->blocker);
        g_free(job->id);
        assert(!timer_pending(&job->sleep_timer));
        g_free(job);
    }
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}

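/* Kick the job coroutine if it is sleeping, then drain the job's
 * BlockBackend and give the driver a chance to flush its own queues. */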
static void block_job_drain(BlockJob *job)
{
    /* If the job is sleeping (!job->busy), this kicks it into its next
     * pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    block_job_ref(job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    block_job_unref(job);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'",
                           BlockJobType_str(job->driver->job_type),
                           job->id);
}

static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_pause(job);
}

static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    block_job_resume(job);
}

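/* BdrvChild callbacks for the nodes attached to a job: pausing the job on
 * drained_begin and resuming it on drained_end keeps the job from issuing
 * new requests while one of its nodes is drained. */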
static const BdrvChildRole child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_end        = child_job_drained_end,
    .stay_at_node       = true,
};

void block_job_remove_all_bdrv(BlockJob *job)
{
    GSList *l;
    for (l = job->nodes; l; l = l->next) {
        BdrvChild *c = l->data;
        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);
    }
    g_slist_free(job->nodes);
    job->nodes = NULL;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;

    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
                               job, errp);
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->id == NULL);
}

static bool block_job_started(BlockJob *job)
{
    return job->co;
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    block_job_pause_point(job);
    job->driver->start(job);
}

static void block_job_sleep_timer_cb(void *opaque)
{
    BlockJob *job = opaque;

    block_job_enter(job);
}

void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           job->driver && job->driver->start);
    job->co = qemu_coroutine_create(block_job_co_entry, job);
    job->pause_count--;
    job->busy = true;
    job->paused = false;
    block_job_state_transition(job, BLOCK_JOB_STATUS_RUNNING);
    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
}

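/* Drop the job's own reference after transitioning to NULL; unless another
 * reference is still held, this frees the job. */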
389 {
390     assert(job);
391     job->completed = true;
392     job->busy = false;
393     job->paused = false;
394     job->deferred_to_main_loop = true;
395     block_job_state_transition(job, BLOCK_JOB_STATUS_NULL);
396     block_job_unref(job);
397 }
398 
399 static void block_job_do_dismiss(BlockJob *job)
400 {
401     block_job_decommission(job);
402 }
403 
404 static void block_job_conclude(BlockJob *job)
405 {
406     block_job_state_transition(job, BLOCK_JOB_STATUS_CONCLUDED);
407     if (job->auto_dismiss || !block_job_started(job)) {
408         block_job_do_dismiss(job);
409     }
410 }
411 
412 static void block_job_update_rc(BlockJob *job)
413 {
414     if (!job->ret && block_job_is_cancelled(job)) {
415         job->ret = -ECANCELED;
416     }
417     if (job->ret) {
418         block_job_state_transition(job, BLOCK_JOB_STATUS_ABORTING);
419     }
420 }
421 
422 static int block_job_prepare(BlockJob *job)
423 {
424     if (job->ret == 0 && job->driver->prepare) {
425         job->ret = job->driver->prepare(job);
426     }
427     return job->ret;
428 }
429 
430 static void block_job_commit(BlockJob *job)
431 {
432     assert(!job->ret);
433     if (job->driver->commit) {
434         job->driver->commit(job);
435     }
436 }
437 
438 static void block_job_abort(BlockJob *job)
439 {
440     assert(job->ret);
441     if (job->driver->abort) {
442         job->driver->abort(job);
443     }
444 }
445 
446 static void block_job_clean(BlockJob *job)
447 {
448     if (job->driver->clean) {
449         job->driver->clean(job);
450     }
451 }
452 
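/* Completion sequence for a single job: commit or abort according to
 * job->ret, run the driver's clean callback, invoke the caller's completion
 * callback, emit the QMP event, then detach from the transaction and
 * conclude. */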
static int block_job_finalize_single(BlockJob *job)
{
    assert(job->completed);

    /* Ensure abort is called for late-transactional failures */
    block_job_update_rc(job);

    if (!job->ret) {
        block_job_commit(job);
    } else {
        block_job_abort(job);
    }
    block_job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (block_job_is_cancelled(job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    QLIST_REMOVE(job, txn_list);
    block_job_txn_unref(job->txn);
    block_job_conclude(job);
    return 0;
}

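/* Request cancellation without entering the job: clear a user pause so the
 * job can make progress, and latch job->force so that a forced cancel is
 * never downgraded by a later soft one. */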
static void block_job_cancel_async(BlockJob *job, bool force)
{
    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
        block_job_iostatus_reset(job);
    }
    if (job->user_paused) {
        /* Do not call block_job_enter here, the caller will handle it.  */
        job->user_paused = false;
        job->pause_count--;
    }
    job->cancelled = true;
    /* To prevent 'force == false' overriding a previous 'force == true' */
    job->force |= force;
}

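/* Apply fn() to every job in the transaction, optionally acquiring each
 * job's AioContext around the call, and stop at the first non-zero return
 * value. */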
static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
{
    AioContext *ctx;
    BlockJob *job, *next;
    int rc = 0;

    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
        if (lock) {
            ctx = blk_get_aio_context(job->blk);
            aio_context_acquire(ctx);
        }
        rc = fn(job);
        if (lock) {
            aio_context_release(ctx);
        }
        if (rc) {
            break;
        }
    }
    return rc;
}

static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    block_job_ref(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
    return ret;
}

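/* Called on behalf of the first job in a transaction that fails: cancel all
 * other jobs, wait for any that are still running to finish, and finalize
 * every job while holding its AioContext. */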
static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(other_job->cancelled);
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}

static int block_job_needs_finalize(BlockJob *job)
{
    return !job->auto_finalize;
}

static void block_job_do_finalize(BlockJob *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
    if (rc) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_txn_apply(job->txn, block_job_finalize_single, true);
    }
}

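/* Called whenever a job in the transaction completes successfully; only once
 * every job has completed do we emit the PENDING events and, if no job asked
 * for manual finalization, finalize the whole transaction. */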
static void block_job_completed_txn_success(BlockJob *job)
{
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    block_job_state_transition(job, BLOCK_JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
        assert(other_job->ret == 0);
    }

    block_job_txn_apply(txn, block_job_event_pending, false);

    /* If no jobs need manual finalization, automatically do so */
    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
        block_job_do_finalize(job);
    }
}

/* Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    Error *local_err = NULL;
    int64_t old_speed = job->speed;

    if (!job->driver->set_speed) {
        error_setg(errp, QERR_UNSUPPORTED);
        return;
    }
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_SET_SPEED, errp)) {
        return;
    }
    job->driver->set_speed(job, speed, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    job->speed = speed;
    if (speed && speed <= old_speed) {
        return;
    }

    /* kick only if a timer is pending */
    block_job_enter_cond(job, block_job_timer_pending);
}

void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job->pause_count || job->cancelled || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_finalize(BlockJob *job, Error **errp)
{
    assert(job && job->id && job->txn);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_FINALIZE, errp)) {
        return;
    }
    block_job_do_finalize(job);
}

void block_job_dismiss(BlockJob **jobptr, Error **errp)
{
    BlockJob *job = *jobptr;
    /* Like block_job_complete(), this is for the QMP interface only. */
    assert(job->id);
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_DISMISS, errp)) {
        return;
    }

    block_job_do_dismiss(job);
    *jobptr = NULL;
}

void block_job_user_pause(BlockJob *job, Error **errp)
{
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    block_job_pause(job);
}

bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}

void block_job_user_resume(BlockJob *job, Error **errp)
{
    assert(job);
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_RESUME, errp)) {
        return;
    }
    block_job_iostatus_reset(job);
    job->user_paused = false;
    block_job_resume(job);
}

void block_job_cancel(BlockJob *job, bool force)
{
    if (job->status == BLOCK_JOB_STATUS_CONCLUDED) {
        block_job_do_dismiss(job);
        return;
    }
    block_job_cancel_async(job, force);
    if (!block_job_started(job)) {
        block_job_completed(job, -ECANCELED);
    } else if (job->deferred_to_main_loop) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_enter(job);
    }
}

void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
{
    if (block_job_apply_verb(job, BLOCK_JOB_VERB_CANCEL, errp)) {
        return;
    }
    block_job_cancel(job, force);
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = QLIST_FIRST(&block_jobs))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(BlockJobType_str(job->driver->job_type));
    info->device    = g_strdup(job->id);
    info->len       = job->len;
    info->busy      = atomic_read(&job->busy);
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    info->status    = job->status;
    info->auto_finalize = job->auto_finalize;
    info->auto_dismiss  = job->auto_dismiss;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

static int block_job_event_pending(BlockJob *job)
{
    block_job_state_transition(job, BLOCK_JOB_STATUS_PENDING);
    if (!job->auto_finalize && !block_job_is_internal(job)) {
        qapi_event_send_block_job_pending(job->driver->job_type,
                                          job->id,
                                          &error_abort);
    }
    return 0;
}

/*
 * API for block job drivers and the block layer.  These functions are
 * declared in blockjob_int.h.
 */

void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }

        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }

        if (block_job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->refcnt        = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    block_job_state_transition(job, BLOCK_JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               BlockJobType_str(driver->job_type));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    QLIST_INSERT_HEAD(&block_jobs, job, job_list);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}
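
/*
 * Driver-side usage, as a sketch (my_job_driver, my_cb and my_opaque are
 * illustrative names, not defined in this file):
 *
 *     BlockJob *job = block_job_create("job0", &my_job_driver, NULL, bs,
 *                                      0, BLK_PERM_ALL, 0, 0,
 *                                      my_cb, my_opaque, errp);
 *     if (job) {
 *         block_job_start(job);
 *     }
 */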

void block_job_pause_all(void)
{
    BlockJob *job = NULL;
    while ((job = block_job_next(job))) {
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
        block_job_ref(job);
        block_job_pause(job);
        aio_context_release(aio_context);
    }
}

void block_job_early_fail(BlockJob *job)
{
    assert(job->status == BLOCK_JOB_STATUS_CREATED);
    block_job_decommission(job);
}

void block_job_completed(BlockJob *job, int ret)
{
    assert(job && job->txn && !job->completed);
    assert(blk_bs(job->blk)->job == job);
    job->completed = true;
    job->ret = ret;
    block_job_update_rc(job);
    trace_block_job_completed(job, ret, job->ret);
    if (job->ret) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with block_job_enter() before the timer has
 * expired is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
 * called explicitly. */
static void block_job_do_yield(BlockJob *job, uint64_t ns)
{
    block_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    block_job_unlock();
    qemu_coroutine_yield();

    /* Set by block_job_enter before re-entering the coroutine.  */
    assert(job->busy);
}

void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
        BlockJobStatus status = job->status;
        block_job_state_transition(job, status == BLOCK_JOB_STATUS_READY ?
                                   BLOCK_JOB_STATUS_STANDBY :
                                   BLOCK_JOB_STATUS_PAUSED);
        job->paused = true;
        block_job_do_yield(job, -1);
        job->paused = false;
        block_job_state_transition(job, status);
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

void block_job_resume_all(void)
{
    BlockJob *job, *next;

    QLIST_FOREACH_SAFE(job, &block_jobs, job_list, next) {
        AioContext *aio_context = blk_get_aio_context(job->blk);

        aio_context_acquire(aio_context);
        block_job_resume(job);
        block_job_unref(job);
        aio_context_release(aio_context);
    }
}

/*
 * Conditionally enter the job coroutine.  fn() is evaluated while holding
 * block_job_mutex; the coroutine is entered only if the job has started,
 * has not deferred to the main loop, is not busy, and fn() (when given)
 * returns true.
 */
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
{
    if (!block_job_started(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    block_job_lock();
    if (job->busy) {
        block_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        block_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    block_job_unlock();
    aio_co_wake(job->co);
}

void block_job_enter(BlockJob *job)
{
    block_job_enter_cond(job, NULL);
}

bool block_job_is_cancelled(BlockJob *job)
{
    return job->cancelled;
}

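/* Put the job coroutine to sleep for @ns nanoseconds (unless it is being
 * cancelled or needs to pause first), then take a pause point.  Must be
 * called from the job coroutine, with job->busy set. */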
void block_job_sleep_ns(BlockJob *job, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    block_job_pause_point(job);
}

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (!block_job_should_pause(job)) {
        block_job_do_yield(job, -1);
    }

    block_job_pause_point(job);
}

void block_job_iostatus_reset(BlockJob *job)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->user_paused && job->pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

void block_job_event_ready(BlockJob *job)
{
    block_job_state_transition(job, BLOCK_JOB_STATUS_READY);
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job->driver->job_type,
                                    job->id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        block_job_pause(job);
        /* Make the pause user visible, so that it can be resumed from QMP. */
        job->user_paused = true;
        block_job_iostatus_set_err(job, error);
    }
    return action;
}

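/* State handed from block_job_defer_to_main_loop() to the bottom half that
 * runs fn(job, opaque) in the main loop, with the job's (possibly changed)
 * AioContext held. */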
typedef struct {
    BlockJob *job;
    AioContext *aio_context;
    BlockJobDeferToMainLoopFn *fn;
    void *opaque;
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
    aio_context = blk_get_aio_context(data->job->blk);
    if (aio_context != data->aio_context) {
        aio_context_acquire(aio_context);
    }

    data->fn(data->job, data->opaque);

    if (aio_context != data->aio_context) {
        aio_context_release(aio_context);
    }

    aio_context_release(data->aio_context);

    g_free(data);
}

void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}