/*
 * Background jobs (long-running operations)
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012, 2018 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"

/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 *
 * TODO Actually make this true
 */
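
/*
 * For illustration only: a monitor-style caller (the caller itself is
 * hypothetical, not part of this file) would bracket the lookup and the
 * operation with the job lock, roughly as follows:
 *
 *     job_lock();
 *     job = job_get_locked(id);
 *     if (job) {
 *         job_user_cancel_locked(job, force, errp);
 *     }
 *     job_unlock();
 */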

/*
 * job_mutex protects the jobs list, but also makes the
 * struct Job fields thread-safe.
 */
QemuMutex job_mutex;

/* Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);

/* Job State Transition Table */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
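
/*
 * Reading the table above: rows are the current status, columns the
 * target status.  For example, JobSTT[JOB_STATUS_CREATED][JOB_STATUS_RUNNING]
 * is 1, so a freshly created job may start running, while
 * JobSTT[JOB_STATUS_CONCLUDED][JOB_STATUS_RUNNING] is 0, so a concluded
 * job can never run again.
 */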

bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
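
/*
 * JobVerbTable reads the same way: for example, the CANCEL row allows
 * 'cancel' in every state from CREATED through PENDING, but not once the
 * job is ABORTING, CONCLUDED or NULL.
 */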

/* Transactional group of jobs */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};
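
/*
 * As the TODO above notes, the fine-grained locking scheme is not fully
 * wired up yet: job_lock()/job_unlock() are intentionally no-ops so that
 * callers can already be written against the final API, while
 * real_job_lock()/real_job_unlock() below take the actual job_mutex
 * internally.
 */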
void job_lock(void)
{
    /* nop */
}

void job_unlock(void)
{
    /* nop */
}

static void real_job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}

static void real_job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}

static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}

JobTxn *job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}
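
/*
 * Illustrative transaction lifecycle (the caller and drivers are
 * hypothetical; JOB_DEFAULT comes from qemu/job.h):
 *
 *     JobTxn *txn = job_txn_new();
 *     job_create("a", &driver_a, txn, ctx, JOB_DEFAULT, NULL, NULL, errp);
 *     job_create("b", &driver_b, txn, ctx, JOB_DEFAULT, NULL, NULL, errp);
 *     job_txn_unref(txn);
 *
 * Each job added to the transaction holds its own reference, so the
 * creator may drop its reference right after setup.
 */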

/* Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}

void job_txn_unref_locked(JobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}

/* Called with job_mutex held. */
static void job_txn_del_job_locked(Job *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        job_txn_unref_locked(job->txn);
        job->txn = NULL;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    AioContext *inner_ctx;
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Similar to job_completed_txn_abort, we take each job's lock before
     * applying fn, but since we assume that the caller already holds
     * @job's AioContext lock, we need to release it here to avoid holding
     * the lock twice - which would break AIO_WAIT_WHILE from within fn.
     */
    job_ref_locked(job);
    aio_context_release(job->aio_context);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        inner_ctx = other_job->aio_context;
        aio_context_acquire(inner_ctx);
        rc = fn(other_job);
        aio_context_release(inner_ctx);
        if (rc) {
            break;
        }
    }

    /*
     * Note that job->aio_context might have been changed by calling fn, so we
     * can't use a local variable to cache it.
     */
    aio_context_acquire(job->aio_context);
    job_unref_locked(job);
    return rc;
}

bool job_is_internal(Job *job)
{
    return (job->id == NULL);
}

/* Called with job_mutex held. */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}

int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
{
    JobStatus s0 = job->status;
    assert(verb >= 0 && verb < JOB_VERB__MAX);
    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    if (JobVerbTable[verb][s0]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, JobStatus_str(s0), JobVerb_str(verb));
    return -EPERM;
}

int job_apply_verb(Job *job, JobVerb verb, Error **errp)
{
    JOB_LOCK_GUARD();
    return job_apply_verb_locked(job, verb, errp);
}

JobType job_type(const Job *job)
{
    return job->driver->job_type;
}

const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}

bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}

bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}
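
/*
 * Note the asymmetry with job_is_cancelled() above: job_is_cancelled()
 * is true only for force-cancelled jobs, while job_cancel_requested()
 * is true for any cancel request, including a soft cancel of a READY
 * job.
 */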
/* Called with job_mutex held. */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}

bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}

bool job_is_ready_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return false;
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}

bool job_is_completed_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return false;
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}

static bool job_started_locked(Job *job)
{
    return job->co;
}

/* Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}

Job *job_next_locked(Job *job)
{
    if (!job) {
        return QLIST_FIRST(&jobs);
    }
    return QLIST_NEXT(job, job_list);
}

Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}

Job *job_get_locked(const char *id)
{
    Job *job;

    QLIST_FOREACH(job, &jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

Job *job_get(const char *id)
{
    JOB_LOCK_GUARD();
    return job_get_locked(id);
}

/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}

void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}

void job_ref_locked(Job *job)
{
    ++job->refcnt;
}

void job_ref(Job *job)
{
    JOB_LOCK_GUARD();
    job_ref_locked(job);
}

void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}

void job_unref(Job *job)
{
    JOB_LOCK_GUARD();
    job_unref_locked(job);
}

void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}

/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}

/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}

/* Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}

/* Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}

/* Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}

void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    real_job_lock();
    if (job->busy) {
        real_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        real_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    real_job_unlock();
    job_unlock();
    aio_co_enter(job->aio_context, job->co);
    job_lock();
}

void job_enter_cond(Job *job, bool(*fn)(Job *job))
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, fn);
}

void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    real_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    real_job_unlock();
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    /* Set by job_enter_cond() before re-entering the coroutine.  */
    assert(job->busy);
}

/* Called with job_mutex held, but releases it temporarily. */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}

void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}

static void coroutine_fn job_yield_locked(Job *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}

void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    job_yield_locked(job);
}

void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
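
/*
 * A sketch of how a driver's .run() coroutine might use this to pace
 * itself (more_work() and do_some_work() are hypothetical helpers, for
 * illustration only):
 *
 *     while (!job_is_cancelled(job) && more_work(s)) {
 *         do_some_work(s);
 *         job_sleep_ns(job, delay_ns);
 *     }
 */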

/* Assumes the job_mutex is held */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}

void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}

void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}

void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}

void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}

void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}

void job_user_pause(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_pause_locked(job, errp);
}

bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}

bool job_user_paused(Job *job)
{
    JOB_LOCK_GUARD();
    return job_user_paused_locked(job);
}

void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}

void job_user_resume(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_resume_locked(job, errp);
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}

void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* Similarly to job_complete(), this is a QMP-interface-only function. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}

void job_dismiss(Job **jobptr, Error **errp)
{
    JOB_LOCK_GUARD();
    job_dismiss_locked(jobptr, errp);
}

void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}

/* Called with job_mutex held. */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}

static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/* Called with job_mutex held, but releases it temporarily */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    job_lock();

    if (job->cb) {
        job_ret = job->ret;
        job_unlock();
        job->cb(job->opaque, job_ret);
        job_lock();
    }

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}

/* Called with job_mutex held, but releases it temporarily */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_completed_txn_abort_locked(Job *job)
{
    AioContext *ctx;
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    job_txn_ref_locked(txn);

    /*
     * We can only hold the single job's AioContext lock while calling
     * job_finalize_single() because the finalization callbacks can involve
     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
     * Note that the job's AioContext may change when it is finalized.
     */
    job_ref_locked(job);
    aio_context_release(job->aio_context);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            ctx = other_job->aio_context;
            aio_context_acquire(ctx);
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
            aio_context_release(ctx);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        /*
         * The job's AioContext may change, so store it in @ctx so we
         * release the same context that we have acquired before.
         */
        ctx = other_job->aio_context;
        aio_context_acquire(ctx);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
        aio_context_release(ctx);
    }

    /*
     * Use job_ref()/job_unref() so we can read the AioContext here
     * even if the job went away during job_finalize_single().
     */
    aio_context_acquire(job->aio_context);
    job_unref_locked(job);

    job_txn_unref_locked(txn);
}

/* Called with job_mutex held, but releases it temporarily */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();
    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }
    return job->ret;
}

/* Called with job_mutex held */
static int job_needs_finalize_locked(Job *job)
{
    return !job->auto_finalize;
}

/* Called with job_mutex held */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}

void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}

void job_finalize(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_finalize_locked(job, errp);
}

/* Called with job_mutex held. */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}

void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}

/* Called with job_mutex held. */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}

/**
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    AioContext *ctx;
    JOB_LOCK_GUARD();

    job_ref_locked(job);
    aio_context_acquire(job->aio_context);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);

    /*
     * Note that calling job_completed can move the job to a different
     * aio_context, so we cannot cache from above. job_txn_apply takes care of
     * acquiring the new lock, and we ref/unref to avoid job_completed freeing
     * the job underneath us.
     */
    ctx = job->aio_context;
    job_unref_locked(job);
    aio_context_release(ctx);
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}

void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    aio_co_enter(job->aio_context, job->co);
}
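
/*
 * Putting it together, a hypothetical driver would create and start a job
 * roughly like this (ExampleJob and example_job_driver are illustrative,
 * not part of this file; job_create() allocates
 * driver->instance_size bytes, so Job must be the first field of the
 * containing struct):
 *
 *     ExampleJob *s = job_create("job0", &example_job_driver, NULL,
 *                                qemu_get_aio_context(), JOB_DEFAULT,
 *                                NULL, NULL, errp);
 *     if (s) {
 *         job_start(&s->job);
 *     }
 */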

void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}

void job_cancel(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    job_cancel_locked(job, force);
}

void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    job_cancel_locked(job, force);
}

void job_user_cancel(Job *job, bool force, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_cancel_locked(job, force, errp);
}

/* A wrapper around job_cancel() taking an Error ** parameter so it may be
 * used with job_finish_sync() without the need for (rather nasty) function
 * pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}

/**
 * Same as job_cancel_err(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}

int job_cancel_sync_locked(Job *job, bool force)
{
    if (force) {
        return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
    } else {
        return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
    }
}

int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}

void job_cancel_sync_all(void)
{
    Job *job;
    AioContext *aio_context;
    JOB_LOCK_GUARD();

    while ((job = job_next_locked(NULL))) {
        aio_context = job->aio_context;
        aio_context_acquire(aio_context);
        job_cancel_sync_locked(job, true);
        aio_context_release(aio_context);
    }
}

int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}

int job_complete_sync(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    return job_complete_sync_locked(job, errp);
}

void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}

void job_complete(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_complete_locked(job, errp);
}

int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;

    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    job_unlock();
    AIO_WAIT_WHILE(job->aio_context,
                   (job_enter(job), !job_is_completed(job)));
    job_lock();

    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}

int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
{
    JOB_LOCK_GUARD();
    return job_finish_sync_locked(job, finish, errp);
}
1395