1 /* 2 * Quorum Block filter 3 * 4 * Copyright (C) 2012-2014 Nodalink, EURL. 5 * 6 * Author: 7 * Benoît Canet <benoit.canet@irqsave.net> 8 * 9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp) 10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc). 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or later. 13 * See the COPYING file in the top-level directory. 14 */ 15 16 #include "qemu/osdep.h" 17 #include "qemu/cutils.h" 18 #include "qemu/module.h" 19 #include "qemu/option.h" 20 #include "block/block_int.h" 21 #include "block/qdict.h" 22 #include "qapi/error.h" 23 #include "qapi/qapi-events-block.h" 24 #include "qapi/qmp/qdict.h" 25 #include "qapi/qmp/qerror.h" 26 #include "qapi/qmp/qlist.h" 27 #include "qapi/qmp/qstring.h" 28 #include "crypto/hash.h" 29 30 #define HASH_LENGTH 32 31 32 #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold" 33 #define QUORUM_OPT_BLKVERIFY "blkverify" 34 #define QUORUM_OPT_REWRITE "rewrite-corrupted" 35 #define QUORUM_OPT_READ_PATTERN "read-pattern" 36 37 /* This union holds a vote hash value */ 38 typedef union QuorumVoteValue { 39 uint8_t h[HASH_LENGTH]; /* SHA-256 hash */ 40 int64_t l; /* simpler 64 bits hash */ 41 } QuorumVoteValue; 42 43 /* A vote item */ 44 typedef struct QuorumVoteItem { 45 int index; 46 QLIST_ENTRY(QuorumVoteItem) next; 47 } QuorumVoteItem; 48 49 /* this structure is a vote version. A version is the set of votes sharing the 50 * same vote value. 51 * The set of votes will be tracked with the items field and its cardinality is 52 * vote_count. 53 */ 54 typedef struct QuorumVoteVersion { 55 QuorumVoteValue value; 56 int index; 57 int vote_count; 58 QLIST_HEAD(, QuorumVoteItem) items; 59 QLIST_ENTRY(QuorumVoteVersion) next; 60 } QuorumVoteVersion; 61 62 /* this structure holds a group of vote versions together */ 63 typedef struct QuorumVotes { 64 QLIST_HEAD(, QuorumVoteVersion) vote_list; 65 bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b); 66 } QuorumVotes; 67 68 /* the following structure holds the state of one quorum instance */ 69 typedef struct BDRVQuorumState { 70 BdrvChild **children; /* children BlockDriverStates */ 71 int num_children; /* children count */ 72 unsigned next_child_index; /* the index of the next child that should 73 * be added 74 */ 75 int threshold; /* if less than threshold children reads gave the 76 * same result a quorum error occurs. 77 */ 78 bool is_blkverify; /* true if the driver is in blkverify mode 79 * Writes are mirrored on two children devices. 80 * On reads the two children devices' contents are 81 * compared and if a difference is spotted its 82 * location is printed and the code aborts. 83 * It is useful to debug other block drivers by 84 * comparing them with a reference one. 85 */ 86 bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted 87 * block if Quorum is reached. 88 */ 89 90 QuorumReadPattern read_pattern; 91 } BDRVQuorumState; 92 93 typedef struct QuorumAIOCB QuorumAIOCB; 94 95 /* Quorum will create one instance of the following structure per operation it 96 * performs on its children. 97 * So for each read/write operation coming from the upper layer there will be 98 * $children_count QuorumChildRequest. 99 */ 100 typedef struct QuorumChildRequest { 101 BlockDriverState *bs; 102 QEMUIOVector qiov; 103 uint8_t *buf; 104 int ret; 105 QuorumAIOCB *parent; 106 } QuorumChildRequest; 107 108 /* Quorum will use the following structure to track progress of each read/write 109 * operation received by the upper layer. 110 * This structure hold pointers to the QuorumChildRequest structures instances 111 * used to do operations on each children and track overall progress. 112 */ 113 struct QuorumAIOCB { 114 BlockDriverState *bs; 115 Coroutine *co; 116 117 /* Request metadata */ 118 uint64_t offset; 119 uint64_t bytes; 120 int flags; 121 122 QEMUIOVector *qiov; /* calling IOV */ 123 124 QuorumChildRequest *qcrs; /* individual child requests */ 125 int count; /* number of completed AIOCB */ 126 int success_count; /* number of successfully completed AIOCB */ 127 128 int rewrite_count; /* number of replica to rewrite: count down to 129 * zero once writes are fired 130 */ 131 132 QuorumVotes votes; 133 134 bool is_read; 135 int vote_ret; 136 int children_read; /* how many children have been read from */ 137 }; 138 139 typedef struct QuorumCo { 140 QuorumAIOCB *acb; 141 int idx; 142 } QuorumCo; 143 144 static void quorum_aio_finalize(QuorumAIOCB *acb) 145 { 146 g_free(acb->qcrs); 147 g_free(acb); 148 } 149 150 static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) 151 { 152 return !memcmp(a->h, b->h, HASH_LENGTH); 153 } 154 155 static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) 156 { 157 return a->l == b->l; 158 } 159 160 static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, 161 QEMUIOVector *qiov, 162 uint64_t offset, 163 uint64_t bytes, 164 int flags) 165 { 166 BDRVQuorumState *s = bs->opaque; 167 QuorumAIOCB *acb = g_new(QuorumAIOCB, 1); 168 int i; 169 170 *acb = (QuorumAIOCB) { 171 .co = qemu_coroutine_self(), 172 .bs = bs, 173 .offset = offset, 174 .bytes = bytes, 175 .flags = flags, 176 .qiov = qiov, 177 .votes.compare = quorum_sha256_compare, 178 .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list), 179 }; 180 181 acb->qcrs = g_new0(QuorumChildRequest, s->num_children); 182 for (i = 0; i < s->num_children; i++) { 183 acb->qcrs[i].buf = NULL; 184 acb->qcrs[i].ret = 0; 185 acb->qcrs[i].parent = acb; 186 } 187 188 return acb; 189 } 190 191 static void quorum_report_bad(QuorumOpType type, uint64_t offset, 192 uint64_t bytes, char *node_name, int ret) 193 { 194 const char *msg = NULL; 195 int64_t start_sector = offset / BDRV_SECTOR_SIZE; 196 int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE); 197 198 if (ret < 0) { 199 msg = strerror(-ret); 200 } 201 202 qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector, 203 end_sector - start_sector); 204 } 205 206 static void quorum_report_failure(QuorumAIOCB *acb) 207 { 208 const char *reference = bdrv_get_device_or_node_name(acb->bs); 209 int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE; 210 int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes, 211 BDRV_SECTOR_SIZE); 212 213 qapi_event_send_quorum_failure(reference, start_sector, 214 end_sector - start_sector); 215 } 216 217 static int quorum_vote_error(QuorumAIOCB *acb); 218 219 static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) 220 { 221 BDRVQuorumState *s = acb->bs->opaque; 222 223 if (acb->success_count < s->threshold) { 224 acb->vote_ret = quorum_vote_error(acb); 225 quorum_report_failure(acb); 226 return true; 227 } 228 229 return false; 230 } 231 232 static int read_fifo_child(QuorumAIOCB *acb); 233 234 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) 235 { 236 int i; 237 assert(dest->niov == source->niov); 238 assert(dest->size == source->size); 239 for (i = 0; i < source->niov; i++) { 240 assert(dest->iov[i].iov_len == source->iov[i].iov_len); 241 memcpy(dest->iov[i].iov_base, 242 source->iov[i].iov_base, 243 source->iov[i].iov_len); 244 } 245 } 246 247 static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) 248 { 249 QuorumAIOCB *acb = sacb->parent; 250 QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; 251 quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret); 252 } 253 254 static void quorum_report_bad_versions(BDRVQuorumState *s, 255 QuorumAIOCB *acb, 256 QuorumVoteValue *value) 257 { 258 QuorumVoteVersion *version; 259 QuorumVoteItem *item; 260 261 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 262 if (acb->votes.compare(&version->value, value)) { 263 continue; 264 } 265 QLIST_FOREACH(item, &version->items, next) { 266 quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes, 267 s->children[item->index]->bs->node_name, 0); 268 } 269 } 270 } 271 272 static void quorum_rewrite_entry(void *opaque) 273 { 274 QuorumCo *co = opaque; 275 QuorumAIOCB *acb = co->acb; 276 BDRVQuorumState *s = acb->bs->opaque; 277 278 /* Ignore any errors, it's just a correction attempt for already 279 * corrupted data. 280 * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the 281 * area with different data from the other children. */ 282 bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes, 283 acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED); 284 285 /* Wake up the caller after the last rewrite */ 286 acb->rewrite_count--; 287 if (!acb->rewrite_count) { 288 qemu_coroutine_enter_if_inactive(acb->co); 289 } 290 } 291 292 static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb, 293 QuorumVoteValue *value) 294 { 295 QuorumVoteVersion *version; 296 QuorumVoteItem *item; 297 int count = 0; 298 299 /* first count the number of bad versions: done first to avoid concurrency 300 * issues. 301 */ 302 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 303 if (acb->votes.compare(&version->value, value)) { 304 continue; 305 } 306 QLIST_FOREACH(item, &version->items, next) { 307 count++; 308 } 309 } 310 311 /* quorum_rewrite_entry will count down this to zero */ 312 acb->rewrite_count = count; 313 314 /* now fire the correcting rewrites */ 315 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 316 if (acb->votes.compare(&version->value, value)) { 317 continue; 318 } 319 QLIST_FOREACH(item, &version->items, next) { 320 Coroutine *co; 321 QuorumCo data = { 322 .acb = acb, 323 .idx = item->index, 324 }; 325 326 co = qemu_coroutine_create(quorum_rewrite_entry, &data); 327 qemu_coroutine_enter(co); 328 } 329 } 330 331 /* return true if any rewrite is done else false */ 332 return count; 333 } 334 335 static void quorum_count_vote(QuorumVotes *votes, 336 QuorumVoteValue *value, 337 int index) 338 { 339 QuorumVoteVersion *v = NULL, *version = NULL; 340 QuorumVoteItem *item; 341 342 /* look if we have something with this hash */ 343 QLIST_FOREACH(v, &votes->vote_list, next) { 344 if (votes->compare(&v->value, value)) { 345 version = v; 346 break; 347 } 348 } 349 350 /* It's a version not yet in the list add it */ 351 if (!version) { 352 version = g_new0(QuorumVoteVersion, 1); 353 QLIST_INIT(&version->items); 354 memcpy(&version->value, value, sizeof(version->value)); 355 version->index = index; 356 version->vote_count = 0; 357 QLIST_INSERT_HEAD(&votes->vote_list, version, next); 358 } 359 360 version->vote_count++; 361 362 item = g_new0(QuorumVoteItem, 1); 363 item->index = index; 364 QLIST_INSERT_HEAD(&version->items, item, next); 365 } 366 367 static void quorum_free_vote_list(QuorumVotes *votes) 368 { 369 QuorumVoteVersion *version, *next_version; 370 QuorumVoteItem *item, *next_item; 371 372 QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) { 373 QLIST_REMOVE(version, next); 374 QLIST_FOREACH_SAFE(item, &version->items, next, next_item) { 375 QLIST_REMOVE(item, next); 376 g_free(item); 377 } 378 g_free(version); 379 } 380 } 381 382 static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash) 383 { 384 QEMUIOVector *qiov = &acb->qcrs[i].qiov; 385 size_t len = sizeof(hash->h); 386 uint8_t *data = hash->h; 387 388 /* XXX - would be nice if we could pass in the Error ** 389 * and propagate that back, but this quorum code is 390 * restricted to just errno values currently */ 391 if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256, 392 qiov->iov, qiov->niov, 393 &data, &len, 394 NULL) < 0) { 395 return -EINVAL; 396 } 397 398 return 0; 399 } 400 401 static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes) 402 { 403 int max = 0; 404 QuorumVoteVersion *candidate, *winner = NULL; 405 406 QLIST_FOREACH(candidate, &votes->vote_list, next) { 407 if (candidate->vote_count > max) { 408 max = candidate->vote_count; 409 winner = candidate; 410 } 411 } 412 413 return winner; 414 } 415 416 /* qemu_iovec_compare is handy for blkverify mode because it returns the first 417 * differing byte location. Yet it is handcoded to compare vectors one byte 418 * after another so it does not benefit from the libc SIMD optimizations. 419 * quorum_iovec_compare is written for speed and should be used in the non 420 * blkverify mode of quorum. 421 */ 422 static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b) 423 { 424 int i; 425 int result; 426 427 assert(a->niov == b->niov); 428 for (i = 0; i < a->niov; i++) { 429 assert(a->iov[i].iov_len == b->iov[i].iov_len); 430 result = memcmp(a->iov[i].iov_base, 431 b->iov[i].iov_base, 432 a->iov[i].iov_len); 433 if (result) { 434 return false; 435 } 436 } 437 438 return true; 439 } 440 441 static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b) 442 { 443 BDRVQuorumState *s = acb->bs->opaque; 444 ssize_t offset; 445 446 /* This driver will replace blkverify in this particular case */ 447 if (s->is_blkverify) { 448 offset = qemu_iovec_compare(a, b); 449 if (offset != -1) { 450 fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 451 " contents mismatch at offset %" PRIu64 "\n", 452 acb->offset, acb->bytes, acb->offset + offset); 453 exit(1); 454 } 455 return true; 456 } 457 458 return quorum_iovec_compare(a, b); 459 } 460 461 /* Do a vote to get the error code */ 462 static int quorum_vote_error(QuorumAIOCB *acb) 463 { 464 BDRVQuorumState *s = acb->bs->opaque; 465 QuorumVoteVersion *winner = NULL; 466 QuorumVotes error_votes; 467 QuorumVoteValue result_value; 468 int i, ret = 0; 469 bool error = false; 470 471 QLIST_INIT(&error_votes.vote_list); 472 error_votes.compare = quorum_64bits_compare; 473 474 for (i = 0; i < s->num_children; i++) { 475 ret = acb->qcrs[i].ret; 476 if (ret) { 477 error = true; 478 result_value.l = ret; 479 quorum_count_vote(&error_votes, &result_value, i); 480 } 481 } 482 483 if (error) { 484 winner = quorum_get_vote_winner(&error_votes); 485 ret = winner->value.l; 486 } 487 488 quorum_free_vote_list(&error_votes); 489 490 return ret; 491 } 492 493 static void quorum_vote(QuorumAIOCB *acb) 494 { 495 bool quorum = true; 496 int i, j, ret; 497 QuorumVoteValue hash; 498 BDRVQuorumState *s = acb->bs->opaque; 499 QuorumVoteVersion *winner; 500 501 if (quorum_has_too_much_io_failed(acb)) { 502 return; 503 } 504 505 /* get the index of the first successful read */ 506 for (i = 0; i < s->num_children; i++) { 507 if (!acb->qcrs[i].ret) { 508 break; 509 } 510 } 511 512 assert(i < s->num_children); 513 514 /* compare this read with all other successful reads stopping at quorum 515 * failure 516 */ 517 for (j = i + 1; j < s->num_children; j++) { 518 if (acb->qcrs[j].ret) { 519 continue; 520 } 521 quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov); 522 if (!quorum) { 523 break; 524 } 525 } 526 527 /* Every successful read agrees */ 528 if (quorum) { 529 quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); 530 return; 531 } 532 533 /* compute hashes for each successful read, also store indexes */ 534 for (i = 0; i < s->num_children; i++) { 535 if (acb->qcrs[i].ret) { 536 continue; 537 } 538 ret = quorum_compute_hash(acb, i, &hash); 539 /* if ever the hash computation failed */ 540 if (ret < 0) { 541 acb->vote_ret = ret; 542 goto free_exit; 543 } 544 quorum_count_vote(&acb->votes, &hash, i); 545 } 546 547 /* vote to select the most represented version */ 548 winner = quorum_get_vote_winner(&acb->votes); 549 550 /* if the winner count is smaller than threshold the read fails */ 551 if (winner->vote_count < s->threshold) { 552 quorum_report_failure(acb); 553 acb->vote_ret = -EIO; 554 goto free_exit; 555 } 556 557 /* we have a winner: copy it */ 558 quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov); 559 560 /* some versions are bad print them */ 561 quorum_report_bad_versions(s, acb, &winner->value); 562 563 /* corruption correction is enabled */ 564 if (s->rewrite_corrupted) { 565 quorum_rewrite_bad_versions(acb, &winner->value); 566 } 567 568 free_exit: 569 /* free lists */ 570 quorum_free_vote_list(&acb->votes); 571 } 572 573 static void read_quorum_children_entry(void *opaque) 574 { 575 QuorumCo *co = opaque; 576 QuorumAIOCB *acb = co->acb; 577 BDRVQuorumState *s = acb->bs->opaque; 578 int i = co->idx; 579 QuorumChildRequest *sacb = &acb->qcrs[i]; 580 581 sacb->bs = s->children[i]->bs; 582 sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes, 583 &acb->qcrs[i].qiov, 0); 584 585 if (sacb->ret == 0) { 586 acb->success_count++; 587 } else { 588 quorum_report_bad_acb(sacb, sacb->ret); 589 } 590 591 acb->count++; 592 assert(acb->count <= s->num_children); 593 assert(acb->success_count <= s->num_children); 594 595 /* Wake up the caller after the last read */ 596 if (acb->count == s->num_children) { 597 qemu_coroutine_enter_if_inactive(acb->co); 598 } 599 } 600 601 static int read_quorum_children(QuorumAIOCB *acb) 602 { 603 BDRVQuorumState *s = acb->bs->opaque; 604 int i; 605 606 acb->children_read = s->num_children; 607 for (i = 0; i < s->num_children; i++) { 608 acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size); 609 qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov); 610 qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf); 611 } 612 613 for (i = 0; i < s->num_children; i++) { 614 Coroutine *co; 615 QuorumCo data = { 616 .acb = acb, 617 .idx = i, 618 }; 619 620 co = qemu_coroutine_create(read_quorum_children_entry, &data); 621 qemu_coroutine_enter(co); 622 } 623 624 while (acb->count < s->num_children) { 625 qemu_coroutine_yield(); 626 } 627 628 /* Do the vote on read */ 629 quorum_vote(acb); 630 for (i = 0; i < s->num_children; i++) { 631 qemu_vfree(acb->qcrs[i].buf); 632 qemu_iovec_destroy(&acb->qcrs[i].qiov); 633 } 634 635 while (acb->rewrite_count) { 636 qemu_coroutine_yield(); 637 } 638 639 return acb->vote_ret; 640 } 641 642 static int read_fifo_child(QuorumAIOCB *acb) 643 { 644 BDRVQuorumState *s = acb->bs->opaque; 645 int n, ret; 646 647 /* We try to read the next child in FIFO order if we failed to read */ 648 do { 649 n = acb->children_read++; 650 acb->qcrs[n].bs = s->children[n]->bs; 651 ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes, 652 acb->qiov, 0); 653 if (ret < 0) { 654 quorum_report_bad_acb(&acb->qcrs[n], ret); 655 } 656 } while (ret < 0 && acb->children_read < s->num_children); 657 658 /* FIXME: rewrite failed children if acb->children_read > 1? */ 659 660 return ret; 661 } 662 663 static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset, 664 uint64_t bytes, QEMUIOVector *qiov, int flags) 665 { 666 BDRVQuorumState *s = bs->opaque; 667 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 668 int ret; 669 670 acb->is_read = true; 671 acb->children_read = 0; 672 673 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 674 ret = read_quorum_children(acb); 675 } else { 676 ret = read_fifo_child(acb); 677 } 678 quorum_aio_finalize(acb); 679 680 return ret; 681 } 682 683 static void write_quorum_entry(void *opaque) 684 { 685 QuorumCo *co = opaque; 686 QuorumAIOCB *acb = co->acb; 687 BDRVQuorumState *s = acb->bs->opaque; 688 int i = co->idx; 689 QuorumChildRequest *sacb = &acb->qcrs[i]; 690 691 sacb->bs = s->children[i]->bs; 692 sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes, 693 acb->qiov, acb->flags); 694 if (sacb->ret == 0) { 695 acb->success_count++; 696 } else { 697 quorum_report_bad_acb(sacb, sacb->ret); 698 } 699 acb->count++; 700 assert(acb->count <= s->num_children); 701 assert(acb->success_count <= s->num_children); 702 703 /* Wake up the caller after the last write */ 704 if (acb->count == s->num_children) { 705 qemu_coroutine_enter_if_inactive(acb->co); 706 } 707 } 708 709 static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset, 710 uint64_t bytes, QEMUIOVector *qiov, int flags) 711 { 712 BDRVQuorumState *s = bs->opaque; 713 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 714 int i, ret; 715 716 for (i = 0; i < s->num_children; i++) { 717 Coroutine *co; 718 QuorumCo data = { 719 .acb = acb, 720 .idx = i, 721 }; 722 723 co = qemu_coroutine_create(write_quorum_entry, &data); 724 qemu_coroutine_enter(co); 725 } 726 727 while (acb->count < s->num_children) { 728 qemu_coroutine_yield(); 729 } 730 731 quorum_has_too_much_io_failed(acb); 732 733 ret = acb->vote_ret; 734 quorum_aio_finalize(acb); 735 736 return ret; 737 } 738 739 static int64_t quorum_getlength(BlockDriverState *bs) 740 { 741 BDRVQuorumState *s = bs->opaque; 742 int64_t result; 743 int i; 744 745 /* check that all file have the same length */ 746 result = bdrv_getlength(s->children[0]->bs); 747 if (result < 0) { 748 return result; 749 } 750 for (i = 1; i < s->num_children; i++) { 751 int64_t value = bdrv_getlength(s->children[i]->bs); 752 if (value < 0) { 753 return value; 754 } 755 if (value != result) { 756 return -EIO; 757 } 758 } 759 760 return result; 761 } 762 763 static coroutine_fn int quorum_co_flush(BlockDriverState *bs) 764 { 765 BDRVQuorumState *s = bs->opaque; 766 QuorumVoteVersion *winner = NULL; 767 QuorumVotes error_votes; 768 QuorumVoteValue result_value; 769 int i; 770 int result = 0; 771 int success_count = 0; 772 773 QLIST_INIT(&error_votes.vote_list); 774 error_votes.compare = quorum_64bits_compare; 775 776 for (i = 0; i < s->num_children; i++) { 777 result = bdrv_co_flush(s->children[i]->bs); 778 if (result) { 779 quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0, 780 s->children[i]->bs->node_name, result); 781 result_value.l = result; 782 quorum_count_vote(&error_votes, &result_value, i); 783 } else { 784 success_count++; 785 } 786 } 787 788 if (success_count >= s->threshold) { 789 result = 0; 790 } else { 791 winner = quorum_get_vote_winner(&error_votes); 792 result = winner->value.l; 793 } 794 quorum_free_vote_list(&error_votes); 795 796 return result; 797 } 798 799 static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs, 800 BlockDriverState *candidate) 801 { 802 BDRVQuorumState *s = bs->opaque; 803 int i; 804 805 for (i = 0; i < s->num_children; i++) { 806 bool perm = bdrv_recurse_is_first_non_filter(s->children[i]->bs, 807 candidate); 808 if (perm) { 809 return true; 810 } 811 } 812 813 return false; 814 } 815 816 static int quorum_valid_threshold(int threshold, int num_children, Error **errp) 817 { 818 819 if (threshold < 1) { 820 error_setg(errp, QERR_INVALID_PARAMETER_VALUE, 821 "vote-threshold", "value >= 1"); 822 return -ERANGE; 823 } 824 825 if (threshold > num_children) { 826 error_setg(errp, "threshold may not exceed children count"); 827 return -ERANGE; 828 } 829 830 return 0; 831 } 832 833 static QemuOptsList quorum_runtime_opts = { 834 .name = "quorum", 835 .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head), 836 .desc = { 837 { 838 .name = QUORUM_OPT_VOTE_THRESHOLD, 839 .type = QEMU_OPT_NUMBER, 840 .help = "The number of vote needed for reaching quorum", 841 }, 842 { 843 .name = QUORUM_OPT_BLKVERIFY, 844 .type = QEMU_OPT_BOOL, 845 .help = "Trigger block verify mode if set", 846 }, 847 { 848 .name = QUORUM_OPT_REWRITE, 849 .type = QEMU_OPT_BOOL, 850 .help = "Rewrite corrupted block on read quorum", 851 }, 852 { 853 .name = QUORUM_OPT_READ_PATTERN, 854 .type = QEMU_OPT_STRING, 855 .help = "Allowed pattern: quorum, fifo. Quorum is default", 856 }, 857 { /* end of list */ } 858 }, 859 }; 860 861 static int quorum_open(BlockDriverState *bs, QDict *options, int flags, 862 Error **errp) 863 { 864 BDRVQuorumState *s = bs->opaque; 865 Error *local_err = NULL; 866 QemuOpts *opts = NULL; 867 const char *pattern_str; 868 bool *opened; 869 int i; 870 int ret = 0; 871 872 qdict_flatten(options); 873 874 /* count how many different children are present */ 875 s->num_children = qdict_array_entries(options, "children."); 876 if (s->num_children < 0) { 877 error_setg(&local_err, "Option children is not a valid array"); 878 ret = -EINVAL; 879 goto exit; 880 } 881 if (s->num_children < 1) { 882 error_setg(&local_err, 883 "Number of provided children must be 1 or more"); 884 ret = -EINVAL; 885 goto exit; 886 } 887 888 opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort); 889 qemu_opts_absorb_qdict(opts, options, &local_err); 890 if (local_err) { 891 ret = -EINVAL; 892 goto exit; 893 } 894 895 s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0); 896 /* and validate it against s->num_children */ 897 ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err); 898 if (ret < 0) { 899 goto exit; 900 } 901 902 pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN); 903 if (!pattern_str) { 904 ret = QUORUM_READ_PATTERN_QUORUM; 905 } else { 906 ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str, 907 -EINVAL, NULL); 908 } 909 if (ret < 0) { 910 error_setg(&local_err, "Please set read-pattern as fifo or quorum"); 911 goto exit; 912 } 913 s->read_pattern = ret; 914 915 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 916 s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false); 917 if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) { 918 error_setg(&local_err, "blkverify=on can only be set if there are " 919 "exactly two files and vote-threshold is 2"); 920 ret = -EINVAL; 921 goto exit; 922 } 923 924 s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, 925 false); 926 if (s->rewrite_corrupted && s->is_blkverify) { 927 error_setg(&local_err, 928 "rewrite-corrupted=on cannot be used with blkverify=on"); 929 ret = -EINVAL; 930 goto exit; 931 } 932 } 933 934 /* allocate the children array */ 935 s->children = g_new0(BdrvChild *, s->num_children); 936 opened = g_new0(bool, s->num_children); 937 938 for (i = 0; i < s->num_children; i++) { 939 char indexstr[32]; 940 ret = snprintf(indexstr, 32, "children.%d", i); 941 assert(ret < 32); 942 943 s->children[i] = bdrv_open_child(NULL, options, indexstr, bs, 944 &child_format, false, &local_err); 945 if (local_err) { 946 ret = -EINVAL; 947 goto close_exit; 948 } 949 950 opened[i] = true; 951 } 952 s->next_child_index = s->num_children; 953 954 bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; 955 956 g_free(opened); 957 goto exit; 958 959 close_exit: 960 /* cleanup on error */ 961 for (i = 0; i < s->num_children; i++) { 962 if (!opened[i]) { 963 continue; 964 } 965 bdrv_unref_child(bs, s->children[i]); 966 } 967 g_free(s->children); 968 g_free(opened); 969 exit: 970 qemu_opts_del(opts); 971 /* propagate error */ 972 error_propagate(errp, local_err); 973 return ret; 974 } 975 976 static void quorum_close(BlockDriverState *bs) 977 { 978 BDRVQuorumState *s = bs->opaque; 979 int i; 980 981 for (i = 0; i < s->num_children; i++) { 982 bdrv_unref_child(bs, s->children[i]); 983 } 984 985 g_free(s->children); 986 } 987 988 static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, 989 Error **errp) 990 { 991 BDRVQuorumState *s = bs->opaque; 992 BdrvChild *child; 993 char indexstr[32]; 994 int ret; 995 996 if (s->is_blkverify) { 997 error_setg(errp, "Cannot add a child to a quorum in blkverify mode"); 998 return; 999 } 1000 1001 assert(s->num_children <= INT_MAX / sizeof(BdrvChild *)); 1002 if (s->num_children == INT_MAX / sizeof(BdrvChild *) || 1003 s->next_child_index == UINT_MAX) { 1004 error_setg(errp, "Too many children"); 1005 return; 1006 } 1007 1008 ret = snprintf(indexstr, 32, "children.%u", s->next_child_index); 1009 if (ret < 0 || ret >= 32) { 1010 error_setg(errp, "cannot generate child name"); 1011 return; 1012 } 1013 s->next_child_index++; 1014 1015 bdrv_drained_begin(bs); 1016 1017 /* We can safely add the child now */ 1018 bdrv_ref(child_bs); 1019 1020 child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp); 1021 if (child == NULL) { 1022 s->next_child_index--; 1023 goto out; 1024 } 1025 s->children = g_renew(BdrvChild *, s->children, s->num_children + 1); 1026 s->children[s->num_children++] = child; 1027 1028 out: 1029 bdrv_drained_end(bs); 1030 } 1031 1032 static void quorum_del_child(BlockDriverState *bs, BdrvChild *child, 1033 Error **errp) 1034 { 1035 BDRVQuorumState *s = bs->opaque; 1036 int i; 1037 1038 for (i = 0; i < s->num_children; i++) { 1039 if (s->children[i] == child) { 1040 break; 1041 } 1042 } 1043 1044 /* we have checked it in bdrv_del_child() */ 1045 assert(i < s->num_children); 1046 1047 if (s->num_children <= s->threshold) { 1048 error_setg(errp, 1049 "The number of children cannot be lower than the vote threshold %d", 1050 s->threshold); 1051 return; 1052 } 1053 1054 /* We know now that num_children > threshold, so blkverify must be false */ 1055 assert(!s->is_blkverify); 1056 1057 bdrv_drained_begin(bs); 1058 1059 /* We can safely remove this child now */ 1060 memmove(&s->children[i], &s->children[i + 1], 1061 (s->num_children - i - 1) * sizeof(BdrvChild *)); 1062 s->children = g_renew(BdrvChild *, s->children, --s->num_children); 1063 bdrv_unref_child(bs, child); 1064 1065 bdrv_drained_end(bs); 1066 } 1067 1068 static void quorum_gather_child_options(BlockDriverState *bs, QDict *target, 1069 bool backing_overridden) 1070 { 1071 BDRVQuorumState *s = bs->opaque; 1072 QList *children_list; 1073 int i; 1074 1075 /* 1076 * The generic implementation for gathering child options in 1077 * bdrv_refresh_filename() would use the names of the children 1078 * as specified for bdrv_open_child() or bdrv_attach_child(), 1079 * which is "children.%u" with %u being a value 1080 * (s->next_child_index) that is incremented each time a new child 1081 * is added (and never decremented). Since children can be 1082 * deleted at runtime, there may be gaps in that enumeration. 1083 * When creating a new quorum BDS and specifying the children for 1084 * it through runtime options, the enumeration used there may not 1085 * have any gaps, though. 1086 * 1087 * Therefore, we have to create a new gap-less enumeration here 1088 * (which we can achieve by simply putting all of the children's 1089 * full_open_options into a QList). 1090 * 1091 * XXX: Note that there are issues with the current child option 1092 * structure quorum uses (such as the fact that children do 1093 * not really have unique permanent names). Therefore, this 1094 * is going to have to change in the future and ideally we 1095 * want quorum to be covered by the generic implementation. 1096 */ 1097 1098 children_list = qlist_new(); 1099 qdict_put(target, "children", children_list); 1100 1101 for (i = 0; i < s->num_children; i++) { 1102 qlist_append(children_list, 1103 qobject_ref(s->children[i]->bs->full_open_options)); 1104 } 1105 } 1106 1107 static char *quorum_dirname(BlockDriverState *bs, Error **errp) 1108 { 1109 /* In general, there are multiple BDSs with different dirnames below this 1110 * one; so there is no unique dirname we could return (unless all are equal 1111 * by chance, or there is only one). Therefore, to be consistent, just 1112 * always return NULL. */ 1113 error_setg(errp, "Cannot generate a base directory for quorum nodes"); 1114 return NULL; 1115 } 1116 1117 static const char *const quorum_strong_runtime_opts[] = { 1118 QUORUM_OPT_VOTE_THRESHOLD, 1119 QUORUM_OPT_BLKVERIFY, 1120 QUORUM_OPT_REWRITE, 1121 QUORUM_OPT_READ_PATTERN, 1122 1123 NULL 1124 }; 1125 1126 static BlockDriver bdrv_quorum = { 1127 .format_name = "quorum", 1128 1129 .instance_size = sizeof(BDRVQuorumState), 1130 1131 .bdrv_open = quorum_open, 1132 .bdrv_close = quorum_close, 1133 .bdrv_gather_child_options = quorum_gather_child_options, 1134 .bdrv_dirname = quorum_dirname, 1135 1136 .bdrv_co_flush_to_disk = quorum_co_flush, 1137 1138 .bdrv_getlength = quorum_getlength, 1139 1140 .bdrv_co_preadv = quorum_co_preadv, 1141 .bdrv_co_pwritev = quorum_co_pwritev, 1142 1143 .bdrv_add_child = quorum_add_child, 1144 .bdrv_del_child = quorum_del_child, 1145 1146 .bdrv_child_perm = bdrv_filter_default_perms, 1147 1148 .is_filter = true, 1149 .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter, 1150 1151 .strong_runtime_opts = quorum_strong_runtime_opts, 1152 }; 1153 1154 static void bdrv_quorum_init(void) 1155 { 1156 if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) { 1157 /* SHA256 hash support is required for quorum device */ 1158 return; 1159 } 1160 bdrv_register(&bdrv_quorum); 1161 } 1162 1163 block_init(bdrv_quorum_init); 1164