/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qemu-common.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "sysemu/block-backend.h"
#include "qapi/qmp/qerror.h"
#include "qapi/qmp/qjson.h"
#include "qemu/coroutine.h"
#include "qemu/id.h"
#include "qmp-commands.h"
#include "qemu/timer.h"
#include "qapi-event.h"

static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);

/* Transactional group of block jobs */
struct BlockJobTxn {
    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count */
    int refcnt;
};

static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);

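/*
 * Iterate over the global list of block jobs: pass NULL to get the first
 * job, or a previous result to get the one after it.  Returns NULL at the
 * end of the list.
 */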
BlockJob *block_job_next(BlockJob *job)
{
    if (!job) {
        return QLIST_FIRST(&block_jobs);
    }
    return QLIST_NEXT(job, job_list);
}

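/*
 * Look up a block job by ID.  Internal jobs have a NULL ID and are never
 * matched, so this only finds jobs that are visible to the user.
 */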
BlockJob *block_job_get(const char *id)
{
    BlockJob *job;

    QLIST_FOREACH(job, &block_jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque)
{
    BlockJob *job = opaque;

    if (job->driver->attached_aio_context) {
        job->driver->attached_aio_context(job, new_context);
    }

    block_job_resume(job);
}

static void block_job_drain(BlockJob *job)
{
    /* If the job isn't busy, this kicks it into its next pause point. */
    block_job_enter(job);

    blk_drain(job->blk);
    if (job->driver->drain) {
        job->driver->drain(job);
    }
}

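/*
 * AioContext detach notifier: pause the job and drain it until it reaches
 * a pause point (or completes), so no coroutine is left running in the
 * old context.  The extra reference protects against the job being freed
 * while we poll.
 */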
static void block_job_detach_aio_context(void *opaque)
{
    BlockJob *job = opaque;

    /* In case the job terminates during aio_poll()... */
    block_job_ref(job);

    block_job_pause(job);

    while (!job->paused && !job->completed) {
        block_job_drain(job);
    }

    block_job_unref(job);
}

void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs)
{
    job->nodes = g_slist_prepend(job->nodes, bs);
    bdrv_ref(bs);
    bdrv_op_block_all(bs, job->blocker);
}

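/*
 * Create a new block job on @bs.  Unless BLOCK_JOB_INTERNAL is set in
 * @flags, the job gets a user-visible ID: either @job_id, or the device
 * name of @bs if @job_id is NULL.  The job starts out paused
 * (pause_count == 1); block_job_start() drops that initial pause and
 * enters the job coroutine.
 */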
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockDriverState *bs, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;

    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }

        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }

        if (block_job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    }

    blk = blk_new();
    blk_insert_bs(blk, bs);

    job = g_malloc0(driver->instance_size);
    error_setg(&job->blocker, "block device is in use by block job: %s",
               BlockJobType_lookup[driver->job_type]);
    block_job_add_bdrv(job, bs);
    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->refcnt        = 1;
    bs->job = job;

    QLIST_INSERT_HEAD(&block_jobs, job, job_list);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_unref(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }
    return job;
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->id == NULL);
}

static bool block_job_started(BlockJob *job)
{
    return job->co;
}

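/*
 * Begin execution of a job created with block_job_create().  This drops
 * the creation-time pause; the coroutine is only entered if nothing else
 * has paused the job in the meantime.
 */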
void block_job_start(BlockJob *job)
{
    assert(job && !block_job_started(job) && job->paused &&
           !job->busy && job->driver->start);
    job->co = qemu_coroutine_create(job->driver->start, job);
    if (--job->pause_count == 0) {
        job->paused = false;
        job->busy = true;
        qemu_coroutine_enter(job->co);
    }
}

void block_job_ref(BlockJob *job)
{
    ++job->refcnt;
}

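/*
 * Drop a reference.  When the last reference goes away, unblock all nodes
 * the job was attached to, remove the AioContext notifiers and free the
 * job.
 */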
void block_job_unref(BlockJob *job)
{
    if (--job->refcnt == 0) {
        GSList *l;
        BlockDriverState *bs = blk_bs(job->blk);
        bs->job = NULL;
        for (l = job->nodes; l; l = l->next) {
            bs = l->data;
            bdrv_op_unblock_all(bs, job->blocker);
            bdrv_unref(bs);
        }
        g_slist_free(job->nodes);
        blk_remove_aio_context_notifier(job->blk,
                                        block_job_attached_aio_context,
                                        block_job_detach_aio_context, job);
        blk_unref(job->blk);
        error_free(job->blocker);
        g_free(job->id);
        QLIST_REMOVE(job, job_list);
        g_free(job);
    }
}

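/*
 * Finish a single job: run the driver's commit or abort callback
 * depending on job->ret, then clean, invoke the completion callback,
 * emit the QMP event (if the job ever started), detach from the
 * transaction and drop the job's last reference.
 */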
static void block_job_completed_single(BlockJob *job)
{
    if (!job->ret) {
        if (job->driver->commit) {
            job->driver->commit(job);
        }
    } else {
        if (job->driver->abort) {
            job->driver->abort(job);
        }
    }
    if (job->driver->clean) {
        job->driver->clean(job);
    }

    if (job->cb) {
        job->cb(job->opaque, job->ret);
    }

    /* Emit events only if we actually started */
    if (block_job_started(job)) {
        if (block_job_is_cancelled(job)) {
            block_job_event_cancelled(job);
        } else {
            const char *msg = NULL;
            if (job->ret < 0) {
                msg = strerror(-job->ret);
            }
            block_job_event_completed(job, msg);
        }
    }

    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        block_job_txn_unref(job->txn);
    }
    block_job_unref(job);
}

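/*
 * One job in the transaction failed or was cancelled: cancel every other
 * job in the same transaction synchronously, then complete them all.
 * Every member's AioContext is acquired up front so the whole group is
 * torn down under lock.
 */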
static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job, *next;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* We are the first failed job. Cancel other jobs. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job == job || other_job->completed) {
            /* Other jobs are "effectively" cancelled by us, set the status for
             * them; this job, however, may or may not be cancelled, depending
             * on the caller, so leave it. */
            if (other_job != job) {
                other_job->cancelled = true;
            }
            continue;
        }
        block_job_cancel_sync(other_job);
        assert(other_job->completed);
    }
    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        ctx = blk_get_aio_context(other_job->blk);
        block_job_completed_single(other_job);
        aio_context_release(ctx);
    }
}

static void block_job_completed_txn_success(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job, *next;
    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!other_job->completed) {
            return;
        }
    }
    /* We are the last completed job, commit the transaction. */
    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
        assert(other_job->ret == 0);
        block_job_completed_single(other_job);
        aio_context_release(ctx);
    }
}

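/*
 * Called by the job once it has finished with result @ret.  For jobs in
 * a transaction, success is deferred until every member has succeeded,
 * while failure or cancellation aborts the whole group.
 */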
void block_job_completed(BlockJob *job, int ret)
{
    assert(blk_bs(job->blk)->job == job);
    assert(!job->completed);
    job->completed = true;
    job->ret = ret;
    if (!job->txn) {
        block_job_completed_single(job);
    } else if (ret < 0 || block_job_is_cancelled(job)) {
        block_job_completed_txn_abort(job);
    } else {
        block_job_completed_txn_success(job);
    }
}

void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    Error *local_err = NULL;

    if (!job->driver->set_speed) {
        error_setg(errp, QERR_UNSUPPORTED);
        return;
    }
    job->driver->set_speed(job, speed, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    job->speed = speed;
}

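/*
 * Manually complete a job (QMP block-job-complete).  Only valid for
 * started, unpaused, uncancelled jobs whose driver implements the
 * complete callback; otherwise an error is returned.
 */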
void block_job_complete(BlockJob *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    if (job->pause_count || job->cancelled ||
        !block_job_started(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job->driver->complete(job, errp);
}

void block_job_pause(BlockJob *job)
{
    job->pause_count++;
}

void block_job_user_pause(BlockJob *job)
{
    job->user_paused = true;
    block_job_pause(job);
}

static bool block_job_should_pause(BlockJob *job)
{
    return job->pause_count > 0;
}

bool block_job_user_paused(BlockJob *job)
{
    return job ? job->user_paused : false;
}

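/*
 * Pause point, called from the job coroutine.  If a pause has been
 * requested and the job is not cancelled, mark the job paused and yield
 * until block_job_resume() re-enters the coroutine.  The driver's
 * optional pause/resume callbacks let it quiesce and restart in-flight
 * work around the yield.
 */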
void coroutine_fn block_job_pause_point(BlockJob *job)
{
    assert(job && block_job_started(job));

    if (!block_job_should_pause(job)) {
        return;
    }
    if (block_job_is_cancelled(job)) {
        return;
    }

    if (job->driver->pause) {
        job->driver->pause(job);
    }

    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
        job->paused = true;
        job->busy = false;
        qemu_coroutine_yield(); /* wait for block_job_resume() */
        job->busy = true;
        job->paused = false;
    }

    if (job->driver->resume) {
        job->driver->resume(job);
    }
}

void block_job_resume(BlockJob *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }
    block_job_enter(job);
}

void block_job_user_resume(BlockJob *job)
{
    if (job && job->user_paused && job->pause_count > 0) {
        job->user_paused = false;
        block_job_resume(job);
    }
}

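/*
 * Re-enter the job coroutine if it is sleeping or paused.  A busy job is
 * already running and must not be entered recursively.
 */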
void block_job_enter(BlockJob *job)
{
    if (job->co && !job->busy) {
        qemu_coroutine_enter(job->co);
    }
}

void block_job_cancel(BlockJob *job)
{
    if (block_job_started(job)) {
        job->cancelled = true;
        block_job_iostatus_reset(job);
        block_job_enter(job);
    } else {
        block_job_completed(job, -ECANCELED);
    }
}

bool block_job_is_cancelled(BlockJob *job)
{
    return job->cancelled;
}

void block_job_iostatus_reset(BlockJob *job)
{
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
    if (job->driver->iostatus_reset) {
        job->driver->iostatus_reset(job);
    }
}

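/*
 * Run @finish on the job and then poll until it completes, keeping a
 * reference so the job structure stays valid throughout.  Returns the
 * job's result, or -ECANCELED if it was cancelled without reporting an
 * error of its own.
 */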
static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    block_job_ref(job);

    finish(job, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
     */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
    return ret;
}

/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job);
}

int block_job_cancel_sync(BlockJob *job)
{
    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
}

void block_job_cancel_sync_all(void)
{
    BlockJob *job;
    AioContext *aio_context;

    while ((job = QLIST_FIRST(&block_jobs))) {
        aio_context = blk_get_aio_context(job->blk);
        aio_context_acquire(aio_context);
        block_job_cancel_sync(job);
        aio_context_release(aio_context);
    }
}

int block_job_complete_sync(BlockJob *job, Error **errp)
{
    return block_job_finish_sync(job, &block_job_complete, errp);
}

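/*
 * Sleep for @ns nanoseconds on @type's clock, called from the job
 * coroutine to rate-limit its work.  Cancellation is checked first, and
 * a pending pause request turns the sleep into an immediate pause point
 * instead.
 */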
void block_job_sleep_ns(BlockJob *job, QEMUClockType type, int64_t ns)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    job->busy = false;
    if (!block_job_should_pause(job)) {
        co_aio_sleep_ns(blk_get_aio_context(job->blk), type, ns);
    }
    job->busy = true;

    block_job_pause_point(job);
}

void block_job_yield(BlockJob *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (block_job_is_cancelled(job)) {
        return;
    }

    job->busy = false;
    if (!block_job_should_pause(job)) {
        qemu_coroutine_yield();
    }
    job->busy = true;

    block_job_pause_point(job);
}

BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }
    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(BlockJobType_lookup[job->driver->job_type]);
    info->device    = g_strdup(job->id);
    info->len       = job->len;
    info->busy      = job->busy;
    info->paused    = job->pause_count > 0;
    info->offset    = job->offset;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job->ready;
    return info;
}

static void block_job_iostatus_set_err(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

static void block_job_event_cancelled(BlockJob *job)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_cancelled(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        &error_abort);
}

static void block_job_event_completed(BlockJob *job, const char *msg)
{
    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_completed(job->driver->job_type,
                                        job->id,
                                        job->len,
                                        job->offset,
                                        job->speed,
                                        !!msg,
                                        msg,
                                        &error_abort);
}

void block_job_event_ready(BlockJob *job)
{
    job->ready = true;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_ready(job->driver->job_type,
                                    job->id,
                                    job->len,
                                    job->offset,
                                    job->speed, &error_abort);
}

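/*
 * Map an I/O error to the action requested by the job's error policy,
 * emit a BLOCK_JOB_ERROR event for user-visible jobs, and pause the job
 * (updating its iostatus) when the policy asks us to stop.
 */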
BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action, &error_abort);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        /* Make the pause user visible, so that it can be resumed from QMP. */
        block_job_user_pause(job);
        block_job_iostatus_set_err(job, error);
    }
    return action;
}

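/*
 * Machinery for block_job_defer_to_main_loop(): the job's completion
 * code runs as a bottom half in the main loop, holding both the
 * AioContext recorded at scheduling time and the job's current one, in
 * case the BDS was moved to another context in between.
 */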
typedef struct {
    BlockJob *job;
    AioContext *aio_context;
    BlockJobDeferToMainLoopFn *fn;
    void *opaque;
} BlockJobDeferToMainLoopData;

static void block_job_defer_to_main_loop_bh(void *opaque)
{
    BlockJobDeferToMainLoopData *data = opaque;
    AioContext *aio_context;

    /* Prevent race with block_job_defer_to_main_loop() */
    aio_context_acquire(data->aio_context);

    /* Fetch BDS AioContext again, in case it has changed */
    aio_context = blk_get_aio_context(data->job->blk);
    aio_context_acquire(aio_context);

    data->job->deferred_to_main_loop = false;
    data->fn(data->job, data->opaque);

    aio_context_release(aio_context);

    aio_context_release(data->aio_context);

    g_free(data);
}

void block_job_defer_to_main_loop(BlockJob *job,
                                  BlockJobDeferToMainLoopFn *fn,
                                  void *opaque)
{
    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
    data->job = job;
    data->aio_context = blk_get_aio_context(job->blk);
    data->fn = fn;
    data->opaque = opaque;
    job->deferred_to_main_loop = true;

    aio_bh_schedule_oneshot(qemu_get_aio_context(),
                            block_job_defer_to_main_loop_bh, data);
}

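/*
 * Allocate a new, empty transaction with a single reference.  Each job
 * added with block_job_txn_add_job() takes another reference, released
 * again when the job completes.
 */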
BlockJobTxn *block_job_txn_new(void)
{
    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

static void block_job_txn_ref(BlockJobTxn *txn)
{
    txn->refcnt++;
}

void block_job_txn_unref(BlockJobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

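/*
 * Add @job to @txn.  A NULL transaction is tolerated so that callers can
 * pass their (optional) transaction argument through unconditionally.
 */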
void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    block_job_txn_ref(txn);
}