xref: /openbmc/qemu/block/quorum.c (revision b6235a759a4552d21c5b68d16c894aa5b96d4b96)
1  /*
2   * Quorum Block filter
3   *
4   * Copyright (C) 2012-2014 Nodalink, EURL.
5   *
6   * Author:
 *   Benoît Canet <benoit.canet@irqsave.net>
8   *
9   * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp)
10   * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc).
11   *
12   * This work is licensed under the terms of the GNU GPL, version 2 or later.
13   * See the COPYING file in the top-level directory.
14   */
15  
16  #include "qemu/osdep.h"
17  #include "qemu/cutils.h"
18  #include "qemu/module.h"
19  #include "qemu/option.h"
20  #include "qemu/memalign.h"
21  #include "block/block_int.h"
22  #include "block/coroutines.h"
23  #include "block/qdict.h"
24  #include "qapi/error.h"
25  #include "qapi/qapi-events-block.h"
26  #include "qapi/qmp/qdict.h"
27  #include "qapi/qmp/qerror.h"
28  #include "qapi/qmp/qlist.h"
29  #include "qapi/qmp/qstring.h"
30  #include "crypto/hash.h"
31  
32  #define HASH_LENGTH 32
33  
34  #define INDEXSTR_LEN 32
35  
36  #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold"
37  #define QUORUM_OPT_BLKVERIFY      "blkverify"
38  #define QUORUM_OPT_REWRITE        "rewrite-corrupted"
39  #define QUORUM_OPT_READ_PATTERN   "read-pattern"
40  
/*
 * This union holds a vote value.  Both members share the same storage;
 * quorum_sha256_compare() compares the h member while
 * quorum_64bits_compare() compares the l member.
 */
typedef union QuorumVoteValue {
    uint8_t h[HASH_LENGTH];    /* SHA-256 hash */
    int64_t l;                 /* simpler 64 bits hash */
} QuorumVoteValue;
46  
/* A vote item: one entry per vote cast for a given version */
typedef struct QuorumVoteItem {
    int index;                        /* index passed to quorum_count_vote() */
    QLIST_ENTRY(QuorumVoteItem) next; /* link in QuorumVoteVersion.items */
} QuorumVoteItem;
52  
/*
 * This structure is a vote version.  A version is the set of votes sharing
 * the same vote value.
 * The set of votes is tracked with the items field and its cardinality is
 * vote_count.
 */
typedef struct QuorumVoteVersion {
    QuorumVoteValue value;               /* value shared by all the votes */
    int index;                           /* index of the first vote cast */
    int vote_count;                      /* number of entries in items */
    QLIST_HEAD(, QuorumVoteItem) items;  /* one item per vote */
    QLIST_ENTRY(QuorumVoteVersion) next; /* link in QuorumVotes.vote_list */
} QuorumVoteVersion;
65  
/*
 * This structure holds a group of vote versions together with the callback
 * used to decide whether two vote values are equal.
 */
typedef struct QuorumVotes {
    QLIST_HEAD(, QuorumVoteVersion) vote_list;
    /* returns true when the two values are considered identical */
    bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b);
} QuorumVotes;
71  
/* The following structure holds the state of one quorum instance */
typedef struct BDRVQuorumState {
    BdrvChild **children;  /* children BlockDriverStates */
    int num_children;      /* children count */
    unsigned next_child_index;  /* the index of the next child that should
                                 * be added
                                 */
    int threshold;         /* if fewer than threshold children reads gave the
                            * same result, a quorum error occurs.
                            */
    bool is_blkverify;     /* true if the driver is in blkverify mode
                            * Writes are mirrored on two children devices.
                            * On reads the two children devices' contents are
                            * compared and if a difference is spotted its
                            * location is printed and the code aborts.
                            * It is useful to debug other block drivers by
                            * comparing them with a reference one.
                            */
    bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted
                            * blocks if Quorum is reached.
                            */

    QuorumReadPattern read_pattern; /* selects between read_quorum_children()
                                     * and read_fifo_child() on reads
                                     */
} BDRVQuorumState;
96  
typedef struct QuorumAIOCB QuorumAIOCB;

/*
 * Quorum creates one instance of the following structure per operation it
 * performs on its children.
 * So for each read/write operation coming from the upper layer there will be
 * $children_count QuorumChildRequest.
 */
typedef struct QuorumChildRequest {
    BlockDriverState *bs;   /* child node the request was sent to */
    QEMUIOVector qiov;      /* per-child I/O vector used by quorum reads */
    uint8_t *buf;           /* buffer backing qiov for quorum reads */
    int ret;                /* return value of the child request */
    QuorumAIOCB *parent;    /* operation this child request belongs to */
} QuorumChildRequest;
111  
/*
 * Quorum uses the following structure to track the progress of each
 * read/write operation received from the upper layer.
 * This structure holds pointers to the QuorumChildRequest instances used to
 * perform the operation on each child and tracks overall progress.
 */
struct QuorumAIOCB {
    BlockDriverState *bs;
    Coroutine *co;              /* coroutine that issued the request; it is
                                 * re-entered once the child requests are done
                                 */

    /* Request metadata */
    uint64_t offset;
    uint64_t bytes;
    int flags;

    QEMUIOVector *qiov;         /* calling IOV */

    QuorumChildRequest *qcrs;   /* individual child requests */
    int count;                  /* number of completed AIOCB */
    int success_count;          /* number of successfully completed AIOCB */

    int rewrite_count;          /* number of replicas to rewrite: counts down
                                 * to zero once writes are fired
                                 */

    QuorumVotes votes;

    bool is_read;
    int vote_ret;
    int children_read;          /* how many children have been read from */
};
142  
/*
 * Argument bundle handed to the per-child coroutine entry points
 * (read_quorum_children_entry(), write_quorum_entry(),
 * quorum_rewrite_entry()).
 */
typedef struct QuorumCo {
    QuorumAIOCB *acb;   /* operation the coroutine works for */
    int idx;            /* index of the child in BDRVQuorumState.children */
} QuorumCo;
147  
148  static void quorum_aio_finalize(QuorumAIOCB *acb)
149  {
150      g_free(acb->qcrs);
151      g_free(acb);
152  }
153  
154  static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b)
155  {
156      return !memcmp(a->h, b->h, HASH_LENGTH);
157  }
158  
159  static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b)
160  {
161      return a->l == b->l;
162  }
163  
164  static QuorumAIOCB *coroutine_fn quorum_aio_get(BlockDriverState *bs,
165                                                  QEMUIOVector *qiov,
166                                                  uint64_t offset, uint64_t bytes,
167                                                  int flags)
168  {
169      BDRVQuorumState *s = bs->opaque;
170      QuorumAIOCB *acb = g_new(QuorumAIOCB, 1);
171      int i;
172  
173      *acb = (QuorumAIOCB) {
174          .co                 = qemu_coroutine_self(),
175          .bs                 = bs,
176          .offset             = offset,
177          .bytes              = bytes,
178          .flags              = flags,
179          .qiov               = qiov,
180          .votes.compare      = quorum_sha256_compare,
181          .votes.vote_list    = QLIST_HEAD_INITIALIZER(acb.votes.vote_list),
182      };
183  
184      acb->qcrs = g_new0(QuorumChildRequest, s->num_children);
185      for (i = 0; i < s->num_children; i++) {
186          acb->qcrs[i].buf = NULL;
187          acb->qcrs[i].ret = 0;
188          acb->qcrs[i].parent = acb;
189      }
190  
191      return acb;
192  }
193  
194  static void quorum_report_bad(QuorumOpType type, uint64_t offset,
195                                uint64_t bytes, char *node_name, int ret)
196  {
197      const char *msg = NULL;
198      int64_t start_sector = offset / BDRV_SECTOR_SIZE;
199      int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE);
200  
201      if (ret < 0) {
202          msg = strerror(-ret);
203      }
204  
205      qapi_event_send_quorum_report_bad(type, msg, node_name, start_sector,
206                                        end_sector - start_sector);
207  }
208  
209  static void quorum_report_failure(QuorumAIOCB *acb)
210  {
211      const char *reference = bdrv_get_device_or_node_name(acb->bs);
212      int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE;
213      int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes,
214                                        BDRV_SECTOR_SIZE);
215  
216      qapi_event_send_quorum_failure(reference, start_sector,
217                                     end_sector - start_sector);
218  }
219  
220  static int quorum_vote_error(QuorumAIOCB *acb);
221  
222  static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb)
223  {
224      BDRVQuorumState *s = acb->bs->opaque;
225  
226      if (acb->success_count < s->threshold) {
227          acb->vote_ret = quorum_vote_error(acb);
228          quorum_report_failure(acb);
229          return true;
230      }
231  
232      return false;
233  }
234  
235  static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
236  {
237      int i;
238      assert(dest->niov == source->niov);
239      assert(dest->size == source->size);
240      for (i = 0; i < source->niov; i++) {
241          assert(dest->iov[i].iov_len == source->iov[i].iov_len);
242          memcpy(dest->iov[i].iov_base,
243                 source->iov[i].iov_base,
244                 source->iov[i].iov_len);
245      }
246  }
247  
248  static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret)
249  {
250      QuorumAIOCB *acb = sacb->parent;
251      QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE;
252      quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret);
253  }
254  
255  static void quorum_report_bad_versions(BDRVQuorumState *s,
256                                         QuorumAIOCB *acb,
257                                         QuorumVoteValue *value)
258  {
259      QuorumVoteVersion *version;
260      QuorumVoteItem *item;
261  
262      QLIST_FOREACH(version, &acb->votes.vote_list, next) {
263          if (acb->votes.compare(&version->value, value)) {
264              continue;
265          }
266          QLIST_FOREACH(item, &version->items, next) {
267              quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes,
268                                s->children[item->index]->bs->node_name, 0);
269          }
270      }
271  }
272  
273  /*
274   * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
275   * the graph lock and keeps it until this coroutine has terminated.
276   */
277  static void coroutine_fn GRAPH_RDLOCK quorum_rewrite_entry(void *opaque)
278  {
279      QuorumCo *co = opaque;
280      QuorumAIOCB *acb = co->acb;
281      BDRVQuorumState *s = acb->bs->opaque;
282  
283      /* Ignore any errors, it's just a correction attempt for already
284       * corrupted data.
285       * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the
286       * area with different data from the other children. */
287      bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes,
288                      acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED);
289  
290      /* Wake up the caller after the last rewrite */
291      acb->rewrite_count--;
292      if (!acb->rewrite_count) {
293          qemu_coroutine_enter_if_inactive(acb->co);
294      }
295  }
296  
297  static bool coroutine_fn GRAPH_RDLOCK
298  quorum_rewrite_bad_versions(QuorumAIOCB *acb, QuorumVoteValue *value)
299  {
300      QuorumVoteVersion *version;
301      QuorumVoteItem *item;
302      int count = 0;
303  
304      /* first count the number of bad versions: done first to avoid concurrency
305       * issues.
306       */
307      QLIST_FOREACH(version, &acb->votes.vote_list, next) {
308          if (acb->votes.compare(&version->value, value)) {
309              continue;
310          }
311          QLIST_FOREACH(item, &version->items, next) {
312              count++;
313          }
314      }
315  
316      /* quorum_rewrite_entry will count down this to zero */
317      acb->rewrite_count = count;
318  
319      /* now fire the correcting rewrites */
320      QLIST_FOREACH(version, &acb->votes.vote_list, next) {
321          if (acb->votes.compare(&version->value, value)) {
322              continue;
323          }
324          QLIST_FOREACH(item, &version->items, next) {
325              Coroutine *co;
326              QuorumCo data = {
327                  .acb = acb,
328                  .idx = item->index,
329              };
330  
331              co = qemu_coroutine_create(quorum_rewrite_entry, &data);
332              qemu_coroutine_enter(co);
333          }
334      }
335  
336      /* return true if any rewrite is done else false */
337      return count;
338  }
339  
340  static void quorum_count_vote(QuorumVotes *votes,
341                                QuorumVoteValue *value,
342                                int index)
343  {
344      QuorumVoteVersion *v = NULL, *version = NULL;
345      QuorumVoteItem *item;
346  
347      /* look if we have something with this hash */
348      QLIST_FOREACH(v, &votes->vote_list, next) {
349          if (votes->compare(&v->value, value)) {
350              version = v;
351              break;
352          }
353      }
354  
355      /* It's a version not yet in the list add it */
356      if (!version) {
357          version = g_new0(QuorumVoteVersion, 1);
358          QLIST_INIT(&version->items);
359          memcpy(&version->value, value, sizeof(version->value));
360          version->index = index;
361          version->vote_count = 0;
362          QLIST_INSERT_HEAD(&votes->vote_list, version, next);
363      }
364  
365      version->vote_count++;
366  
367      item = g_new0(QuorumVoteItem, 1);
368      item->index = index;
369      QLIST_INSERT_HEAD(&version->items, item, next);
370  }
371  
372  static void quorum_free_vote_list(QuorumVotes *votes)
373  {
374      QuorumVoteVersion *version, *next_version;
375      QuorumVoteItem *item, *next_item;
376  
377      QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) {
378          QLIST_REMOVE(version, next);
379          QLIST_FOREACH_SAFE(item, &version->items, next, next_item) {
380              QLIST_REMOVE(item, next);
381              g_free(item);
382          }
383          g_free(version);
384      }
385  }
386  
387  static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash)
388  {
389      QEMUIOVector *qiov = &acb->qcrs[i].qiov;
390      size_t len = sizeof(hash->h);
391      uint8_t *data = hash->h;
392  
393      /* XXX - would be nice if we could pass in the Error **
394       * and propagate that back, but this quorum code is
395       * restricted to just errno values currently */
396      if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256,
397                              qiov->iov, qiov->niov,
398                              &data, &len,
399                              NULL) < 0) {
400          return -EINVAL;
401      }
402  
403      return 0;
404  }
405  
406  static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes)
407  {
408      int max = 0;
409      QuorumVoteVersion *candidate, *winner = NULL;
410  
411      QLIST_FOREACH(candidate, &votes->vote_list, next) {
412          if (candidate->vote_count > max) {
413              max = candidate->vote_count;
414              winner = candidate;
415          }
416      }
417  
418      return winner;
419  }
420  
421  /* qemu_iovec_compare is handy for blkverify mode because it returns the first
422   * differing byte location. Yet it is handcoded to compare vectors one byte
423   * after another so it does not benefit from the libc SIMD optimizations.
424   * quorum_iovec_compare is written for speed and should be used in the non
425   * blkverify mode of quorum.
426   */
427  static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b)
428  {
429      int i;
430      int result;
431  
432      assert(a->niov == b->niov);
433      for (i = 0; i < a->niov; i++) {
434          assert(a->iov[i].iov_len == b->iov[i].iov_len);
435          result = memcmp(a->iov[i].iov_base,
436                          b->iov[i].iov_base,
437                          a->iov[i].iov_len);
438          if (result) {
439              return false;
440          }
441      }
442  
443      return true;
444  }
445  
446  static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b)
447  {
448      BDRVQuorumState *s = acb->bs->opaque;
449      ssize_t offset;
450  
451      /* This driver will replace blkverify in this particular case */
452      if (s->is_blkverify) {
453          offset = qemu_iovec_compare(a, b);
454          if (offset != -1) {
455              fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64
456                      " contents mismatch at offset %" PRIu64 "\n",
457                      acb->offset, acb->bytes, acb->offset + offset);
458              exit(1);
459          }
460          return true;
461      }
462  
463      return quorum_iovec_compare(a, b);
464  }
465  
466  /* Do a vote to get the error code */
467  static int quorum_vote_error(QuorumAIOCB *acb)
468  {
469      BDRVQuorumState *s = acb->bs->opaque;
470      QuorumVoteVersion *winner = NULL;
471      QuorumVotes error_votes;
472      QuorumVoteValue result_value;
473      int i, ret = 0;
474      bool error = false;
475  
476      QLIST_INIT(&error_votes.vote_list);
477      error_votes.compare = quorum_64bits_compare;
478  
479      for (i = 0; i < s->num_children; i++) {
480          ret = acb->qcrs[i].ret;
481          if (ret) {
482              error = true;
483              result_value.l = ret;
484              quorum_count_vote(&error_votes, &result_value, i);
485          }
486      }
487  
488      if (error) {
489          winner = quorum_get_vote_winner(&error_votes);
490          ret = winner->value.l;
491      }
492  
493      quorum_free_vote_list(&error_votes);
494  
495      return ret;
496  }
497  
/*
 * Decide which data a quorum read returns to the caller.
 *
 * The outcome is stored in acb->vote_ret (left untouched on success) and,
 * on success, the chosen data is copied into the caller's I/O vector.
 *
 * Steps:
 *  - give up if fewer than threshold children were read successfully;
 *  - fast path: if all successful reads carry the same data, return it;
 *  - otherwise hash each successful read, vote on the hashes and fail with
 *    -EIO if the winner gathers fewer than threshold votes;
 *  - report the children that disagree with the winner and, if
 *    rewrite-corrupted is enabled, start rewriting them.
 */
static void coroutine_fn GRAPH_RDLOCK quorum_vote(QuorumAIOCB *acb)
{
    bool quorum = true;
    int i, j, ret;
    QuorumVoteValue hash;
    BDRVQuorumState *s = acb->bs->opaque;
    QuorumVoteVersion *winner;

    /* vote_ret and the failure event are handled by the helper */
    if (quorum_has_too_much_io_failed(acb)) {
        return;
    }

    /* get the index of the first successful read */
    for (i = 0; i < s->num_children; i++) {
        if (!acb->qcrs[i].ret) {
            break;
        }
    }

    assert(i < s->num_children);

    /* compare this read with all other successful reads stopping at quorum
     * failure
     */
    for (j = i + 1; j < s->num_children; j++) {
        if (acb->qcrs[j].ret) {
            continue;
        }
        quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov);
        if (!quorum) {
            break;
        }
    }

    /* Every successful read agrees */
    if (quorum) {
        quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov);
        return;
    }

    /* compute hashes for each successful read, also store indexes */
    for (i = 0; i < s->num_children; i++) {
        if (acb->qcrs[i].ret) {
            continue;
        }
        ret = quorum_compute_hash(acb, i, &hash);
        /* if ever the hash computation failed */
        if (ret < 0) {
            acb->vote_ret = ret;
            goto free_exit;
        }
        quorum_count_vote(&acb->votes, &hash, i);
    }

    /* vote to select the most represented version */
    winner = quorum_get_vote_winner(&acb->votes);

    /* if the winner count is smaller than threshold the read fails */
    if (winner->vote_count < s->threshold) {
        quorum_report_failure(acb);
        acb->vote_ret = -EIO;
        goto free_exit;
    }

    /* we have a winner: copy it */
    quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov);

    /* some versions are bad print them */
    quorum_report_bad_versions(s, acb, &winner->value);

    /* corruption correction is enabled */
    if (s->rewrite_corrupted) {
        /* must run before the vote list (and thus winner) is freed below */
        quorum_rewrite_bad_versions(acb, &winner->value);
    }

free_exit:
    /* free lists */
    quorum_free_vote_list(&acb->votes);
}
577  
578  /*
579   * This function can count as GRAPH_RDLOCK because read_quorum_children() holds
580   * the graph lock and keeps it until this coroutine has terminated.
581   */
582  static void coroutine_fn GRAPH_RDLOCK read_quorum_children_entry(void *opaque)
583  {
584      QuorumCo *co = opaque;
585      QuorumAIOCB *acb = co->acb;
586      BDRVQuorumState *s = acb->bs->opaque;
587      int i = co->idx;
588      QuorumChildRequest *sacb = &acb->qcrs[i];
589  
590      sacb->bs = s->children[i]->bs;
591      sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes,
592                                 &acb->qcrs[i].qiov, 0);
593  
594      if (sacb->ret == 0) {
595          acb->success_count++;
596      } else {
597          quorum_report_bad_acb(sacb, sacb->ret);
598      }
599  
600      acb->count++;
601      assert(acb->count <= s->num_children);
602      assert(acb->success_count <= s->num_children);
603  
604      /* Wake up the caller after the last read */
605      if (acb->count == s->num_children) {
606          qemu_coroutine_enter_if_inactive(acb->co);
607      }
608  }
609  
610  static int coroutine_fn GRAPH_RDLOCK read_quorum_children(QuorumAIOCB *acb)
611  {
612      BDRVQuorumState *s = acb->bs->opaque;
613      int i;
614  
615      acb->children_read = s->num_children;
616      for (i = 0; i < s->num_children; i++) {
617          acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size);
618          qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov);
619          qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf);
620      }
621  
622      for (i = 0; i < s->num_children; i++) {
623          Coroutine *co;
624          QuorumCo data = {
625              .acb = acb,
626              .idx = i,
627          };
628  
629          co = qemu_coroutine_create(read_quorum_children_entry, &data);
630          qemu_coroutine_enter(co);
631      }
632  
633      while (acb->count < s->num_children) {
634          qemu_coroutine_yield();
635      }
636  
637      /* Do the vote on read */
638      quorum_vote(acb);
639      for (i = 0; i < s->num_children; i++) {
640          qemu_vfree(acb->qcrs[i].buf);
641          qemu_iovec_destroy(&acb->qcrs[i].qiov);
642      }
643  
644      while (acb->rewrite_count) {
645          qemu_coroutine_yield();
646      }
647  
648      return acb->vote_ret;
649  }
650  
651  static int coroutine_fn GRAPH_RDLOCK read_fifo_child(QuorumAIOCB *acb)
652  {
653      BDRVQuorumState *s = acb->bs->opaque;
654      int n, ret;
655  
656      /* We try to read the next child in FIFO order if we failed to read */
657      do {
658          n = acb->children_read++;
659          acb->qcrs[n].bs = s->children[n]->bs;
660          ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes,
661                               acb->qiov, 0);
662          if (ret < 0) {
663              quorum_report_bad_acb(&acb->qcrs[n], ret);
664          }
665      } while (ret < 0 && acb->children_read < s->num_children);
666  
667      /* FIXME: rewrite failed children if acb->children_read > 1? */
668  
669      return ret;
670  }
671  
672  static int coroutine_fn GRAPH_RDLOCK
673  quorum_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
674                   QEMUIOVector *qiov, BdrvRequestFlags flags)
675  {
676      BDRVQuorumState *s = bs->opaque;
677      QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
678      int ret;
679  
680      acb->is_read = true;
681      acb->children_read = 0;
682  
683      if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
684          ret = read_quorum_children(acb);
685      } else {
686          ret = read_fifo_child(acb);
687      }
688      quorum_aio_finalize(acb);
689  
690      return ret;
691  }
692  
693  /*
694   * This function can count as GRAPH_RDLOCK because quorum_co_pwritev() holds the
695   * graph lock and keeps it until this coroutine has terminated.
696   */
697  static void coroutine_fn GRAPH_RDLOCK write_quorum_entry(void *opaque)
698  {
699      QuorumCo *co = opaque;
700      QuorumAIOCB *acb = co->acb;
701      BDRVQuorumState *s = acb->bs->opaque;
702      int i = co->idx;
703      QuorumChildRequest *sacb = &acb->qcrs[i];
704  
705      sacb->bs = s->children[i]->bs;
706      if (acb->flags & BDRV_REQ_ZERO_WRITE) {
707          sacb->ret = bdrv_co_pwrite_zeroes(s->children[i], acb->offset,
708                                            acb->bytes, acb->flags);
709      } else {
710          sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes,
711                                      acb->qiov, acb->flags);
712      }
713      if (sacb->ret == 0) {
714          acb->success_count++;
715      } else {
716          quorum_report_bad_acb(sacb, sacb->ret);
717      }
718      acb->count++;
719      assert(acb->count <= s->num_children);
720      assert(acb->success_count <= s->num_children);
721  
722      /* Wake up the caller after the last write */
723      if (acb->count == s->num_children) {
724          qemu_coroutine_enter_if_inactive(acb->co);
725      }
726  }
727  
728  static int coroutine_fn GRAPH_RDLOCK
729  quorum_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
730                    QEMUIOVector *qiov, BdrvRequestFlags flags)
731  {
732      BDRVQuorumState *s = bs->opaque;
733      QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags);
734      int i, ret;
735  
736      for (i = 0; i < s->num_children; i++) {
737          Coroutine *co;
738          QuorumCo data = {
739              .acb = acb,
740              .idx = i,
741          };
742  
743          co = qemu_coroutine_create(write_quorum_entry, &data);
744          qemu_coroutine_enter(co);
745      }
746  
747      while (acb->count < s->num_children) {
748          qemu_coroutine_yield();
749      }
750  
751      quorum_has_too_much_io_failed(acb);
752  
753      ret = acb->vote_ret;
754      quorum_aio_finalize(acb);
755  
756      return ret;
757  }
758  
759  static int coroutine_fn GRAPH_RDLOCK
760  quorum_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
761                          BdrvRequestFlags flags)
762  {
763      return quorum_co_pwritev(bs, offset, bytes, NULL,
764                               flags | BDRV_REQ_ZERO_WRITE);
765  }
766  
767  static int64_t coroutine_fn GRAPH_RDLOCK
768  quorum_co_getlength(BlockDriverState *bs)
769  {
770      BDRVQuorumState *s = bs->opaque;
771      int64_t result;
772      int i;
773  
774      /* check that all file have the same length */
775      result = bdrv_co_getlength(s->children[0]->bs);
776      if (result < 0) {
777          return result;
778      }
779      for (i = 1; i < s->num_children; i++) {
780          int64_t value = bdrv_co_getlength(s->children[i]->bs);
781          if (value < 0) {
782              return value;
783          }
784          if (value != result) {
785              return -EIO;
786          }
787      }
788  
789      return result;
790  }
791  
792  static coroutine_fn GRAPH_RDLOCK int quorum_co_flush(BlockDriverState *bs)
793  {
794      BDRVQuorumState *s = bs->opaque;
795      QuorumVoteVersion *winner = NULL;
796      QuorumVotes error_votes;
797      QuorumVoteValue result_value;
798      int i;
799      int result = 0;
800      int success_count = 0;
801  
802      QLIST_INIT(&error_votes.vote_list);
803      error_votes.compare = quorum_64bits_compare;
804  
805      for (i = 0; i < s->num_children; i++) {
806          result = bdrv_co_flush(s->children[i]->bs);
807          if (result) {
808              quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0,
809                                s->children[i]->bs->node_name, result);
810              result_value.l = result;
811              quorum_count_vote(&error_votes, &result_value, i);
812          } else {
813              success_count++;
814          }
815      }
816  
817      if (success_count >= s->threshold) {
818          result = 0;
819      } else {
820          winner = quorum_get_vote_winner(&error_votes);
821          result = winner->value.l;
822      }
823      quorum_free_vote_list(&error_votes);
824  
825      return result;
826  }
827  
828  static bool GRAPH_RDLOCK
829  quorum_recurse_can_replace(BlockDriverState *bs, BlockDriverState *to_replace)
830  {
831      BDRVQuorumState *s = bs->opaque;
832      int i;
833  
834      for (i = 0; i < s->num_children; i++) {
835          /*
836           * We have no idea whether our children show the same data as
837           * this node (@bs).  It is actually highly likely that
838           * @to_replace does not, because replacing a broken child is
839           * one of the main use cases here.
840           *
841           * We do know that the new BDS will match @bs, so replacing
842           * any of our children by it will be safe.  It cannot change
843           * the data this quorum node presents to its parents.
844           *
845           * However, replacing @to_replace by @bs in any of our
846           * children's chains may change visible data somewhere in
847           * there.  We therefore cannot recurse down those chains with
848           * bdrv_recurse_can_replace().
849           * (More formally, bdrv_recurse_can_replace() requires that
850           * @to_replace will be replaced by something matching the @bs
851           * passed to it.  We cannot guarantee that.)
852           *
853           * Thus, we can only check whether any of our immediate
854           * children matches @to_replace.
855           *
856           * (In the future, we might add a function to recurse down a
857           * chain that checks that nothing there cares about a change
858           * in data from the respective child in question.  For
859           * example, most filters do not care when their child's data
860           * suddenly changes, as long as their parents do not care.)
861           */
862          if (s->children[i]->bs == to_replace) {
863              /*
864               * We now have to ensure that there is no other parent
865               * that cares about replacing this child by a node with
866               * potentially different data.
867               * We do so by checking whether there are any other parents
868               * at all, which is stricter than necessary, but also very
869               * simple.  (We may decide to implement something more
870               * complex and permissive when there is an actual need for
871               * it.)
872               */
873              return QLIST_FIRST(&to_replace->parents) == s->children[i] &&
874                  QLIST_NEXT(s->children[i], next_parent) == NULL;
875          }
876      }
877  
878      return false;
879  }
880  
881  static int quorum_valid_threshold(int threshold, int num_children, Error **errp)
882  {
883  
884      if (threshold < 1) {
885          error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
886                     "vote-threshold", "a value >= 1");
887          return -ERANGE;
888      }
889  
890      if (threshold > num_children) {
891          error_setg(errp, "threshold may not exceed children count");
892          return -ERANGE;
893      }
894  
895      return 0;
896  }
897  
/*
 * Runtime options understood by the quorum driver.  The option names come
 * from the QUORUM_OPT_* macros defined at the top of this file.
 */
static QemuOptsList quorum_runtime_opts = {
    .name = "quorum",
    .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head),
    .desc = {
        {
            /* "vote-threshold" */
            .name = QUORUM_OPT_VOTE_THRESHOLD,
            .type = QEMU_OPT_NUMBER,
            .help = "The number of vote needed for reaching quorum",
        },
        {
            /* "blkverify" */
            .name = QUORUM_OPT_BLKVERIFY,
            .type = QEMU_OPT_BOOL,
            .help = "Trigger block verify mode if set",
        },
        {
            /* "rewrite-corrupted" */
            .name = QUORUM_OPT_REWRITE,
            .type = QEMU_OPT_BOOL,
            .help = "Rewrite corrupted block on read quorum",
        },
        {
            /* "read-pattern" */
            .name = QUORUM_OPT_READ_PATTERN,
            .type = QEMU_OPT_STRING,
            .help = "Allowed pattern: quorum, fifo. Quorum is default",
        },
        { /* end of list */ }
    },
};
925  
926  static void quorum_refresh_flags(BlockDriverState *bs)
927  {
928      BDRVQuorumState *s = bs->opaque;
929      int i;
930  
931      bs->supported_zero_flags =
932          BDRV_REQ_FUA | BDRV_REQ_MAY_UNMAP | BDRV_REQ_NO_FALLBACK;
933  
934      for (i = 0; i < s->num_children; i++) {
935          bs->supported_zero_flags &= s->children[i]->bs->supported_zero_flags;
936      }
937  
938      bs->supported_zero_flags |= BDRV_REQ_WRITE_UNCHANGED;
939  }
940  
941  static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
942                         Error **errp)
943  {
944      BDRVQuorumState *s = bs->opaque;
945      QemuOpts *opts = NULL;
946      const char *pattern_str;
947      bool *opened;
948      int i;
949      int ret = 0;
950  
951      qdict_flatten(options);
952  
953      /* count how many different children are present */
954      s->num_children = qdict_array_entries(options, "children.");
955      if (s->num_children < 0) {
956          error_setg(errp, "Option children is not a valid array");
957          ret = -EINVAL;
958          goto exit;
959      }
960      if (s->num_children < 1) {
961          error_setg(errp, "Number of provided children must be 1 or more");
962          ret = -EINVAL;
963          goto exit;
964      }
965  
966      opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort);
967      if (!qemu_opts_absorb_qdict(opts, options, errp)) {
968          ret = -EINVAL;
969          goto exit;
970      }
971  
972      s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0);
973      /* and validate it against s->num_children */
974      ret = quorum_valid_threshold(s->threshold, s->num_children, errp);
975      if (ret < 0) {
976          goto exit;
977      }
978  
979      pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN);
980      if (!pattern_str) {
981          ret = QUORUM_READ_PATTERN_QUORUM;
982      } else {
983          ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str,
984                                -EINVAL, NULL);
985      }
986      if (ret < 0) {
987          error_setg(errp, "Please set read-pattern as fifo or quorum");
988          goto exit;
989      }
990      s->read_pattern = ret;
991  
992      if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) {
993          s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false);
994          if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) {
995              error_setg(errp, "blkverify=on can only be set if there are "
996                         "exactly two files and vote-threshold is 2");
997              ret = -EINVAL;
998              goto exit;
999          }
1000  
1001          s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE,
1002                                                   false);
1003          if (s->rewrite_corrupted && s->is_blkverify) {
1004              error_setg(errp,
1005                         "rewrite-corrupted=on cannot be used with blkverify=on");
1006              ret = -EINVAL;
1007              goto exit;
1008          }
1009      }
1010  
1011      /* allocate the children array */
1012      s->children = g_new0(BdrvChild *, s->num_children);
1013      opened = g_new0(bool, s->num_children);
1014  
1015      for (i = 0; i < s->num_children; i++) {
1016          char indexstr[INDEXSTR_LEN];
1017          ret = snprintf(indexstr, INDEXSTR_LEN, "children.%d", i);
1018          assert(ret < INDEXSTR_LEN);
1019  
1020          s->children[i] = bdrv_open_child(NULL, options, indexstr, bs,
1021                                           &child_of_bds, BDRV_CHILD_DATA, false,
1022                                           errp);
1023          if (!s->children[i]) {
1024              ret = -EINVAL;
1025              goto close_exit;
1026          }
1027  
1028          opened[i] = true;
1029      }
1030      s->next_child_index = s->num_children;
1031  
1032      bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1033      quorum_refresh_flags(bs);
1034  
1035      g_free(opened);
1036      goto exit;
1037  
1038  close_exit:
1039      /* cleanup on error */
1040      for (i = 0; i < s->num_children; i++) {
1041          if (!opened[i]) {
1042              continue;
1043          }
1044          bdrv_unref_child(bs, s->children[i]);
1045      }
1046      g_free(s->children);
1047      g_free(opened);
1048  exit:
1049      qemu_opts_del(opts);
1050      return ret;
1051  }
1052  
1053  static void quorum_close(BlockDriverState *bs)
1054  {
1055      BDRVQuorumState *s = bs->opaque;
1056      int i;
1057  
1058      for (i = 0; i < s->num_children; i++) {
1059          bdrv_unref_child(bs, s->children[i]);
1060      }
1061  
1062      g_free(s->children);
1063  }
1064  
1065  static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs,
1066                               Error **errp)
1067  {
1068      BDRVQuorumState *s = bs->opaque;
1069      BdrvChild *child;
1070      char indexstr[INDEXSTR_LEN];
1071      int ret;
1072  
1073      if (s->is_blkverify) {
1074          error_setg(errp, "Cannot add a child to a quorum in blkverify mode");
1075          return;
1076      }
1077  
1078      assert(s->num_children <= INT_MAX / sizeof(BdrvChild *));
1079      if (s->num_children == INT_MAX / sizeof(BdrvChild *) ||
1080          s->next_child_index == UINT_MAX) {
1081          error_setg(errp, "Too many children");
1082          return;
1083      }
1084  
1085      ret = snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index);
1086      if (ret < 0 || ret >= INDEXSTR_LEN) {
1087          error_setg(errp, "cannot generate child name");
1088          return;
1089      }
1090      s->next_child_index++;
1091  
1092      bdrv_drained_begin(bs);
1093  
1094      /* We can safely add the child now */
1095      bdrv_ref(child_bs);
1096  
1097      child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds,
1098                                BDRV_CHILD_DATA, errp);
1099      if (child == NULL) {
1100          s->next_child_index--;
1101          goto out;
1102      }
1103      s->children = g_renew(BdrvChild *, s->children, s->num_children + 1);
1104      s->children[s->num_children++] = child;
1105      quorum_refresh_flags(bs);
1106  
1107  out:
1108      bdrv_drained_end(bs);
1109  }
1110  
1111  static void quorum_del_child(BlockDriverState *bs, BdrvChild *child,
1112                               Error **errp)
1113  {
1114      BDRVQuorumState *s = bs->opaque;
1115      char indexstr[INDEXSTR_LEN];
1116      int i;
1117  
1118      for (i = 0; i < s->num_children; i++) {
1119          if (s->children[i] == child) {
1120              break;
1121          }
1122      }
1123  
1124      /* we have checked it in bdrv_del_child() */
1125      assert(i < s->num_children);
1126  
1127      if (s->num_children <= s->threshold) {
1128          error_setg(errp,
1129              "The number of children cannot be lower than the vote threshold %d",
1130              s->threshold);
1131          return;
1132      }
1133  
1134      /* We know now that num_children > threshold, so blkverify must be false */
1135      assert(!s->is_blkverify);
1136  
1137      snprintf(indexstr, INDEXSTR_LEN, "children.%u", s->next_child_index - 1);
1138      if (!strncmp(child->name, indexstr, INDEXSTR_LEN)) {
1139          s->next_child_index--;
1140      }
1141  
1142      bdrv_drained_begin(bs);
1143  
1144      /* We can safely remove this child now */
1145      memmove(&s->children[i], &s->children[i + 1],
1146              (s->num_children - i - 1) * sizeof(BdrvChild *));
1147      s->children = g_renew(BdrvChild *, s->children, --s->num_children);
1148      bdrv_unref_child(bs, child);
1149  
1150      quorum_refresh_flags(bs);
1151      bdrv_drained_end(bs);
1152  }
1153  
1154  static void quorum_gather_child_options(BlockDriverState *bs, QDict *target,
1155                                          bool backing_overridden)
1156  {
1157      BDRVQuorumState *s = bs->opaque;
1158      QList *children_list;
1159      int i;
1160  
1161      /*
1162       * The generic implementation for gathering child options in
1163       * bdrv_refresh_filename() would use the names of the children
1164       * as specified for bdrv_open_child() or bdrv_attach_child(),
1165       * which is "children.%u" with %u being a value
1166       * (s->next_child_index) that is incremented each time a new child
1167       * is added (and never decremented).  Since children can be
1168       * deleted at runtime, there may be gaps in that enumeration.
1169       * When creating a new quorum BDS and specifying the children for
1170       * it through runtime options, the enumeration used there may not
1171       * have any gaps, though.
1172       *
1173       * Therefore, we have to create a new gap-less enumeration here
1174       * (which we can achieve by simply putting all of the children's
1175       * full_open_options into a QList).
1176       *
1177       * XXX: Note that there are issues with the current child option
1178       *      structure quorum uses (such as the fact that children do
1179       *      not really have unique permanent names).  Therefore, this
1180       *      is going to have to change in the future and ideally we
1181       *      want quorum to be covered by the generic implementation.
1182       */
1183  
1184      children_list = qlist_new();
1185      qdict_put(target, "children", children_list);
1186  
1187      for (i = 0; i < s->num_children; i++) {
1188          qlist_append(children_list,
1189                       qobject_ref(s->children[i]->bs->full_open_options));
1190      }
1191  }
1192  
1193  static char *quorum_dirname(BlockDriverState *bs, Error **errp)
1194  {
1195      /* In general, there are multiple BDSs with different dirnames below this
1196       * one; so there is no unique dirname we could return (unless all are equal
1197       * by chance, or there is only one). Therefore, to be consistent, just
1198       * always return NULL. */
1199      error_setg(errp, "Cannot generate a base directory for quorum nodes");
1200      return NULL;
1201  }
1202  
1203  static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c,
1204                                BdrvChildRole role,
1205                                BlockReopenQueue *reopen_queue,
1206                                uint64_t perm, uint64_t shared,
1207                                uint64_t *nperm, uint64_t *nshared)
1208  {
1209      BDRVQuorumState *s = bs->opaque;
1210  
1211      *nperm = perm & DEFAULT_PERM_PASSTHROUGH;
1212      if (s->rewrite_corrupted) {
1213          *nperm |= BLK_PERM_WRITE;
1214      }
1215  
1216      /*
1217       * We cannot share RESIZE or WRITE, as this would make the
1218       * children differ from each other.
1219       */
1220      *nshared = (shared & (BLK_PERM_CONSISTENT_READ |
1221                            BLK_PERM_WRITE_UNCHANGED))
1222               | DEFAULT_PERM_UNCHANGED;
1223  }
1224  
1225  /*
1226   * Each one of the children can report different status flags even
1227   * when they contain the same data, so what this function does is
1228   * return BDRV_BLOCK_ZERO if *all* children agree that a certain
1229   * region contains zeroes, and BDRV_BLOCK_DATA otherwise.
1230   */
1231  static int coroutine_fn GRAPH_RDLOCK
1232  quorum_co_block_status(BlockDriverState *bs, bool want_zero,
1233                         int64_t offset, int64_t count,
1234                         int64_t *pnum, int64_t *map, BlockDriverState **file)
1235  {
1236      BDRVQuorumState *s = bs->opaque;
1237      int i, ret;
1238      int64_t pnum_zero = count;
1239      int64_t pnum_data = 0;
1240  
1241      for (i = 0; i < s->num_children; i++) {
1242          int64_t bytes;
1243          ret = bdrv_co_common_block_status_above(s->children[i]->bs, NULL, false,
1244                                                  want_zero, offset, count,
1245                                                  &bytes, NULL, NULL, NULL);
1246          if (ret < 0) {
1247              quorum_report_bad(QUORUM_OP_TYPE_READ, offset, count,
1248                                s->children[i]->bs->node_name, ret);
1249              pnum_data = count;
1250              break;
1251          }
1252          /*
1253           * Even if all children agree about whether there are zeroes
1254           * or not at @offset they might disagree on the size, so use
1255           * the smallest when reporting BDRV_BLOCK_ZERO and the largest
1256           * when reporting BDRV_BLOCK_DATA.
1257           */
1258          if (ret & BDRV_BLOCK_ZERO) {
1259              pnum_zero = MIN(pnum_zero, bytes);
1260          } else {
1261              pnum_data = MAX(pnum_data, bytes);
1262          }
1263      }
1264  
1265      if (pnum_data) {
1266          *pnum = pnum_data;
1267          return BDRV_BLOCK_DATA;
1268      } else {
1269          *pnum = pnum_zero;
1270          return BDRV_BLOCK_ZERO;
1271      }
1272  }
1273  
1274  static const char *const quorum_strong_runtime_opts[] = {
1275      QUORUM_OPT_VOTE_THRESHOLD,
1276      QUORUM_OPT_BLKVERIFY,
1277      QUORUM_OPT_REWRITE,
1278      QUORUM_OPT_READ_PATTERN,
1279  
1280      NULL
1281  };
1282  
1283  static BlockDriver bdrv_quorum = {
1284      .format_name                        = "quorum",
1285  
1286      .instance_size                      = sizeof(BDRVQuorumState),
1287  
1288      .bdrv_open                          = quorum_open,
1289      .bdrv_close                         = quorum_close,
1290      .bdrv_gather_child_options          = quorum_gather_child_options,
1291      .bdrv_dirname                       = quorum_dirname,
1292      .bdrv_co_block_status               = quorum_co_block_status,
1293  
1294      .bdrv_co_flush                      = quorum_co_flush,
1295  
1296      .bdrv_co_getlength                  = quorum_co_getlength,
1297  
1298      .bdrv_co_preadv                     = quorum_co_preadv,
1299      .bdrv_co_pwritev                    = quorum_co_pwritev,
1300      .bdrv_co_pwrite_zeroes              = quorum_co_pwrite_zeroes,
1301  
1302      .bdrv_add_child                     = quorum_add_child,
1303      .bdrv_del_child                     = quorum_del_child,
1304  
1305      .bdrv_child_perm                    = quorum_child_perm,
1306  
1307      .bdrv_recurse_can_replace           = quorum_recurse_can_replace,
1308  
1309      .strong_runtime_opts                = quorum_strong_runtime_opts,
1310  };
1311  
1312  static void bdrv_quorum_init(void)
1313  {
1314      if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) {
1315          /* SHA256 hash support is required for quorum device */
1316          return;
1317      }
1318      bdrv_register(&bdrv_quorum);
1319  }
1320  
1321  block_init(bdrv_quorum_init);
1322