xref: /openbmc/qemu/job.c (revision a75ed3c43064528f3409f0be286b62b9c3a47218)
/*
 * Background jobs (long-running operations)
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012, 2018 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"

/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 */
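
/*
 * Illustrative sketch (not part of the original file): a monitor-side
 * caller wraps the lookup and the operation in one critical section,
 * loosely modeled on the QMP handlers in job-qmp.c:
 *
 *     void qmp_job_cancel(const char *id, Error **errp)
 *     {
 *         Job *job;
 *
 *         JOB_LOCK_GUARD();
 *         job = job_get_locked(id);
 *         if (!job) {
 *             error_setg(errp, "Job not found: %s", id);
 *             return;
 *         }
 *         job_user_cancel_locked(job, true, errp);
 *     }
 */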

/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
QemuMutex job_mutex;

/* Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);

/* Job State Transition Table */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};

bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
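
/*
 * Reading the tables above: columns follow the U..N legend.  In JobSTT the
 * row is the current status and the column the destination, so
 * JobSTT[JOB_STATUS_CREATED][JOB_STATUS_RUNNING] == 1 means Created may
 * move to Running.  In JobVerbTable the row is the verb and the column the
 * current status, so JobVerbTable[JOB_VERB_PAUSE][JOB_STATUS_ABORTING] == 0
 * means an aborting job cannot be paused.
 */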

/* Transactional group of jobs */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};

void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}

void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}

static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}

JobTxn *job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}
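
/*
 * Illustrative sketch (not part of the original file): grouping two jobs
 * into one transaction so that they complete or fail together.
 * some_driver, ctx_a, ctx_b and errp are placeholders; job_create() calls
 * job_txn_add_job_locked() internally, so each job holds its own reference
 * and the caller can drop its one afterwards:
 *
 *     JobTxn *txn = job_txn_new();
 *     Job *a = job_create("job-a", &some_driver, txn, ctx_a,
 *                         JOB_DEFAULT, NULL, NULL, errp);
 *     Job *b = job_create("job-b", &some_driver, txn, ctx_b,
 *                         JOB_DEFAULT, NULL, NULL, errp);
 *     job_txn_unref(txn);   // the jobs keep the transaction alive
 */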

/* Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}

void job_txn_unref_locked(JobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}

/* Called with job_mutex held. */
static void job_txn_del_job_locked(Job *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        job_txn_unref_locked(job->txn);
        job->txn = NULL;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Hold a reference to @job so that neither the job nor its transaction
     * can go away while fn() temporarily releases job_mutex (for example
     * for AIO_WAIT_WHILE from within fn).
     */
    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}

bool job_is_internal(Job *job)
{
    return (job->id == NULL);
}

/* Called with job_mutex held. */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}

int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
{
    JobStatus s0 = job->status;
    assert(verb >= 0 && verb < JOB_VERB__MAX);
    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    if (JobVerbTable[verb][s0]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, JobStatus_str(s0), JobVerb_str(verb));
    return -EPERM;
}

JobType job_type(const Job *job)
{
    return job->driver->job_type;
}

const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}

bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}

bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}

/* Called with job_mutex held. */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}

bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}

bool job_is_ready_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return false;
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}

bool job_is_completed_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return false;
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}

static bool job_started_locked(Job *job)
{
    return job->co;
}

/* Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}

Job *job_next_locked(Job *job)
{
    if (!job) {
        return QLIST_FIRST(&jobs);
    }
    return QLIST_NEXT(job, job_list);
}

Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}

Job *job_get_locked(const char *id)
{
    Job *job;

    QLIST_FOREACH(job, &jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}

/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}

void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /*
     * Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic.
     */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
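
/*
 * Illustrative sketch (not part of the original file): the minimum a
 * driver must provide for job_create() and job_start() to work.
 * ExampleJob, example_run() and the chosen job_type are hypothetical:
 *
 *     typedef struct ExampleJob {
 *         Job common;
 *         // driver-specific state goes here
 *     } ExampleJob;
 *
 *     static int coroutine_fn example_run(Job *job, Error **errp)
 *     {
 *         ExampleJob *s = container_of(job, ExampleJob, common);
 *         // do the work on s, yielding periodically (see job_sleep_ns()
 *         // and job_pause_point() below) so the job stays pausable
 *         return 0;
 *     }
 *
 *     static const JobDriver example_job_driver = {
 *         .instance_size = sizeof(ExampleJob),
 *         .job_type      = JOB_TYPE_COMMIT,   // placeholder type
 *         .run           = example_run,
 *     };
 */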

void job_ref_locked(Job *job)
{
    ++job->refcnt;
}

void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            AioContext *aio_context = job->aio_context;
            job_unlock();
            /* FIXME: aiocontext lock is required because cb calls blk_unref */
            aio_context_acquire(aio_context);
            job->driver->free(job);
            aio_context_release(aio_context);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}

void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}

/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}

/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}

/* Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}

/* Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}

/* Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}

void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}

void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}

/*
 * Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * The coroutine has resumed, but in the meantime the job's AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to
     * move the coroutine to the new AioContext as well.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}

/* Called with job_mutex held, but releases it temporarily. */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}

void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}

void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}

void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
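
/*
 * Illustrative sketch (not part of the original file): how a driver's
 * .run() callback typically uses the two functions above, patterned after
 * the block job run loops.  work_left(), do_some_work() and delay_ns are
 * hypothetical:
 *
 *     static int coroutine_fn example_run(Job *job, Error **errp)
 *     {
 *         while (work_left(job)) {
 *             job_pause_point(job);         // honor pause requests
 *             if (job_is_cancelled(job)) {
 *                 return 0;
 *             }
 *             do_some_work(job);
 *             job_sleep_ns(job, delay_ns);  // rate limit; job_enter()
 *                                           // wakes the job up early
 *         }
 *         return 0;
 *     }
 */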

/* Assumes the job_mutex is held */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}

void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}

void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}

void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}

void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
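
/*
 * Note that pauses nest: each job_pause() increments pause_count and must
 * be balanced by exactly one job_resume(); the job is only kicked awake
 * once the count drops back to zero.  For example:
 *
 *     job_pause(job);
 *     job_pause(job);
 *     job_resume(job);   // still paused, pause_count == 1
 *     job_resume(job);   // eligible to run again
 */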

void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}

bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}

void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}

void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* Similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}

void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}

/* Called with job_mutex held. */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}

static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;
    AioContext *ctx = job->aio_context;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();
    aio_context_acquire(ctx);

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    aio_context_release(ctx);
    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    AioContext *ctx = job->aio_context;
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        aio_context_acquire(ctx);
        force = job->driver->cancel(job, force);
        aio_context_release(ctx);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /* We are cancelled by another job, which will handle everything. */
        return;
    }
    txn->aborting = true;
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /*
     * Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}

/* Called with job_mutex held, but releases it temporarily */
static int job_prepare_locked(Job *job)
{
    int ret;
    AioContext *ctx = job->aio_context;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        aio_context_acquire(ctx);
        ret = job->driver->prepare(job);
        aio_context_release(ctx);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}

/* Called with job_mutex held */
static int job_needs_finalize_locked(Job *job)
{
    return !job->auto_finalize;
}

/* Called with job_mutex held */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}

void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}

/* Called with job_mutex held. */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}

void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}

/* Called with job_mutex held. */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}

/**
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    job_ref_locked(job);

    /*
     * This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations
     * that drain block nodes, and if .drained_poll still returned true,
     * we would deadlock.
     */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);
    job_unref_locked(job);
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}

void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    aio_co_enter(job->aio_context, job->co);
}
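
/*
 * Illustrative sketch (not part of the original file): the typical
 * lifecycle as driven by the functions above; example_job_driver is the
 * hypothetical driver sketched earlier:
 *
 *     Error *err = NULL;
 *     Job *job = job_create("job0", &example_job_driver, NULL,
 *                           qemu_get_aio_context(), JOB_DEFAULT,
 *                           NULL, NULL, &err);
 *     if (!job) {
 *         // report err and bail out
 *         return;
 *     }
 *     job_start(job);    // schedules example_run() in job->aio_context
 */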

void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}

void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    job_cancel_locked(job, force);
}

/*
 * A wrapper around job_cancel_locked() taking an Error ** parameter so it
 * may be used with job_finish_sync_locked() without the need for (rather
 * nasty) function pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}

/**
 * Same as job_cancel_err(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}

int job_cancel_sync_locked(Job *job, bool force)
{
    if (force) {
        return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
    } else {
        return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
    }
}

int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}

void job_cancel_sync_all(void)
{
    Job *job;
    JOB_LOCK_GUARD();

    while ((job = job_next_locked(NULL))) {
        job_cancel_sync_locked(job, true);
    }
}
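
/*
 * Illustrative sketch (not part of the original file): a caller that owns
 * a job reference can force-cancel it synchronously; -ECANCELED is the
 * expected result for a job that had not already failed on its own:
 *
 *     int ret = job_cancel_sync(job, true);   // blocks until concluded
 *     if (ret < 0 && ret != -ECANCELED) {
 *         // the job failed for another reason before it was cancelled
 *     }
 */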

int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}

void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}

int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}