1 /* 2 * Quorum Block filter 3 * 4 * Copyright (C) 2012-2014 Nodalink, EURL. 5 * 6 * Author: 7 * Benoît Canet <benoit.canet@irqsave.net> 8 * 9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp) 10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc). 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or later. 13 * See the COPYING file in the top-level directory. 14 */ 15 16 #include "qemu/osdep.h" 17 #include "qemu/cutils.h" 18 #include "qemu/module.h" 19 #include "qemu/option.h" 20 #include "block/block_int.h" 21 #include "block/qdict.h" 22 #include "qapi/error.h" 23 #include "qapi/qapi-events-block.h" 24 #include "qapi/qmp/qdict.h" 25 #include "qapi/qmp/qerror.h" 26 #include "qapi/qmp/qlist.h" 27 #include "qapi/qmp/qstring.h" 28 #include "crypto/hash.h" 29 30 #define HASH_LENGTH 32 31 32 #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold" 33 #define QUORUM_OPT_BLKVERIFY "blkverify" 34 #define QUORUM_OPT_REWRITE "rewrite-corrupted" 35 #define QUORUM_OPT_READ_PATTERN "read-pattern" 36 37 /* This union holds a vote hash value */ 38 typedef union QuorumVoteValue { 39 uint8_t h[HASH_LENGTH]; /* SHA-256 hash */ 40 int64_t l; /* simpler 64 bits hash */ 41 } QuorumVoteValue; 42 43 /* A vote item */ 44 typedef struct QuorumVoteItem { 45 int index; 46 QLIST_ENTRY(QuorumVoteItem) next; 47 } QuorumVoteItem; 48 49 /* this structure is a vote version. A version is the set of votes sharing the 50 * same vote value. 51 * The set of votes will be tracked with the items field and its cardinality is 52 * vote_count. 53 */ 54 typedef struct QuorumVoteVersion { 55 QuorumVoteValue value; 56 int index; 57 int vote_count; 58 QLIST_HEAD(, QuorumVoteItem) items; 59 QLIST_ENTRY(QuorumVoteVersion) next; 60 } QuorumVoteVersion; 61 62 /* this structure holds a group of vote versions together */ 63 typedef struct QuorumVotes { 64 QLIST_HEAD(, QuorumVoteVersion) vote_list; 65 bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b); 66 } QuorumVotes; 67 68 /* the following structure holds the state of one quorum instance */ 69 typedef struct BDRVQuorumState { 70 BdrvChild **children; /* children BlockDriverStates */ 71 int num_children; /* children count */ 72 unsigned next_child_index; /* the index of the next child that should 73 * be added 74 */ 75 int threshold; /* if less than threshold children reads gave the 76 * same result a quorum error occurs. 77 */ 78 bool is_blkverify; /* true if the driver is in blkverify mode 79 * Writes are mirrored on two children devices. 80 * On reads the two children devices' contents are 81 * compared and if a difference is spotted its 82 * location is printed and the code aborts. 83 * It is useful to debug other block drivers by 84 * comparing them with a reference one. 85 */ 86 bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted 87 * block if Quorum is reached. 88 */ 89 90 QuorumReadPattern read_pattern; 91 } BDRVQuorumState; 92 93 typedef struct QuorumAIOCB QuorumAIOCB; 94 95 /* Quorum will create one instance of the following structure per operation it 96 * performs on its children. 97 * So for each read/write operation coming from the upper layer there will be 98 * $children_count QuorumChildRequest. 99 */ 100 typedef struct QuorumChildRequest { 101 BlockDriverState *bs; 102 QEMUIOVector qiov; 103 uint8_t *buf; 104 int ret; 105 QuorumAIOCB *parent; 106 } QuorumChildRequest; 107 108 /* Quorum will use the following structure to track progress of each read/write 109 * operation received by the upper layer. 110 * This structure hold pointers to the QuorumChildRequest structures instances 111 * used to do operations on each children and track overall progress. 112 */ 113 struct QuorumAIOCB { 114 BlockDriverState *bs; 115 Coroutine *co; 116 117 /* Request metadata */ 118 uint64_t offset; 119 uint64_t bytes; 120 int flags; 121 122 QEMUIOVector *qiov; /* calling IOV */ 123 124 QuorumChildRequest *qcrs; /* individual child requests */ 125 int count; /* number of completed AIOCB */ 126 int success_count; /* number of successfully completed AIOCB */ 127 128 int rewrite_count; /* number of replica to rewrite: count down to 129 * zero once writes are fired 130 */ 131 132 QuorumVotes votes; 133 134 bool is_read; 135 int vote_ret; 136 int children_read; /* how many children have been read from */ 137 }; 138 139 typedef struct QuorumCo { 140 QuorumAIOCB *acb; 141 int idx; 142 } QuorumCo; 143 144 static void quorum_aio_finalize(QuorumAIOCB *acb) 145 { 146 g_free(acb->qcrs); 147 g_free(acb); 148 } 149 150 static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) 151 { 152 return !memcmp(a->h, b->h, HASH_LENGTH); 153 } 154 155 static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) 156 { 157 return a->l == b->l; 158 } 159 160 static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, 161 QEMUIOVector *qiov, 162 uint64_t offset, 163 uint64_t bytes, 164 int flags) 165 { 166 BDRVQuorumState *s = bs->opaque; 167 QuorumAIOCB *acb = g_new(QuorumAIOCB, 1); 168 int i; 169 170 *acb = (QuorumAIOCB) { 171 .co = qemu_coroutine_self(), 172 .bs = bs, 173 .offset = offset, 174 .bytes = bytes, 175 .flags = flags, 176 .qiov = qiov, 177 .votes.compare = quorum_sha256_compare, 178 .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list), 179 }; 180 181 acb->qcrs = g_new0(QuorumChildRequest, s->num_children); 182 for (i = 0; i < s->num_children; i++) { 183 acb->qcrs[i].buf = NULL; 184 acb->qcrs[i].ret = 0; 185 acb->qcrs[i].parent = acb; 186 } 187 188 return acb; 189 } 190 191 static void quorum_report_bad(QuorumOpType type, uint64_t offset, 192 uint64_t bytes, char *node_name, int ret) 193 { 194 const char *msg = NULL; 195 int64_t start_sector = offset / BDRV_SECTOR_SIZE; 196 int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE); 197 198 if (ret < 0) { 199 msg = strerror(-ret); 200 } 201 202 qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector, 203 end_sector - start_sector); 204 } 205 206 static void quorum_report_failure(QuorumAIOCB *acb) 207 { 208 const char *reference = bdrv_get_device_or_node_name(acb->bs); 209 int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE; 210 int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes, 211 BDRV_SECTOR_SIZE); 212 213 qapi_event_send_quorum_failure(reference, start_sector, 214 end_sector - start_sector); 215 } 216 217 static int quorum_vote_error(QuorumAIOCB *acb); 218 219 static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) 220 { 221 BDRVQuorumState *s = acb->bs->opaque; 222 223 if (acb->success_count < s->threshold) { 224 acb->vote_ret = quorum_vote_error(acb); 225 quorum_report_failure(acb); 226 return true; 227 } 228 229 return false; 230 } 231 232 static int read_fifo_child(QuorumAIOCB *acb); 233 234 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) 235 { 236 int i; 237 assert(dest->niov == source->niov); 238 assert(dest->size == source->size); 239 for (i = 0; i < source->niov; i++) { 240 assert(dest->iov[i].iov_len == source->iov[i].iov_len); 241 memcpy(dest->iov[i].iov_base, 242 source->iov[i].iov_base, 243 source->iov[i].iov_len); 244 } 245 } 246 247 static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) 248 { 249 QuorumAIOCB *acb = sacb->parent; 250 QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; 251 quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret); 252 } 253 254 static void quorum_report_bad_versions(BDRVQuorumState *s, 255 QuorumAIOCB *acb, 256 QuorumVoteValue *value) 257 { 258 QuorumVoteVersion *version; 259 QuorumVoteItem *item; 260 261 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 262 if (acb->votes.compare(&version->value, value)) { 263 continue; 264 } 265 QLIST_FOREACH(item, &version->items, next) { 266 quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes, 267 s->children[item->index]->bs->node_name, 0); 268 } 269 } 270 } 271 272 static void quorum_rewrite_entry(void *opaque) 273 { 274 QuorumCo *co = opaque; 275 QuorumAIOCB *acb = co->acb; 276 BDRVQuorumState *s = acb->bs->opaque; 277 278 /* Ignore any errors, it's just a correction attempt for already 279 * corrupted data. 280 * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the 281 * area with different data from the other children. */ 282 bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes, 283 acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED); 284 285 /* Wake up the caller after the last rewrite */ 286 acb->rewrite_count--; 287 if (!acb->rewrite_count) { 288 qemu_coroutine_enter_if_inactive(acb->co); 289 } 290 } 291 292 static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb, 293 QuorumVoteValue *value) 294 { 295 QuorumVoteVersion *version; 296 QuorumVoteItem *item; 297 int count = 0; 298 299 /* first count the number of bad versions: done first to avoid concurrency 300 * issues. 301 */ 302 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 303 if (acb->votes.compare(&version->value, value)) { 304 continue; 305 } 306 QLIST_FOREACH(item, &version->items, next) { 307 count++; 308 } 309 } 310 311 /* quorum_rewrite_entry will count down this to zero */ 312 acb->rewrite_count = count; 313 314 /* now fire the correcting rewrites */ 315 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 316 if (acb->votes.compare(&version->value, value)) { 317 continue; 318 } 319 QLIST_FOREACH(item, &version->items, next) { 320 Coroutine *co; 321 QuorumCo data = { 322 .acb = acb, 323 .idx = item->index, 324 }; 325 326 co = qemu_coroutine_create(quorum_rewrite_entry, &data); 327 qemu_coroutine_enter(co); 328 } 329 } 330 331 /* return true if any rewrite is done else false */ 332 return count; 333 } 334 335 static void quorum_count_vote(QuorumVotes *votes, 336 QuorumVoteValue *value, 337 int index) 338 { 339 QuorumVoteVersion *v = NULL, *version = NULL; 340 QuorumVoteItem *item; 341 342 /* look if we have something with this hash */ 343 QLIST_FOREACH(v, &votes->vote_list, next) { 344 if (votes->compare(&v->value, value)) { 345 version = v; 346 break; 347 } 348 } 349 350 /* It's a version not yet in the list add it */ 351 if (!version) { 352 version = g_new0(QuorumVoteVersion, 1); 353 QLIST_INIT(&version->items); 354 memcpy(&version->value, value, sizeof(version->value)); 355 version->index = index; 356 version->vote_count = 0; 357 QLIST_INSERT_HEAD(&votes->vote_list, version, next); 358 } 359 360 version->vote_count++; 361 362 item = g_new0(QuorumVoteItem, 1); 363 item->index = index; 364 QLIST_INSERT_HEAD(&version->items, item, next); 365 } 366 367 static void quorum_free_vote_list(QuorumVotes *votes) 368 { 369 QuorumVoteVersion *version, *next_version; 370 QuorumVoteItem *item, *next_item; 371 372 QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) { 373 QLIST_REMOVE(version, next); 374 QLIST_FOREACH_SAFE(item, &version->items, next, next_item) { 375 QLIST_REMOVE(item, next); 376 g_free(item); 377 } 378 g_free(version); 379 } 380 } 381 382 static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash) 383 { 384 QEMUIOVector *qiov = &acb->qcrs[i].qiov; 385 size_t len = sizeof(hash->h); 386 uint8_t *data = hash->h; 387 388 /* XXX - would be nice if we could pass in the Error ** 389 * and propagate that back, but this quorum code is 390 * restricted to just errno values currently */ 391 if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256, 392 qiov->iov, qiov->niov, 393 &data, &len, 394 NULL) < 0) { 395 return -EINVAL; 396 } 397 398 return 0; 399 } 400 401 static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes) 402 { 403 int max = 0; 404 QuorumVoteVersion *candidate, *winner = NULL; 405 406 QLIST_FOREACH(candidate, &votes->vote_list, next) { 407 if (candidate->vote_count > max) { 408 max = candidate->vote_count; 409 winner = candidate; 410 } 411 } 412 413 return winner; 414 } 415 416 /* qemu_iovec_compare is handy for blkverify mode because it returns the first 417 * differing byte location. Yet it is handcoded to compare vectors one byte 418 * after another so it does not benefit from the libc SIMD optimizations. 419 * quorum_iovec_compare is written for speed and should be used in the non 420 * blkverify mode of quorum. 421 */ 422 static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b) 423 { 424 int i; 425 int result; 426 427 assert(a->niov == b->niov); 428 for (i = 0; i < a->niov; i++) { 429 assert(a->iov[i].iov_len == b->iov[i].iov_len); 430 result = memcmp(a->iov[i].iov_base, 431 b->iov[i].iov_base, 432 a->iov[i].iov_len); 433 if (result) { 434 return false; 435 } 436 } 437 438 return true; 439 } 440 441 static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b) 442 { 443 BDRVQuorumState *s = acb->bs->opaque; 444 ssize_t offset; 445 446 /* This driver will replace blkverify in this particular case */ 447 if (s->is_blkverify) { 448 offset = qemu_iovec_compare(a, b); 449 if (offset != -1) { 450 fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 451 " contents mismatch at offset %" PRIu64 "\n", 452 acb->offset, acb->bytes, acb->offset + offset); 453 exit(1); 454 } 455 return true; 456 } 457 458 return quorum_iovec_compare(a, b); 459 } 460 461 /* Do a vote to get the error code */ 462 static int quorum_vote_error(QuorumAIOCB *acb) 463 { 464 BDRVQuorumState *s = acb->bs->opaque; 465 QuorumVoteVersion *winner = NULL; 466 QuorumVotes error_votes; 467 QuorumVoteValue result_value; 468 int i, ret = 0; 469 bool error = false; 470 471 QLIST_INIT(&error_votes.vote_list); 472 error_votes.compare = quorum_64bits_compare; 473 474 for (i = 0; i < s->num_children; i++) { 475 ret = acb->qcrs[i].ret; 476 if (ret) { 477 error = true; 478 result_value.l = ret; 479 quorum_count_vote(&error_votes, &result_value, i); 480 } 481 } 482 483 if (error) { 484 winner = quorum_get_vote_winner(&error_votes); 485 ret = winner->value.l; 486 } 487 488 quorum_free_vote_list(&error_votes); 489 490 return ret; 491 } 492 493 static void quorum_vote(QuorumAIOCB *acb) 494 { 495 bool quorum = true; 496 int i, j, ret; 497 QuorumVoteValue hash; 498 BDRVQuorumState *s = acb->bs->opaque; 499 QuorumVoteVersion *winner; 500 501 if (quorum_has_too_much_io_failed(acb)) { 502 return; 503 } 504 505 /* get the index of the first successful read */ 506 for (i = 0; i < s->num_children; i++) { 507 if (!acb->qcrs[i].ret) { 508 break; 509 } 510 } 511 512 assert(i < s->num_children); 513 514 /* compare this read with all other successful reads stopping at quorum 515 * failure 516 */ 517 for (j = i + 1; j < s->num_children; j++) { 518 if (acb->qcrs[j].ret) { 519 continue; 520 } 521 quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov); 522 if (!quorum) { 523 break; 524 } 525 } 526 527 /* Every successful read agrees */ 528 if (quorum) { 529 quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); 530 return; 531 } 532 533 /* compute hashes for each successful read, also store indexes */ 534 for (i = 0; i < s->num_children; i++) { 535 if (acb->qcrs[i].ret) { 536 continue; 537 } 538 ret = quorum_compute_hash(acb, i, &hash); 539 /* if ever the hash computation failed */ 540 if (ret < 0) { 541 acb->vote_ret = ret; 542 goto free_exit; 543 } 544 quorum_count_vote(&acb->votes, &hash, i); 545 } 546 547 /* vote to select the most represented version */ 548 winner = quorum_get_vote_winner(&acb->votes); 549 550 /* if the winner count is smaller than threshold the read fails */ 551 if (winner->vote_count < s->threshold) { 552 quorum_report_failure(acb); 553 acb->vote_ret = -EIO; 554 goto free_exit; 555 } 556 557 /* we have a winner: copy it */ 558 quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov); 559 560 /* some versions are bad print them */ 561 quorum_report_bad_versions(s, acb, &winner->value); 562 563 /* corruption correction is enabled */ 564 if (s->rewrite_corrupted) { 565 quorum_rewrite_bad_versions(acb, &winner->value); 566 } 567 568 free_exit: 569 /* free lists */ 570 quorum_free_vote_list(&acb->votes); 571 } 572 573 static void read_quorum_children_entry(void *opaque) 574 { 575 QuorumCo *co = opaque; 576 QuorumAIOCB *acb = co->acb; 577 BDRVQuorumState *s = acb->bs->opaque; 578 int i = co->idx; 579 QuorumChildRequest *sacb = &acb->qcrs[i]; 580 581 sacb->bs = s->children[i]->bs; 582 sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes, 583 &acb->qcrs[i].qiov, 0); 584 585 if (sacb->ret == 0) { 586 acb->success_count++; 587 } else { 588 quorum_report_bad_acb(sacb, sacb->ret); 589 } 590 591 acb->count++; 592 assert(acb->count <= s->num_children); 593 assert(acb->success_count <= s->num_children); 594 595 /* Wake up the caller after the last read */ 596 if (acb->count == s->num_children) { 597 qemu_coroutine_enter_if_inactive(acb->co); 598 } 599 } 600 601 static int read_quorum_children(QuorumAIOCB *acb) 602 { 603 BDRVQuorumState *s = acb->bs->opaque; 604 int i; 605 606 acb->children_read = s->num_children; 607 for (i = 0; i < s->num_children; i++) { 608 acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size); 609 qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov); 610 qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf); 611 } 612 613 for (i = 0; i < s->num_children; i++) { 614 Coroutine *co; 615 QuorumCo data = { 616 .acb = acb, 617 .idx = i, 618 }; 619 620 co = qemu_coroutine_create(read_quorum_children_entry, &data); 621 qemu_coroutine_enter(co); 622 } 623 624 while (acb->count < s->num_children) { 625 qemu_coroutine_yield(); 626 } 627 628 /* Do the vote on read */ 629 quorum_vote(acb); 630 for (i = 0; i < s->num_children; i++) { 631 qemu_vfree(acb->qcrs[i].buf); 632 qemu_iovec_destroy(&acb->qcrs[i].qiov); 633 } 634 635 while (acb->rewrite_count) { 636 qemu_coroutine_yield(); 637 } 638 639 return acb->vote_ret; 640 } 641 642 static int read_fifo_child(QuorumAIOCB *acb) 643 { 644 BDRVQuorumState *s = acb->bs->opaque; 645 int n, ret; 646 647 /* We try to read the next child in FIFO order if we failed to read */ 648 do { 649 n = acb->children_read++; 650 acb->qcrs[n].bs = s->children[n]->bs; 651 ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes, 652 acb->qiov, 0); 653 if (ret < 0) { 654 quorum_report_bad_acb(&acb->qcrs[n], ret); 655 } 656 } while (ret < 0 && acb->children_read < s->num_children); 657 658 /* FIXME: rewrite failed children if acb->children_read > 1? */ 659 660 return ret; 661 } 662 663 static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset, 664 uint64_t bytes, QEMUIOVector *qiov, int flags) 665 { 666 BDRVQuorumState *s = bs->opaque; 667 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 668 int ret; 669 670 acb->is_read = true; 671 acb->children_read = 0; 672 673 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 674 ret = read_quorum_children(acb); 675 } else { 676 ret = read_fifo_child(acb); 677 } 678 quorum_aio_finalize(acb); 679 680 return ret; 681 } 682 683 static void write_quorum_entry(void *opaque) 684 { 685 QuorumCo *co = opaque; 686 QuorumAIOCB *acb = co->acb; 687 BDRVQuorumState *s = acb->bs->opaque; 688 int i = co->idx; 689 QuorumChildRequest *sacb = &acb->qcrs[i]; 690 691 sacb->bs = s->children[i]->bs; 692 sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes, 693 acb->qiov, acb->flags); 694 if (sacb->ret == 0) { 695 acb->success_count++; 696 } else { 697 quorum_report_bad_acb(sacb, sacb->ret); 698 } 699 acb->count++; 700 assert(acb->count <= s->num_children); 701 assert(acb->success_count <= s->num_children); 702 703 /* Wake up the caller after the last write */ 704 if (acb->count == s->num_children) { 705 qemu_coroutine_enter_if_inactive(acb->co); 706 } 707 } 708 709 static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset, 710 uint64_t bytes, QEMUIOVector *qiov, int flags) 711 { 712 BDRVQuorumState *s = bs->opaque; 713 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 714 int i, ret; 715 716 for (i = 0; i < s->num_children; i++) { 717 Coroutine *co; 718 QuorumCo data = { 719 .acb = acb, 720 .idx = i, 721 }; 722 723 co = qemu_coroutine_create(write_quorum_entry, &data); 724 qemu_coroutine_enter(co); 725 } 726 727 while (acb->count < s->num_children) { 728 qemu_coroutine_yield(); 729 } 730 731 quorum_has_too_much_io_failed(acb); 732 733 ret = acb->vote_ret; 734 quorum_aio_finalize(acb); 735 736 return ret; 737 } 738 739 static int64_t quorum_getlength(BlockDriverState *bs) 740 { 741 BDRVQuorumState *s = bs->opaque; 742 int64_t result; 743 int i; 744 745 /* check that all file have the same length */ 746 result = bdrv_getlength(s->children[0]->bs); 747 if (result < 0) { 748 return result; 749 } 750 for (i = 1; i < s->num_children; i++) { 751 int64_t value = bdrv_getlength(s->children[i]->bs); 752 if (value < 0) { 753 return value; 754 } 755 if (value != result) { 756 return -EIO; 757 } 758 } 759 760 return result; 761 } 762 763 static coroutine_fn int quorum_co_flush(BlockDriverState *bs) 764 { 765 BDRVQuorumState *s = bs->opaque; 766 QuorumVoteVersion *winner = NULL; 767 QuorumVotes error_votes; 768 QuorumVoteValue result_value; 769 int i; 770 int result = 0; 771 int success_count = 0; 772 773 QLIST_INIT(&error_votes.vote_list); 774 error_votes.compare = quorum_64bits_compare; 775 776 for (i = 0; i < s->num_children; i++) { 777 result = bdrv_co_flush(s->children[i]->bs); 778 if (result) { 779 quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0, 780 s->children[i]->bs->node_name, result); 781 result_value.l = result; 782 quorum_count_vote(&error_votes, &result_value, i); 783 } else { 784 success_count++; 785 } 786 } 787 788 if (success_count >= s->threshold) { 789 result = 0; 790 } else { 791 winner = quorum_get_vote_winner(&error_votes); 792 result = winner->value.l; 793 } 794 quorum_free_vote_list(&error_votes); 795 796 return result; 797 } 798 799 static bool quorum_recurse_can_replace(BlockDriverState *bs, 800 BlockDriverState *to_replace) 801 { 802 BDRVQuorumState *s = bs->opaque; 803 int i; 804 805 for (i = 0; i < s->num_children; i++) { 806 /* 807 * We have no idea whether our children show the same data as 808 * this node (@bs). It is actually highly likely that 809 * @to_replace does not, because replacing a broken child is 810 * one of the main use cases here. 811 * 812 * We do know that the new BDS will match @bs, so replacing 813 * any of our children by it will be safe. It cannot change 814 * the data this quorum node presents to its parents. 815 * 816 * However, replacing @to_replace by @bs in any of our 817 * children's chains may change visible data somewhere in 818 * there. We therefore cannot recurse down those chains with 819 * bdrv_recurse_can_replace(). 820 * (More formally, bdrv_recurse_can_replace() requires that 821 * @to_replace will be replaced by something matching the @bs 822 * passed to it. We cannot guarantee that.) 823 * 824 * Thus, we can only check whether any of our immediate 825 * children matches @to_replace. 826 * 827 * (In the future, we might add a function to recurse down a 828 * chain that checks that nothing there cares about a change 829 * in data from the respective child in question. For 830 * example, most filters do not care when their child's data 831 * suddenly changes, as long as their parents do not care.) 832 */ 833 if (s->children[i]->bs == to_replace) { 834 /* 835 * We now have to ensure that there is no other parent 836 * that cares about replacing this child by a node with 837 * potentially different data. 838 * We do so by checking whether there are any other parents 839 * at all, which is stricter than necessary, but also very 840 * simple. (We may decide to implement something more 841 * complex and permissive when there is an actual need for 842 * it.) 843 */ 844 return QLIST_FIRST(&to_replace->parents) == s->children[i] && 845 QLIST_NEXT(s->children[i], next_parent) == NULL; 846 } 847 } 848 849 return false; 850 } 851 852 static int quorum_valid_threshold(int threshold, int num_children, Error **errp) 853 { 854 855 if (threshold < 1) { 856 error_setg(errp, QERR_INVALID_PARAMETER_VALUE, 857 "vote-threshold", "value >= 1"); 858 return -ERANGE; 859 } 860 861 if (threshold > num_children) { 862 error_setg(errp, "threshold may not exceed children count"); 863 return -ERANGE; 864 } 865 866 return 0; 867 } 868 869 static QemuOptsList quorum_runtime_opts = { 870 .name = "quorum", 871 .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head), 872 .desc = { 873 { 874 .name = QUORUM_OPT_VOTE_THRESHOLD, 875 .type = QEMU_OPT_NUMBER, 876 .help = "The number of vote needed for reaching quorum", 877 }, 878 { 879 .name = QUORUM_OPT_BLKVERIFY, 880 .type = QEMU_OPT_BOOL, 881 .help = "Trigger block verify mode if set", 882 }, 883 { 884 .name = QUORUM_OPT_REWRITE, 885 .type = QEMU_OPT_BOOL, 886 .help = "Rewrite corrupted block on read quorum", 887 }, 888 { 889 .name = QUORUM_OPT_READ_PATTERN, 890 .type = QEMU_OPT_STRING, 891 .help = "Allowed pattern: quorum, fifo. Quorum is default", 892 }, 893 { /* end of list */ } 894 }, 895 }; 896 897 static int quorum_open(BlockDriverState *bs, QDict *options, int flags, 898 Error **errp) 899 { 900 BDRVQuorumState *s = bs->opaque; 901 Error *local_err = NULL; 902 QemuOpts *opts = NULL; 903 const char *pattern_str; 904 bool *opened; 905 int i; 906 int ret = 0; 907 908 qdict_flatten(options); 909 910 /* count how many different children are present */ 911 s->num_children = qdict_array_entries(options, "children."); 912 if (s->num_children < 0) { 913 error_setg(errp, "Option children is not a valid array"); 914 ret = -EINVAL; 915 goto exit; 916 } 917 if (s->num_children < 1) { 918 error_setg(errp, "Number of provided children must be 1 or more"); 919 ret = -EINVAL; 920 goto exit; 921 } 922 923 opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort); 924 if (!qemu_opts_absorb_qdict(opts, options, errp)) { 925 ret = -EINVAL; 926 goto exit; 927 } 928 929 s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0); 930 /* and validate it against s->num_children */ 931 ret = quorum_valid_threshold(s->threshold, s->num_children, errp); 932 if (ret < 0) { 933 goto exit; 934 } 935 936 pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN); 937 if (!pattern_str) { 938 ret = QUORUM_READ_PATTERN_QUORUM; 939 } else { 940 ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str, 941 -EINVAL, NULL); 942 } 943 if (ret < 0) { 944 error_setg(errp, "Please set read-pattern as fifo or quorum"); 945 goto exit; 946 } 947 s->read_pattern = ret; 948 949 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 950 s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false); 951 if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) { 952 error_setg(errp, "blkverify=on can only be set if there are " 953 "exactly two files and vote-threshold is 2"); 954 ret = -EINVAL; 955 goto exit; 956 } 957 958 s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, 959 false); 960 if (s->rewrite_corrupted && s->is_blkverify) { 961 error_setg(errp, 962 "rewrite-corrupted=on cannot be used with blkverify=on"); 963 ret = -EINVAL; 964 goto exit; 965 } 966 } 967 968 /* allocate the children array */ 969 s->children = g_new0(BdrvChild *, s->num_children); 970 opened = g_new0(bool, s->num_children); 971 972 for (i = 0; i < s->num_children; i++) { 973 char indexstr[32]; 974 ret = snprintf(indexstr, 32, "children.%d", i); 975 assert(ret < 32); 976 977 s->children[i] = bdrv_open_child(NULL, options, indexstr, bs, 978 &child_of_bds, BDRV_CHILD_DATA, false, 979 &local_err); 980 if (local_err) { 981 error_propagate(errp, local_err); 982 ret = -EINVAL; 983 goto close_exit; 984 } 985 986 opened[i] = true; 987 } 988 s->next_child_index = s->num_children; 989 990 bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; 991 992 g_free(opened); 993 goto exit; 994 995 close_exit: 996 /* cleanup on error */ 997 for (i = 0; i < s->num_children; i++) { 998 if (!opened[i]) { 999 continue; 1000 } 1001 bdrv_unref_child(bs, s->children[i]); 1002 } 1003 g_free(s->children); 1004 g_free(opened); 1005 exit: 1006 qemu_opts_del(opts); 1007 return ret; 1008 } 1009 1010 static void quorum_close(BlockDriverState *bs) 1011 { 1012 BDRVQuorumState *s = bs->opaque; 1013 int i; 1014 1015 for (i = 0; i < s->num_children; i++) { 1016 bdrv_unref_child(bs, s->children[i]); 1017 } 1018 1019 g_free(s->children); 1020 } 1021 1022 static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, 1023 Error **errp) 1024 { 1025 BDRVQuorumState *s = bs->opaque; 1026 BdrvChild *child; 1027 char indexstr[32]; 1028 int ret; 1029 1030 if (s->is_blkverify) { 1031 error_setg(errp, "Cannot add a child to a quorum in blkverify mode"); 1032 return; 1033 } 1034 1035 assert(s->num_children <= INT_MAX / sizeof(BdrvChild *)); 1036 if (s->num_children == INT_MAX / sizeof(BdrvChild *) || 1037 s->next_child_index == UINT_MAX) { 1038 error_setg(errp, "Too many children"); 1039 return; 1040 } 1041 1042 ret = snprintf(indexstr, 32, "children.%u", s->next_child_index); 1043 if (ret < 0 || ret >= 32) { 1044 error_setg(errp, "cannot generate child name"); 1045 return; 1046 } 1047 s->next_child_index++; 1048 1049 bdrv_drained_begin(bs); 1050 1051 /* We can safely add the child now */ 1052 bdrv_ref(child_bs); 1053 1054 child = bdrv_attach_child(bs, child_bs, indexstr, &child_of_bds, 1055 BDRV_CHILD_DATA, errp); 1056 if (child == NULL) { 1057 s->next_child_index--; 1058 goto out; 1059 } 1060 s->children = g_renew(BdrvChild *, s->children, s->num_children + 1); 1061 s->children[s->num_children++] = child; 1062 1063 out: 1064 bdrv_drained_end(bs); 1065 } 1066 1067 static void quorum_del_child(BlockDriverState *bs, BdrvChild *child, 1068 Error **errp) 1069 { 1070 BDRVQuorumState *s = bs->opaque; 1071 int i; 1072 1073 for (i = 0; i < s->num_children; i++) { 1074 if (s->children[i] == child) { 1075 break; 1076 } 1077 } 1078 1079 /* we have checked it in bdrv_del_child() */ 1080 assert(i < s->num_children); 1081 1082 if (s->num_children <= s->threshold) { 1083 error_setg(errp, 1084 "The number of children cannot be lower than the vote threshold %d", 1085 s->threshold); 1086 return; 1087 } 1088 1089 /* We know now that num_children > threshold, so blkverify must be false */ 1090 assert(!s->is_blkverify); 1091 1092 bdrv_drained_begin(bs); 1093 1094 /* We can safely remove this child now */ 1095 memmove(&s->children[i], &s->children[i + 1], 1096 (s->num_children - i - 1) * sizeof(BdrvChild *)); 1097 s->children = g_renew(BdrvChild *, s->children, --s->num_children); 1098 bdrv_unref_child(bs, child); 1099 1100 bdrv_drained_end(bs); 1101 } 1102 1103 static void quorum_gather_child_options(BlockDriverState *bs, QDict *target, 1104 bool backing_overridden) 1105 { 1106 BDRVQuorumState *s = bs->opaque; 1107 QList *children_list; 1108 int i; 1109 1110 /* 1111 * The generic implementation for gathering child options in 1112 * bdrv_refresh_filename() would use the names of the children 1113 * as specified for bdrv_open_child() or bdrv_attach_child(), 1114 * which is "children.%u" with %u being a value 1115 * (s->next_child_index) that is incremented each time a new child 1116 * is added (and never decremented). Since children can be 1117 * deleted at runtime, there may be gaps in that enumeration. 1118 * When creating a new quorum BDS and specifying the children for 1119 * it through runtime options, the enumeration used there may not 1120 * have any gaps, though. 1121 * 1122 * Therefore, we have to create a new gap-less enumeration here 1123 * (which we can achieve by simply putting all of the children's 1124 * full_open_options into a QList). 1125 * 1126 * XXX: Note that there are issues with the current child option 1127 * structure quorum uses (such as the fact that children do 1128 * not really have unique permanent names). Therefore, this 1129 * is going to have to change in the future and ideally we 1130 * want quorum to be covered by the generic implementation. 1131 */ 1132 1133 children_list = qlist_new(); 1134 qdict_put(target, "children", children_list); 1135 1136 for (i = 0; i < s->num_children; i++) { 1137 qlist_append(children_list, 1138 qobject_ref(s->children[i]->bs->full_open_options)); 1139 } 1140 } 1141 1142 static char *quorum_dirname(BlockDriverState *bs, Error **errp) 1143 { 1144 /* In general, there are multiple BDSs with different dirnames below this 1145 * one; so there is no unique dirname we could return (unless all are equal 1146 * by chance, or there is only one). Therefore, to be consistent, just 1147 * always return NULL. */ 1148 error_setg(errp, "Cannot generate a base directory for quorum nodes"); 1149 return NULL; 1150 } 1151 1152 static void quorum_child_perm(BlockDriverState *bs, BdrvChild *c, 1153 BdrvChildRole role, 1154 BlockReopenQueue *reopen_queue, 1155 uint64_t perm, uint64_t shared, 1156 uint64_t *nperm, uint64_t *nshared) 1157 { 1158 *nperm = perm & DEFAULT_PERM_PASSTHROUGH; 1159 1160 /* 1161 * We cannot share RESIZE or WRITE, as this would make the 1162 * children differ from each other. 1163 */ 1164 *nshared = (shared & (BLK_PERM_CONSISTENT_READ | 1165 BLK_PERM_WRITE_UNCHANGED)) 1166 | DEFAULT_PERM_UNCHANGED; 1167 } 1168 1169 static const char *const quorum_strong_runtime_opts[] = { 1170 QUORUM_OPT_VOTE_THRESHOLD, 1171 QUORUM_OPT_BLKVERIFY, 1172 QUORUM_OPT_REWRITE, 1173 QUORUM_OPT_READ_PATTERN, 1174 1175 NULL 1176 }; 1177 1178 static BlockDriver bdrv_quorum = { 1179 .format_name = "quorum", 1180 1181 .instance_size = sizeof(BDRVQuorumState), 1182 1183 .bdrv_open = quorum_open, 1184 .bdrv_close = quorum_close, 1185 .bdrv_gather_child_options = quorum_gather_child_options, 1186 .bdrv_dirname = quorum_dirname, 1187 1188 .bdrv_co_flush_to_disk = quorum_co_flush, 1189 1190 .bdrv_getlength = quorum_getlength, 1191 1192 .bdrv_co_preadv = quorum_co_preadv, 1193 .bdrv_co_pwritev = quorum_co_pwritev, 1194 1195 .bdrv_add_child = quorum_add_child, 1196 .bdrv_del_child = quorum_del_child, 1197 1198 .bdrv_child_perm = quorum_child_perm, 1199 1200 .bdrv_recurse_can_replace = quorum_recurse_can_replace, 1201 1202 .strong_runtime_opts = quorum_strong_runtime_opts, 1203 }; 1204 1205 static void bdrv_quorum_init(void) 1206 { 1207 if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) { 1208 /* SHA256 hash support is required for quorum device */ 1209 return; 1210 } 1211 bdrv_register(&bdrv_quorum); 1212 } 1213 1214 block_init(bdrv_quorum_init); 1215