1 /* 2 * Quorum Block filter 3 * 4 * Copyright (C) 2012-2014 Nodalink, EURL. 5 * 6 * Author: 7 * Benoît Canet <benoit.canet@irqsave.net> 8 * 9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp) 10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc). 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or later. 13 * See the COPYING file in the top-level directory. 14 */ 15 16 #include "qemu/osdep.h" 17 #include "qemu/cutils.h" 18 #include "qemu/option.h" 19 #include "block/block_int.h" 20 #include "block/qdict.h" 21 #include "qapi/error.h" 22 #include "qapi/qapi-events-block.h" 23 #include "qapi/qmp/qdict.h" 24 #include "qapi/qmp/qerror.h" 25 #include "qapi/qmp/qlist.h" 26 #include "qapi/qmp/qstring.h" 27 #include "crypto/hash.h" 28 29 #define HASH_LENGTH 32 30 31 #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold" 32 #define QUORUM_OPT_BLKVERIFY "blkverify" 33 #define QUORUM_OPT_REWRITE "rewrite-corrupted" 34 #define QUORUM_OPT_READ_PATTERN "read-pattern" 35 36 /* This union holds a vote hash value */ 37 typedef union QuorumVoteValue { 38 uint8_t h[HASH_LENGTH]; /* SHA-256 hash */ 39 int64_t l; /* simpler 64 bits hash */ 40 } QuorumVoteValue; 41 42 /* A vote item */ 43 typedef struct QuorumVoteItem { 44 int index; 45 QLIST_ENTRY(QuorumVoteItem) next; 46 } QuorumVoteItem; 47 48 /* this structure is a vote version. A version is the set of votes sharing the 49 * same vote value. 50 * The set of votes will be tracked with the items field and its cardinality is 51 * vote_count. 52 */ 53 typedef struct QuorumVoteVersion { 54 QuorumVoteValue value; 55 int index; 56 int vote_count; 57 QLIST_HEAD(, QuorumVoteItem) items; 58 QLIST_ENTRY(QuorumVoteVersion) next; 59 } QuorumVoteVersion; 60 61 /* this structure holds a group of vote versions together */ 62 typedef struct QuorumVotes { 63 QLIST_HEAD(, QuorumVoteVersion) vote_list; 64 bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b); 65 } QuorumVotes; 66 67 /* the following structure holds the state of one quorum instance */ 68 typedef struct BDRVQuorumState { 69 BdrvChild **children; /* children BlockDriverStates */ 70 int num_children; /* children count */ 71 unsigned next_child_index; /* the index of the next child that should 72 * be added 73 */ 74 int threshold; /* if less than threshold children reads gave the 75 * same result a quorum error occurs. 76 */ 77 bool is_blkverify; /* true if the driver is in blkverify mode 78 * Writes are mirrored on two children devices. 79 * On reads the two children devices' contents are 80 * compared and if a difference is spotted its 81 * location is printed and the code aborts. 82 * It is useful to debug other block drivers by 83 * comparing them with a reference one. 84 */ 85 bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted 86 * block if Quorum is reached. 87 */ 88 89 QuorumReadPattern read_pattern; 90 } BDRVQuorumState; 91 92 typedef struct QuorumAIOCB QuorumAIOCB; 93 94 /* Quorum will create one instance of the following structure per operation it 95 * performs on its children. 96 * So for each read/write operation coming from the upper layer there will be 97 * $children_count QuorumChildRequest. 98 */ 99 typedef struct QuorumChildRequest { 100 BlockDriverState *bs; 101 QEMUIOVector qiov; 102 uint8_t *buf; 103 int ret; 104 QuorumAIOCB *parent; 105 } QuorumChildRequest; 106 107 /* Quorum will use the following structure to track progress of each read/write 108 * operation received by the upper layer. 109 * This structure hold pointers to the QuorumChildRequest structures instances 110 * used to do operations on each children and track overall progress. 111 */ 112 struct QuorumAIOCB { 113 BlockDriverState *bs; 114 Coroutine *co; 115 116 /* Request metadata */ 117 uint64_t offset; 118 uint64_t bytes; 119 int flags; 120 121 QEMUIOVector *qiov; /* calling IOV */ 122 123 QuorumChildRequest *qcrs; /* individual child requests */ 124 int count; /* number of completed AIOCB */ 125 int success_count; /* number of successfully completed AIOCB */ 126 127 int rewrite_count; /* number of replica to rewrite: count down to 128 * zero once writes are fired 129 */ 130 131 QuorumVotes votes; 132 133 bool is_read; 134 int vote_ret; 135 int children_read; /* how many children have been read from */ 136 }; 137 138 typedef struct QuorumCo { 139 QuorumAIOCB *acb; 140 int idx; 141 } QuorumCo; 142 143 static void quorum_aio_finalize(QuorumAIOCB *acb) 144 { 145 g_free(acb->qcrs); 146 g_free(acb); 147 } 148 149 static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) 150 { 151 return !memcmp(a->h, b->h, HASH_LENGTH); 152 } 153 154 static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) 155 { 156 return a->l == b->l; 157 } 158 159 static QuorumAIOCB *quorum_aio_get(BlockDriverState *bs, 160 QEMUIOVector *qiov, 161 uint64_t offset, 162 uint64_t bytes, 163 int flags) 164 { 165 BDRVQuorumState *s = bs->opaque; 166 QuorumAIOCB *acb = g_new(QuorumAIOCB, 1); 167 int i; 168 169 *acb = (QuorumAIOCB) { 170 .co = qemu_coroutine_self(), 171 .bs = bs, 172 .offset = offset, 173 .bytes = bytes, 174 .flags = flags, 175 .qiov = qiov, 176 .votes.compare = quorum_sha256_compare, 177 .votes.vote_list = QLIST_HEAD_INITIALIZER(acb.votes.vote_list), 178 }; 179 180 acb->qcrs = g_new0(QuorumChildRequest, s->num_children); 181 for (i = 0; i < s->num_children; i++) { 182 acb->qcrs[i].buf = NULL; 183 acb->qcrs[i].ret = 0; 184 acb->qcrs[i].parent = acb; 185 } 186 187 return acb; 188 } 189 190 static void quorum_report_bad(QuorumOpType type, uint64_t offset, 191 uint64_t bytes, char *node_name, int ret) 192 { 193 const char *msg = NULL; 194 int64_t start_sector = offset / BDRV_SECTOR_SIZE; 195 int64_t end_sector = DIV_ROUND_UP(offset + bytes, BDRV_SECTOR_SIZE); 196 197 if (ret < 0) { 198 msg = strerror(-ret); 199 } 200 201 qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, start_sector, 202 end_sector - start_sector); 203 } 204 205 static void quorum_report_failure(QuorumAIOCB *acb) 206 { 207 const char *reference = bdrv_get_device_or_node_name(acb->bs); 208 int64_t start_sector = acb->offset / BDRV_SECTOR_SIZE; 209 int64_t end_sector = DIV_ROUND_UP(acb->offset + acb->bytes, 210 BDRV_SECTOR_SIZE); 211 212 qapi_event_send_quorum_failure(reference, start_sector, 213 end_sector - start_sector); 214 } 215 216 static int quorum_vote_error(QuorumAIOCB *acb); 217 218 static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) 219 { 220 BDRVQuorumState *s = acb->bs->opaque; 221 222 if (acb->success_count < s->threshold) { 223 acb->vote_ret = quorum_vote_error(acb); 224 quorum_report_failure(acb); 225 return true; 226 } 227 228 return false; 229 } 230 231 static int read_fifo_child(QuorumAIOCB *acb); 232 233 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) 234 { 235 int i; 236 assert(dest->niov == source->niov); 237 assert(dest->size == source->size); 238 for (i = 0; i < source->niov; i++) { 239 assert(dest->iov[i].iov_len == source->iov[i].iov_len); 240 memcpy(dest->iov[i].iov_base, 241 source->iov[i].iov_base, 242 source->iov[i].iov_len); 243 } 244 } 245 246 static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) 247 { 248 QuorumAIOCB *acb = sacb->parent; 249 QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; 250 quorum_report_bad(type, acb->offset, acb->bytes, sacb->bs->node_name, ret); 251 } 252 253 static void quorum_report_bad_versions(BDRVQuorumState *s, 254 QuorumAIOCB *acb, 255 QuorumVoteValue *value) 256 { 257 QuorumVoteVersion *version; 258 QuorumVoteItem *item; 259 260 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 261 if (acb->votes.compare(&version->value, value)) { 262 continue; 263 } 264 QLIST_FOREACH(item, &version->items, next) { 265 quorum_report_bad(QUORUM_OP_TYPE_READ, acb->offset, acb->bytes, 266 s->children[item->index]->bs->node_name, 0); 267 } 268 } 269 } 270 271 static void quorum_rewrite_entry(void *opaque) 272 { 273 QuorumCo *co = opaque; 274 QuorumAIOCB *acb = co->acb; 275 BDRVQuorumState *s = acb->bs->opaque; 276 277 /* Ignore any errors, it's just a correction attempt for already 278 * corrupted data. 279 * Mask out BDRV_REQ_WRITE_UNCHANGED because this overwrites the 280 * area with different data from the other children. */ 281 bdrv_co_pwritev(s->children[co->idx], acb->offset, acb->bytes, 282 acb->qiov, acb->flags & ~BDRV_REQ_WRITE_UNCHANGED); 283 284 /* Wake up the caller after the last rewrite */ 285 acb->rewrite_count--; 286 if (!acb->rewrite_count) { 287 qemu_coroutine_enter_if_inactive(acb->co); 288 } 289 } 290 291 static bool quorum_rewrite_bad_versions(QuorumAIOCB *acb, 292 QuorumVoteValue *value) 293 { 294 QuorumVoteVersion *version; 295 QuorumVoteItem *item; 296 int count = 0; 297 298 /* first count the number of bad versions: done first to avoid concurrency 299 * issues. 300 */ 301 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 302 if (acb->votes.compare(&version->value, value)) { 303 continue; 304 } 305 QLIST_FOREACH(item, &version->items, next) { 306 count++; 307 } 308 } 309 310 /* quorum_rewrite_entry will count down this to zero */ 311 acb->rewrite_count = count; 312 313 /* now fire the correcting rewrites */ 314 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 315 if (acb->votes.compare(&version->value, value)) { 316 continue; 317 } 318 QLIST_FOREACH(item, &version->items, next) { 319 Coroutine *co; 320 QuorumCo data = { 321 .acb = acb, 322 .idx = item->index, 323 }; 324 325 co = qemu_coroutine_create(quorum_rewrite_entry, &data); 326 qemu_coroutine_enter(co); 327 } 328 } 329 330 /* return true if any rewrite is done else false */ 331 return count; 332 } 333 334 static void quorum_count_vote(QuorumVotes *votes, 335 QuorumVoteValue *value, 336 int index) 337 { 338 QuorumVoteVersion *v = NULL, *version = NULL; 339 QuorumVoteItem *item; 340 341 /* look if we have something with this hash */ 342 QLIST_FOREACH(v, &votes->vote_list, next) { 343 if (votes->compare(&v->value, value)) { 344 version = v; 345 break; 346 } 347 } 348 349 /* It's a version not yet in the list add it */ 350 if (!version) { 351 version = g_new0(QuorumVoteVersion, 1); 352 QLIST_INIT(&version->items); 353 memcpy(&version->value, value, sizeof(version->value)); 354 version->index = index; 355 version->vote_count = 0; 356 QLIST_INSERT_HEAD(&votes->vote_list, version, next); 357 } 358 359 version->vote_count++; 360 361 item = g_new0(QuorumVoteItem, 1); 362 item->index = index; 363 QLIST_INSERT_HEAD(&version->items, item, next); 364 } 365 366 static void quorum_free_vote_list(QuorumVotes *votes) 367 { 368 QuorumVoteVersion *version, *next_version; 369 QuorumVoteItem *item, *next_item; 370 371 QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) { 372 QLIST_REMOVE(version, next); 373 QLIST_FOREACH_SAFE(item, &version->items, next, next_item) { 374 QLIST_REMOVE(item, next); 375 g_free(item); 376 } 377 g_free(version); 378 } 379 } 380 381 static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash) 382 { 383 QEMUIOVector *qiov = &acb->qcrs[i].qiov; 384 size_t len = sizeof(hash->h); 385 uint8_t *data = hash->h; 386 387 /* XXX - would be nice if we could pass in the Error ** 388 * and propagate that back, but this quorum code is 389 * restricted to just errno values currently */ 390 if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256, 391 qiov->iov, qiov->niov, 392 &data, &len, 393 NULL) < 0) { 394 return -EINVAL; 395 } 396 397 return 0; 398 } 399 400 static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes) 401 { 402 int max = 0; 403 QuorumVoteVersion *candidate, *winner = NULL; 404 405 QLIST_FOREACH(candidate, &votes->vote_list, next) { 406 if (candidate->vote_count > max) { 407 max = candidate->vote_count; 408 winner = candidate; 409 } 410 } 411 412 return winner; 413 } 414 415 /* qemu_iovec_compare is handy for blkverify mode because it returns the first 416 * differing byte location. Yet it is handcoded to compare vectors one byte 417 * after another so it does not benefit from the libc SIMD optimizations. 418 * quorum_iovec_compare is written for speed and should be used in the non 419 * blkverify mode of quorum. 420 */ 421 static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b) 422 { 423 int i; 424 int result; 425 426 assert(a->niov == b->niov); 427 for (i = 0; i < a->niov; i++) { 428 assert(a->iov[i].iov_len == b->iov[i].iov_len); 429 result = memcmp(a->iov[i].iov_base, 430 b->iov[i].iov_base, 431 a->iov[i].iov_len); 432 if (result) { 433 return false; 434 } 435 } 436 437 return true; 438 } 439 440 static bool quorum_compare(QuorumAIOCB *acb, QEMUIOVector *a, QEMUIOVector *b) 441 { 442 BDRVQuorumState *s = acb->bs->opaque; 443 ssize_t offset; 444 445 /* This driver will replace blkverify in this particular case */ 446 if (s->is_blkverify) { 447 offset = qemu_iovec_compare(a, b); 448 if (offset != -1) { 449 fprintf(stderr, "quorum: offset=%" PRIu64 " bytes=%" PRIu64 450 " contents mismatch at offset %" PRIu64 "\n", 451 acb->offset, acb->bytes, acb->offset + offset); 452 exit(1); 453 } 454 return true; 455 } 456 457 return quorum_iovec_compare(a, b); 458 } 459 460 /* Do a vote to get the error code */ 461 static int quorum_vote_error(QuorumAIOCB *acb) 462 { 463 BDRVQuorumState *s = acb->bs->opaque; 464 QuorumVoteVersion *winner = NULL; 465 QuorumVotes error_votes; 466 QuorumVoteValue result_value; 467 int i, ret = 0; 468 bool error = false; 469 470 QLIST_INIT(&error_votes.vote_list); 471 error_votes.compare = quorum_64bits_compare; 472 473 for (i = 0; i < s->num_children; i++) { 474 ret = acb->qcrs[i].ret; 475 if (ret) { 476 error = true; 477 result_value.l = ret; 478 quorum_count_vote(&error_votes, &result_value, i); 479 } 480 } 481 482 if (error) { 483 winner = quorum_get_vote_winner(&error_votes); 484 ret = winner->value.l; 485 } 486 487 quorum_free_vote_list(&error_votes); 488 489 return ret; 490 } 491 492 static void quorum_vote(QuorumAIOCB *acb) 493 { 494 bool quorum = true; 495 int i, j, ret; 496 QuorumVoteValue hash; 497 BDRVQuorumState *s = acb->bs->opaque; 498 QuorumVoteVersion *winner; 499 500 if (quorum_has_too_much_io_failed(acb)) { 501 return; 502 } 503 504 /* get the index of the first successful read */ 505 for (i = 0; i < s->num_children; i++) { 506 if (!acb->qcrs[i].ret) { 507 break; 508 } 509 } 510 511 assert(i < s->num_children); 512 513 /* compare this read with all other successful reads stopping at quorum 514 * failure 515 */ 516 for (j = i + 1; j < s->num_children; j++) { 517 if (acb->qcrs[j].ret) { 518 continue; 519 } 520 quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov); 521 if (!quorum) { 522 break; 523 } 524 } 525 526 /* Every successful read agrees */ 527 if (quorum) { 528 quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); 529 return; 530 } 531 532 /* compute hashes for each successful read, also store indexes */ 533 for (i = 0; i < s->num_children; i++) { 534 if (acb->qcrs[i].ret) { 535 continue; 536 } 537 ret = quorum_compute_hash(acb, i, &hash); 538 /* if ever the hash computation failed */ 539 if (ret < 0) { 540 acb->vote_ret = ret; 541 goto free_exit; 542 } 543 quorum_count_vote(&acb->votes, &hash, i); 544 } 545 546 /* vote to select the most represented version */ 547 winner = quorum_get_vote_winner(&acb->votes); 548 549 /* if the winner count is smaller than threshold the read fails */ 550 if (winner->vote_count < s->threshold) { 551 quorum_report_failure(acb); 552 acb->vote_ret = -EIO; 553 goto free_exit; 554 } 555 556 /* we have a winner: copy it */ 557 quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov); 558 559 /* some versions are bad print them */ 560 quorum_report_bad_versions(s, acb, &winner->value); 561 562 /* corruption correction is enabled */ 563 if (s->rewrite_corrupted) { 564 quorum_rewrite_bad_versions(acb, &winner->value); 565 } 566 567 free_exit: 568 /* free lists */ 569 quorum_free_vote_list(&acb->votes); 570 } 571 572 static void read_quorum_children_entry(void *opaque) 573 { 574 QuorumCo *co = opaque; 575 QuorumAIOCB *acb = co->acb; 576 BDRVQuorumState *s = acb->bs->opaque; 577 int i = co->idx; 578 QuorumChildRequest *sacb = &acb->qcrs[i]; 579 580 sacb->bs = s->children[i]->bs; 581 sacb->ret = bdrv_co_preadv(s->children[i], acb->offset, acb->bytes, 582 &acb->qcrs[i].qiov, 0); 583 584 if (sacb->ret == 0) { 585 acb->success_count++; 586 } else { 587 quorum_report_bad_acb(sacb, sacb->ret); 588 } 589 590 acb->count++; 591 assert(acb->count <= s->num_children); 592 assert(acb->success_count <= s->num_children); 593 594 /* Wake up the caller after the last read */ 595 if (acb->count == s->num_children) { 596 qemu_coroutine_enter_if_inactive(acb->co); 597 } 598 } 599 600 static int read_quorum_children(QuorumAIOCB *acb) 601 { 602 BDRVQuorumState *s = acb->bs->opaque; 603 int i; 604 605 acb->children_read = s->num_children; 606 for (i = 0; i < s->num_children; i++) { 607 acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size); 608 qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov); 609 qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf); 610 } 611 612 for (i = 0; i < s->num_children; i++) { 613 Coroutine *co; 614 QuorumCo data = { 615 .acb = acb, 616 .idx = i, 617 }; 618 619 co = qemu_coroutine_create(read_quorum_children_entry, &data); 620 qemu_coroutine_enter(co); 621 } 622 623 while (acb->count < s->num_children) { 624 qemu_coroutine_yield(); 625 } 626 627 /* Do the vote on read */ 628 quorum_vote(acb); 629 for (i = 0; i < s->num_children; i++) { 630 qemu_vfree(acb->qcrs[i].buf); 631 qemu_iovec_destroy(&acb->qcrs[i].qiov); 632 } 633 634 while (acb->rewrite_count) { 635 qemu_coroutine_yield(); 636 } 637 638 return acb->vote_ret; 639 } 640 641 static int read_fifo_child(QuorumAIOCB *acb) 642 { 643 BDRVQuorumState *s = acb->bs->opaque; 644 int n, ret; 645 646 /* We try to read the next child in FIFO order if we failed to read */ 647 do { 648 n = acb->children_read++; 649 acb->qcrs[n].bs = s->children[n]->bs; 650 ret = bdrv_co_preadv(s->children[n], acb->offset, acb->bytes, 651 acb->qiov, 0); 652 if (ret < 0) { 653 quorum_report_bad_acb(&acb->qcrs[n], ret); 654 } 655 } while (ret < 0 && acb->children_read < s->num_children); 656 657 /* FIXME: rewrite failed children if acb->children_read > 1? */ 658 659 return ret; 660 } 661 662 static int quorum_co_preadv(BlockDriverState *bs, uint64_t offset, 663 uint64_t bytes, QEMUIOVector *qiov, int flags) 664 { 665 BDRVQuorumState *s = bs->opaque; 666 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 667 int ret; 668 669 acb->is_read = true; 670 acb->children_read = 0; 671 672 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 673 ret = read_quorum_children(acb); 674 } else { 675 ret = read_fifo_child(acb); 676 } 677 quorum_aio_finalize(acb); 678 679 return ret; 680 } 681 682 static void write_quorum_entry(void *opaque) 683 { 684 QuorumCo *co = opaque; 685 QuorumAIOCB *acb = co->acb; 686 BDRVQuorumState *s = acb->bs->opaque; 687 int i = co->idx; 688 QuorumChildRequest *sacb = &acb->qcrs[i]; 689 690 sacb->bs = s->children[i]->bs; 691 sacb->ret = bdrv_co_pwritev(s->children[i], acb->offset, acb->bytes, 692 acb->qiov, acb->flags); 693 if (sacb->ret == 0) { 694 acb->success_count++; 695 } else { 696 quorum_report_bad_acb(sacb, sacb->ret); 697 } 698 acb->count++; 699 assert(acb->count <= s->num_children); 700 assert(acb->success_count <= s->num_children); 701 702 /* Wake up the caller after the last write */ 703 if (acb->count == s->num_children) { 704 qemu_coroutine_enter_if_inactive(acb->co); 705 } 706 } 707 708 static int quorum_co_pwritev(BlockDriverState *bs, uint64_t offset, 709 uint64_t bytes, QEMUIOVector *qiov, int flags) 710 { 711 BDRVQuorumState *s = bs->opaque; 712 QuorumAIOCB *acb = quorum_aio_get(bs, qiov, offset, bytes, flags); 713 int i, ret; 714 715 for (i = 0; i < s->num_children; i++) { 716 Coroutine *co; 717 QuorumCo data = { 718 .acb = acb, 719 .idx = i, 720 }; 721 722 co = qemu_coroutine_create(write_quorum_entry, &data); 723 qemu_coroutine_enter(co); 724 } 725 726 while (acb->count < s->num_children) { 727 qemu_coroutine_yield(); 728 } 729 730 quorum_has_too_much_io_failed(acb); 731 732 ret = acb->vote_ret; 733 quorum_aio_finalize(acb); 734 735 return ret; 736 } 737 738 static int64_t quorum_getlength(BlockDriverState *bs) 739 { 740 BDRVQuorumState *s = bs->opaque; 741 int64_t result; 742 int i; 743 744 /* check that all file have the same length */ 745 result = bdrv_getlength(s->children[0]->bs); 746 if (result < 0) { 747 return result; 748 } 749 for (i = 1; i < s->num_children; i++) { 750 int64_t value = bdrv_getlength(s->children[i]->bs); 751 if (value < 0) { 752 return value; 753 } 754 if (value != result) { 755 return -EIO; 756 } 757 } 758 759 return result; 760 } 761 762 static coroutine_fn int quorum_co_flush(BlockDriverState *bs) 763 { 764 BDRVQuorumState *s = bs->opaque; 765 QuorumVoteVersion *winner = NULL; 766 QuorumVotes error_votes; 767 QuorumVoteValue result_value; 768 int i; 769 int result = 0; 770 int success_count = 0; 771 772 QLIST_INIT(&error_votes.vote_list); 773 error_votes.compare = quorum_64bits_compare; 774 775 for (i = 0; i < s->num_children; i++) { 776 result = bdrv_co_flush(s->children[i]->bs); 777 if (result) { 778 quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 0, 779 s->children[i]->bs->node_name, result); 780 result_value.l = result; 781 quorum_count_vote(&error_votes, &result_value, i); 782 } else { 783 success_count++; 784 } 785 } 786 787 if (success_count >= s->threshold) { 788 result = 0; 789 } else { 790 winner = quorum_get_vote_winner(&error_votes); 791 result = winner->value.l; 792 } 793 quorum_free_vote_list(&error_votes); 794 795 return result; 796 } 797 798 static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs, 799 BlockDriverState *candidate) 800 { 801 BDRVQuorumState *s = bs->opaque; 802 int i; 803 804 for (i = 0; i < s->num_children; i++) { 805 bool perm = bdrv_recurse_is_first_non_filter(s->children[i]->bs, 806 candidate); 807 if (perm) { 808 return true; 809 } 810 } 811 812 return false; 813 } 814 815 static int quorum_valid_threshold(int threshold, int num_children, Error **errp) 816 { 817 818 if (threshold < 1) { 819 error_setg(errp, QERR_INVALID_PARAMETER_VALUE, 820 "vote-threshold", "value >= 1"); 821 return -ERANGE; 822 } 823 824 if (threshold > num_children) { 825 error_setg(errp, "threshold may not exceed children count"); 826 return -ERANGE; 827 } 828 829 return 0; 830 } 831 832 static QemuOptsList quorum_runtime_opts = { 833 .name = "quorum", 834 .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head), 835 .desc = { 836 { 837 .name = QUORUM_OPT_VOTE_THRESHOLD, 838 .type = QEMU_OPT_NUMBER, 839 .help = "The number of vote needed for reaching quorum", 840 }, 841 { 842 .name = QUORUM_OPT_BLKVERIFY, 843 .type = QEMU_OPT_BOOL, 844 .help = "Trigger block verify mode if set", 845 }, 846 { 847 .name = QUORUM_OPT_REWRITE, 848 .type = QEMU_OPT_BOOL, 849 .help = "Rewrite corrupted block on read quorum", 850 }, 851 { 852 .name = QUORUM_OPT_READ_PATTERN, 853 .type = QEMU_OPT_STRING, 854 .help = "Allowed pattern: quorum, fifo. Quorum is default", 855 }, 856 { /* end of list */ } 857 }, 858 }; 859 860 static int quorum_open(BlockDriverState *bs, QDict *options, int flags, 861 Error **errp) 862 { 863 BDRVQuorumState *s = bs->opaque; 864 Error *local_err = NULL; 865 QemuOpts *opts = NULL; 866 const char *pattern_str; 867 bool *opened; 868 int i; 869 int ret = 0; 870 871 qdict_flatten(options); 872 873 /* count how many different children are present */ 874 s->num_children = qdict_array_entries(options, "children."); 875 if (s->num_children < 0) { 876 error_setg(&local_err, "Option children is not a valid array"); 877 ret = -EINVAL; 878 goto exit; 879 } 880 if (s->num_children < 1) { 881 error_setg(&local_err, 882 "Number of provided children must be 1 or more"); 883 ret = -EINVAL; 884 goto exit; 885 } 886 887 opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort); 888 qemu_opts_absorb_qdict(opts, options, &local_err); 889 if (local_err) { 890 ret = -EINVAL; 891 goto exit; 892 } 893 894 s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0); 895 /* and validate it against s->num_children */ 896 ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err); 897 if (ret < 0) { 898 goto exit; 899 } 900 901 pattern_str = qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN); 902 if (!pattern_str) { 903 ret = QUORUM_READ_PATTERN_QUORUM; 904 } else { 905 ret = qapi_enum_parse(&QuorumReadPattern_lookup, pattern_str, 906 -EINVAL, NULL); 907 } 908 if (ret < 0) { 909 error_setg(&local_err, "Please set read-pattern as fifo or quorum"); 910 goto exit; 911 } 912 s->read_pattern = ret; 913 914 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 915 s->is_blkverify = qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false); 916 if (s->is_blkverify && (s->num_children != 2 || s->threshold != 2)) { 917 error_setg(&local_err, "blkverify=on can only be set if there are " 918 "exactly two files and vote-threshold is 2"); 919 ret = -EINVAL; 920 goto exit; 921 } 922 923 s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, 924 false); 925 if (s->rewrite_corrupted && s->is_blkverify) { 926 error_setg(&local_err, 927 "rewrite-corrupted=on cannot be used with blkverify=on"); 928 ret = -EINVAL; 929 goto exit; 930 } 931 } 932 933 /* allocate the children array */ 934 s->children = g_new0(BdrvChild *, s->num_children); 935 opened = g_new0(bool, s->num_children); 936 937 for (i = 0; i < s->num_children; i++) { 938 char indexstr[32]; 939 ret = snprintf(indexstr, 32, "children.%d", i); 940 assert(ret < 32); 941 942 s->children[i] = bdrv_open_child(NULL, options, indexstr, bs, 943 &child_format, false, &local_err); 944 if (local_err) { 945 ret = -EINVAL; 946 goto close_exit; 947 } 948 949 opened[i] = true; 950 } 951 s->next_child_index = s->num_children; 952 953 bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; 954 955 g_free(opened); 956 goto exit; 957 958 close_exit: 959 /* cleanup on error */ 960 for (i = 0; i < s->num_children; i++) { 961 if (!opened[i]) { 962 continue; 963 } 964 bdrv_unref_child(bs, s->children[i]); 965 } 966 g_free(s->children); 967 g_free(opened); 968 exit: 969 qemu_opts_del(opts); 970 /* propagate error */ 971 error_propagate(errp, local_err); 972 return ret; 973 } 974 975 static void quorum_close(BlockDriverState *bs) 976 { 977 BDRVQuorumState *s = bs->opaque; 978 int i; 979 980 for (i = 0; i < s->num_children; i++) { 981 bdrv_unref_child(bs, s->children[i]); 982 } 983 984 g_free(s->children); 985 } 986 987 static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, 988 Error **errp) 989 { 990 BDRVQuorumState *s = bs->opaque; 991 BdrvChild *child; 992 char indexstr[32]; 993 int ret; 994 995 if (s->is_blkverify) { 996 error_setg(errp, "Cannot add a child to a quorum in blkverify mode"); 997 return; 998 } 999 1000 assert(s->num_children <= INT_MAX / sizeof(BdrvChild *)); 1001 if (s->num_children == INT_MAX / sizeof(BdrvChild *) || 1002 s->next_child_index == UINT_MAX) { 1003 error_setg(errp, "Too many children"); 1004 return; 1005 } 1006 1007 ret = snprintf(indexstr, 32, "children.%u", s->next_child_index); 1008 if (ret < 0 || ret >= 32) { 1009 error_setg(errp, "cannot generate child name"); 1010 return; 1011 } 1012 s->next_child_index++; 1013 1014 bdrv_drained_begin(bs); 1015 1016 /* We can safely add the child now */ 1017 bdrv_ref(child_bs); 1018 1019 child = bdrv_attach_child(bs, child_bs, indexstr, &child_format, errp); 1020 if (child == NULL) { 1021 s->next_child_index--; 1022 goto out; 1023 } 1024 s->children = g_renew(BdrvChild *, s->children, s->num_children + 1); 1025 s->children[s->num_children++] = child; 1026 1027 out: 1028 bdrv_drained_end(bs); 1029 } 1030 1031 static void quorum_del_child(BlockDriverState *bs, BdrvChild *child, 1032 Error **errp) 1033 { 1034 BDRVQuorumState *s = bs->opaque; 1035 int i; 1036 1037 for (i = 0; i < s->num_children; i++) { 1038 if (s->children[i] == child) { 1039 break; 1040 } 1041 } 1042 1043 /* we have checked it in bdrv_del_child() */ 1044 assert(i < s->num_children); 1045 1046 if (s->num_children <= s->threshold) { 1047 error_setg(errp, 1048 "The number of children cannot be lower than the vote threshold %d", 1049 s->threshold); 1050 return; 1051 } 1052 1053 /* We know now that num_children > threshold, so blkverify must be false */ 1054 assert(!s->is_blkverify); 1055 1056 bdrv_drained_begin(bs); 1057 1058 /* We can safely remove this child now */ 1059 memmove(&s->children[i], &s->children[i + 1], 1060 (s->num_children - i - 1) * sizeof(BdrvChild *)); 1061 s->children = g_renew(BdrvChild *, s->children, --s->num_children); 1062 bdrv_unref_child(bs, child); 1063 1064 bdrv_drained_end(bs); 1065 } 1066 1067 static void quorum_gather_child_options(BlockDriverState *bs, QDict *target, 1068 bool backing_overridden) 1069 { 1070 BDRVQuorumState *s = bs->opaque; 1071 QList *children_list; 1072 int i; 1073 1074 /* 1075 * The generic implementation for gathering child options in 1076 * bdrv_refresh_filename() would use the names of the children 1077 * as specified for bdrv_open_child() or bdrv_attach_child(), 1078 * which is "children.%u" with %u being a value 1079 * (s->next_child_index) that is incremented each time a new child 1080 * is added (and never decremented). Since children can be 1081 * deleted at runtime, there may be gaps in that enumeration. 1082 * When creating a new quorum BDS and specifying the children for 1083 * it through runtime options, the enumeration used there may not 1084 * have any gaps, though. 1085 * 1086 * Therefore, we have to create a new gap-less enumeration here 1087 * (which we can achieve by simply putting all of the children's 1088 * full_open_options into a QList). 1089 * 1090 * XXX: Note that there are issues with the current child option 1091 * structure quorum uses (such as the fact that children do 1092 * not really have unique permanent names). Therefore, this 1093 * is going to have to change in the future and ideally we 1094 * want quorum to be covered by the generic implementation. 1095 */ 1096 1097 children_list = qlist_new(); 1098 qdict_put(target, "children", children_list); 1099 1100 for (i = 0; i < s->num_children; i++) { 1101 qlist_append(children_list, 1102 qobject_ref(s->children[i]->bs->full_open_options)); 1103 } 1104 } 1105 1106 static char *quorum_dirname(BlockDriverState *bs, Error **errp) 1107 { 1108 /* In general, there are multiple BDSs with different dirnames below this 1109 * one; so there is no unique dirname we could return (unless all are equal 1110 * by chance, or there is only one). Therefore, to be consistent, just 1111 * always return NULL. */ 1112 error_setg(errp, "Cannot generate a base directory for quorum nodes"); 1113 return NULL; 1114 } 1115 1116 static const char *const quorum_strong_runtime_opts[] = { 1117 QUORUM_OPT_VOTE_THRESHOLD, 1118 QUORUM_OPT_BLKVERIFY, 1119 QUORUM_OPT_REWRITE, 1120 QUORUM_OPT_READ_PATTERN, 1121 1122 NULL 1123 }; 1124 1125 static BlockDriver bdrv_quorum = { 1126 .format_name = "quorum", 1127 1128 .instance_size = sizeof(BDRVQuorumState), 1129 1130 .bdrv_open = quorum_open, 1131 .bdrv_close = quorum_close, 1132 .bdrv_gather_child_options = quorum_gather_child_options, 1133 .bdrv_dirname = quorum_dirname, 1134 1135 .bdrv_co_flush_to_disk = quorum_co_flush, 1136 1137 .bdrv_getlength = quorum_getlength, 1138 1139 .bdrv_co_preadv = quorum_co_preadv, 1140 .bdrv_co_pwritev = quorum_co_pwritev, 1141 1142 .bdrv_add_child = quorum_add_child, 1143 .bdrv_del_child = quorum_del_child, 1144 1145 .bdrv_child_perm = bdrv_filter_default_perms, 1146 1147 .is_filter = true, 1148 .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter, 1149 1150 .strong_runtime_opts = quorum_strong_runtime_opts, 1151 }; 1152 1153 static void bdrv_quorum_init(void) 1154 { 1155 if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) { 1156 /* SHA256 hash support is required for quorum device */ 1157 return; 1158 } 1159 bdrv_register(&bdrv_quorum); 1160 } 1161 1162 block_init(bdrv_quorum_init); 1163