1 /* 2 * Quorum Block filter 3 * 4 * Copyright (C) 2012-2014 Nodalink, EURL. 5 * 6 * Author: 7 * Benoît Canet <benoit.canet@irqsave.net> 8 * 9 * Based on the design and code of blkverify.c (Copyright (C) 2010 IBM, Corp) 10 * and blkmirror.c (Copyright (C) 2011 Red Hat, Inc). 11 * 12 * This work is licensed under the terms of the GNU GPL, version 2 or later. 13 * See the COPYING file in the top-level directory. 14 */ 15 16 #include "qemu/osdep.h" 17 #include "qemu/cutils.h" 18 #include "block/block_int.h" 19 #include "qapi/qmp/qbool.h" 20 #include "qapi/qmp/qdict.h" 21 #include "qapi/qmp/qerror.h" 22 #include "qapi/qmp/qint.h" 23 #include "qapi/qmp/qjson.h" 24 #include "qapi/qmp/qlist.h" 25 #include "qapi/qmp/qstring.h" 26 #include "qapi-event.h" 27 #include "crypto/hash.h" 28 29 #define HASH_LENGTH 32 30 31 #define QUORUM_OPT_VOTE_THRESHOLD "vote-threshold" 32 #define QUORUM_OPT_BLKVERIFY "blkverify" 33 #define QUORUM_OPT_REWRITE "rewrite-corrupted" 34 #define QUORUM_OPT_READ_PATTERN "read-pattern" 35 36 /* This union holds a vote hash value */ 37 typedef union QuorumVoteValue { 38 uint8_t h[HASH_LENGTH]; /* SHA-256 hash */ 39 int64_t l; /* simpler 64 bits hash */ 40 } QuorumVoteValue; 41 42 /* A vote item */ 43 typedef struct QuorumVoteItem { 44 int index; 45 QLIST_ENTRY(QuorumVoteItem) next; 46 } QuorumVoteItem; 47 48 /* this structure is a vote version. A version is the set of votes sharing the 49 * same vote value. 50 * The set of votes will be tracked with the items field and its cardinality is 51 * vote_count. 52 */ 53 typedef struct QuorumVoteVersion { 54 QuorumVoteValue value; 55 int index; 56 int vote_count; 57 QLIST_HEAD(, QuorumVoteItem) items; 58 QLIST_ENTRY(QuorumVoteVersion) next; 59 } QuorumVoteVersion; 60 61 /* this structure holds a group of vote versions together */ 62 typedef struct QuorumVotes { 63 QLIST_HEAD(, QuorumVoteVersion) vote_list; 64 bool (*compare)(QuorumVoteValue *a, QuorumVoteValue *b); 65 } QuorumVotes; 66 67 /* the following structure holds the state of one quorum instance */ 68 typedef struct BDRVQuorumState { 69 BdrvChild **children; /* children BlockDriverStates */ 70 int num_children; /* children count */ 71 unsigned next_child_index; /* the index of the next child that should 72 * be added 73 */ 74 int threshold; /* if less than threshold children reads gave the 75 * same result a quorum error occurs. 76 */ 77 bool is_blkverify; /* true if the driver is in blkverify mode 78 * Writes are mirrored on two children devices. 79 * On reads the two children devices' contents are 80 * compared and if a difference is spotted its 81 * location is printed and the code aborts. 82 * It is useful to debug other block drivers by 83 * comparing them with a reference one. 84 */ 85 bool rewrite_corrupted;/* true if the driver must rewrite-on-read corrupted 86 * block if Quorum is reached. 87 */ 88 89 QuorumReadPattern read_pattern; 90 } BDRVQuorumState; 91 92 typedef struct QuorumAIOCB QuorumAIOCB; 93 94 /* Quorum will create one instance of the following structure per operation it 95 * performs on its children. 96 * So for each read/write operation coming from the upper layer there will be 97 * $children_count QuorumChildRequest. 98 */ 99 typedef struct QuorumChildRequest { 100 BlockAIOCB *aiocb; 101 QEMUIOVector qiov; 102 uint8_t *buf; 103 int ret; 104 QuorumAIOCB *parent; 105 } QuorumChildRequest; 106 107 /* Quorum will use the following structure to track progress of each read/write 108 * operation received by the upper layer. 109 * This structure hold pointers to the QuorumChildRequest structures instances 110 * used to do operations on each children and track overall progress. 111 */ 112 struct QuorumAIOCB { 113 BlockAIOCB common; 114 115 /* Request metadata */ 116 uint64_t sector_num; 117 int nb_sectors; 118 119 QEMUIOVector *qiov; /* calling IOV */ 120 121 QuorumChildRequest *qcrs; /* individual child requests */ 122 int count; /* number of completed AIOCB */ 123 int success_count; /* number of successfully completed AIOCB */ 124 125 int rewrite_count; /* number of replica to rewrite: count down to 126 * zero once writes are fired 127 */ 128 129 QuorumVotes votes; 130 131 bool is_read; 132 int vote_ret; 133 int children_read; /* how many children have been read from */ 134 }; 135 136 static bool quorum_vote(QuorumAIOCB *acb); 137 138 static void quorum_aio_cancel(BlockAIOCB *blockacb) 139 { 140 QuorumAIOCB *acb = container_of(blockacb, QuorumAIOCB, common); 141 BDRVQuorumState *s = acb->common.bs->opaque; 142 int i; 143 144 /* cancel all callbacks */ 145 for (i = 0; i < s->num_children; i++) { 146 if (acb->qcrs[i].aiocb) { 147 bdrv_aio_cancel_async(acb->qcrs[i].aiocb); 148 } 149 } 150 } 151 152 static AIOCBInfo quorum_aiocb_info = { 153 .aiocb_size = sizeof(QuorumAIOCB), 154 .cancel_async = quorum_aio_cancel, 155 }; 156 157 static void quorum_aio_finalize(QuorumAIOCB *acb) 158 { 159 acb->common.cb(acb->common.opaque, acb->vote_ret); 160 g_free(acb->qcrs); 161 qemu_aio_unref(acb); 162 } 163 164 static bool quorum_sha256_compare(QuorumVoteValue *a, QuorumVoteValue *b) 165 { 166 return !memcmp(a->h, b->h, HASH_LENGTH); 167 } 168 169 static bool quorum_64bits_compare(QuorumVoteValue *a, QuorumVoteValue *b) 170 { 171 return a->l == b->l; 172 } 173 174 static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s, 175 BlockDriverState *bs, 176 QEMUIOVector *qiov, 177 uint64_t sector_num, 178 int nb_sectors, 179 BlockCompletionFunc *cb, 180 void *opaque) 181 { 182 QuorumAIOCB *acb = qemu_aio_get(&quorum_aiocb_info, bs, cb, opaque); 183 int i; 184 185 acb->common.bs->opaque = s; 186 acb->sector_num = sector_num; 187 acb->nb_sectors = nb_sectors; 188 acb->qiov = qiov; 189 acb->qcrs = g_new0(QuorumChildRequest, s->num_children); 190 acb->count = 0; 191 acb->success_count = 0; 192 acb->rewrite_count = 0; 193 acb->votes.compare = quorum_sha256_compare; 194 QLIST_INIT(&acb->votes.vote_list); 195 acb->is_read = false; 196 acb->vote_ret = 0; 197 198 for (i = 0; i < s->num_children; i++) { 199 acb->qcrs[i].buf = NULL; 200 acb->qcrs[i].ret = 0; 201 acb->qcrs[i].parent = acb; 202 } 203 204 return acb; 205 } 206 207 static void quorum_report_bad(QuorumOpType type, uint64_t sector_num, 208 int nb_sectors, char *node_name, int ret) 209 { 210 const char *msg = NULL; 211 if (ret < 0) { 212 msg = strerror(-ret); 213 } 214 215 qapi_event_send_quorum_report_bad(type, !!msg, msg, node_name, 216 sector_num, nb_sectors, &error_abort); 217 } 218 219 static void quorum_report_failure(QuorumAIOCB *acb) 220 { 221 const char *reference = bdrv_get_device_or_node_name(acb->common.bs); 222 qapi_event_send_quorum_failure(reference, acb->sector_num, 223 acb->nb_sectors, &error_abort); 224 } 225 226 static int quorum_vote_error(QuorumAIOCB *acb); 227 228 static bool quorum_has_too_much_io_failed(QuorumAIOCB *acb) 229 { 230 BDRVQuorumState *s = acb->common.bs->opaque; 231 232 if (acb->success_count < s->threshold) { 233 acb->vote_ret = quorum_vote_error(acb); 234 quorum_report_failure(acb); 235 return true; 236 } 237 238 return false; 239 } 240 241 static void quorum_rewrite_aio_cb(void *opaque, int ret) 242 { 243 QuorumAIOCB *acb = opaque; 244 245 /* one less rewrite to do */ 246 acb->rewrite_count--; 247 248 /* wait until all rewrite callbacks have completed */ 249 if (acb->rewrite_count) { 250 return; 251 } 252 253 quorum_aio_finalize(acb); 254 } 255 256 static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb); 257 258 static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source) 259 { 260 int i; 261 assert(dest->niov == source->niov); 262 assert(dest->size == source->size); 263 for (i = 0; i < source->niov; i++) { 264 assert(dest->iov[i].iov_len == source->iov[i].iov_len); 265 memcpy(dest->iov[i].iov_base, 266 source->iov[i].iov_base, 267 source->iov[i].iov_len); 268 } 269 } 270 271 static void quorum_report_bad_acb(QuorumChildRequest *sacb, int ret) 272 { 273 QuorumAIOCB *acb = sacb->parent; 274 QuorumOpType type = acb->is_read ? QUORUM_OP_TYPE_READ : QUORUM_OP_TYPE_WRITE; 275 quorum_report_bad(type, acb->sector_num, acb->nb_sectors, 276 sacb->aiocb->bs->node_name, ret); 277 } 278 279 static void quorum_fifo_aio_cb(void *opaque, int ret) 280 { 281 QuorumChildRequest *sacb = opaque; 282 QuorumAIOCB *acb = sacb->parent; 283 BDRVQuorumState *s = acb->common.bs->opaque; 284 285 assert(acb->is_read && s->read_pattern == QUORUM_READ_PATTERN_FIFO); 286 287 if (ret < 0) { 288 quorum_report_bad_acb(sacb, ret); 289 290 /* We try to read next child in FIFO order if we fail to read */ 291 if (acb->children_read < s->num_children) { 292 read_fifo_child(acb); 293 return; 294 } 295 } 296 297 acb->vote_ret = ret; 298 299 /* FIXME: rewrite failed children if acb->children_read > 1? */ 300 quorum_aio_finalize(acb); 301 } 302 303 static void quorum_aio_cb(void *opaque, int ret) 304 { 305 QuorumChildRequest *sacb = opaque; 306 QuorumAIOCB *acb = sacb->parent; 307 BDRVQuorumState *s = acb->common.bs->opaque; 308 bool rewrite = false; 309 int i; 310 311 sacb->ret = ret; 312 if (ret == 0) { 313 acb->success_count++; 314 } else { 315 quorum_report_bad_acb(sacb, ret); 316 } 317 acb->count++; 318 assert(acb->count <= s->num_children); 319 assert(acb->success_count <= s->num_children); 320 if (acb->count < s->num_children) { 321 return; 322 } 323 324 /* Do the vote on read */ 325 if (acb->is_read) { 326 rewrite = quorum_vote(acb); 327 for (i = 0; i < s->num_children; i++) { 328 qemu_vfree(acb->qcrs[i].buf); 329 qemu_iovec_destroy(&acb->qcrs[i].qiov); 330 } 331 } else { 332 quorum_has_too_much_io_failed(acb); 333 } 334 335 /* if no rewrite is done the code will finish right away */ 336 if (!rewrite) { 337 quorum_aio_finalize(acb); 338 } 339 } 340 341 static void quorum_report_bad_versions(BDRVQuorumState *s, 342 QuorumAIOCB *acb, 343 QuorumVoteValue *value) 344 { 345 QuorumVoteVersion *version; 346 QuorumVoteItem *item; 347 348 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 349 if (acb->votes.compare(&version->value, value)) { 350 continue; 351 } 352 QLIST_FOREACH(item, &version->items, next) { 353 quorum_report_bad(QUORUM_OP_TYPE_READ, acb->sector_num, 354 acb->nb_sectors, 355 s->children[item->index]->bs->node_name, 0); 356 } 357 } 358 } 359 360 static bool quorum_rewrite_bad_versions(BDRVQuorumState *s, QuorumAIOCB *acb, 361 QuorumVoteValue *value) 362 { 363 QuorumVoteVersion *version; 364 QuorumVoteItem *item; 365 int count = 0; 366 367 /* first count the number of bad versions: done first to avoid concurrency 368 * issues. 369 */ 370 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 371 if (acb->votes.compare(&version->value, value)) { 372 continue; 373 } 374 QLIST_FOREACH(item, &version->items, next) { 375 count++; 376 } 377 } 378 379 /* quorum_rewrite_aio_cb will count down this to zero */ 380 acb->rewrite_count = count; 381 382 /* now fire the correcting rewrites */ 383 QLIST_FOREACH(version, &acb->votes.vote_list, next) { 384 if (acb->votes.compare(&version->value, value)) { 385 continue; 386 } 387 QLIST_FOREACH(item, &version->items, next) { 388 bdrv_aio_writev(s->children[item->index], acb->sector_num, 389 acb->qiov, acb->nb_sectors, quorum_rewrite_aio_cb, 390 acb); 391 } 392 } 393 394 /* return true if any rewrite is done else false */ 395 return count; 396 } 397 398 static void quorum_count_vote(QuorumVotes *votes, 399 QuorumVoteValue *value, 400 int index) 401 { 402 QuorumVoteVersion *v = NULL, *version = NULL; 403 QuorumVoteItem *item; 404 405 /* look if we have something with this hash */ 406 QLIST_FOREACH(v, &votes->vote_list, next) { 407 if (votes->compare(&v->value, value)) { 408 version = v; 409 break; 410 } 411 } 412 413 /* It's a version not yet in the list add it */ 414 if (!version) { 415 version = g_new0(QuorumVoteVersion, 1); 416 QLIST_INIT(&version->items); 417 memcpy(&version->value, value, sizeof(version->value)); 418 version->index = index; 419 version->vote_count = 0; 420 QLIST_INSERT_HEAD(&votes->vote_list, version, next); 421 } 422 423 version->vote_count++; 424 425 item = g_new0(QuorumVoteItem, 1); 426 item->index = index; 427 QLIST_INSERT_HEAD(&version->items, item, next); 428 } 429 430 static void quorum_free_vote_list(QuorumVotes *votes) 431 { 432 QuorumVoteVersion *version, *next_version; 433 QuorumVoteItem *item, *next_item; 434 435 QLIST_FOREACH_SAFE(version, &votes->vote_list, next, next_version) { 436 QLIST_REMOVE(version, next); 437 QLIST_FOREACH_SAFE(item, &version->items, next, next_item) { 438 QLIST_REMOVE(item, next); 439 g_free(item); 440 } 441 g_free(version); 442 } 443 } 444 445 static int quorum_compute_hash(QuorumAIOCB *acb, int i, QuorumVoteValue *hash) 446 { 447 QEMUIOVector *qiov = &acb->qcrs[i].qiov; 448 size_t len = sizeof(hash->h); 449 uint8_t *data = hash->h; 450 451 /* XXX - would be nice if we could pass in the Error ** 452 * and propagate that back, but this quorum code is 453 * restricted to just errno values currently */ 454 if (qcrypto_hash_bytesv(QCRYPTO_HASH_ALG_SHA256, 455 qiov->iov, qiov->niov, 456 &data, &len, 457 NULL) < 0) { 458 return -EINVAL; 459 } 460 461 return 0; 462 } 463 464 static QuorumVoteVersion *quorum_get_vote_winner(QuorumVotes *votes) 465 { 466 int max = 0; 467 QuorumVoteVersion *candidate, *winner = NULL; 468 469 QLIST_FOREACH(candidate, &votes->vote_list, next) { 470 if (candidate->vote_count > max) { 471 max = candidate->vote_count; 472 winner = candidate; 473 } 474 } 475 476 return winner; 477 } 478 479 /* qemu_iovec_compare is handy for blkverify mode because it returns the first 480 * differing byte location. Yet it is handcoded to compare vectors one byte 481 * after another so it does not benefit from the libc SIMD optimizations. 482 * quorum_iovec_compare is written for speed and should be used in the non 483 * blkverify mode of quorum. 484 */ 485 static bool quorum_iovec_compare(QEMUIOVector *a, QEMUIOVector *b) 486 { 487 int i; 488 int result; 489 490 assert(a->niov == b->niov); 491 for (i = 0; i < a->niov; i++) { 492 assert(a->iov[i].iov_len == b->iov[i].iov_len); 493 result = memcmp(a->iov[i].iov_base, 494 b->iov[i].iov_base, 495 a->iov[i].iov_len); 496 if (result) { 497 return false; 498 } 499 } 500 501 return true; 502 } 503 504 static void GCC_FMT_ATTR(2, 3) quorum_err(QuorumAIOCB *acb, 505 const char *fmt, ...) 506 { 507 va_list ap; 508 509 va_start(ap, fmt); 510 fprintf(stderr, "quorum: sector_num=%" PRId64 " nb_sectors=%d ", 511 acb->sector_num, acb->nb_sectors); 512 vfprintf(stderr, fmt, ap); 513 fprintf(stderr, "\n"); 514 va_end(ap); 515 exit(1); 516 } 517 518 static bool quorum_compare(QuorumAIOCB *acb, 519 QEMUIOVector *a, 520 QEMUIOVector *b) 521 { 522 BDRVQuorumState *s = acb->common.bs->opaque; 523 ssize_t offset; 524 525 /* This driver will replace blkverify in this particular case */ 526 if (s->is_blkverify) { 527 offset = qemu_iovec_compare(a, b); 528 if (offset != -1) { 529 quorum_err(acb, "contents mismatch in sector %" PRId64, 530 acb->sector_num + 531 (uint64_t)(offset / BDRV_SECTOR_SIZE)); 532 } 533 return true; 534 } 535 536 return quorum_iovec_compare(a, b); 537 } 538 539 /* Do a vote to get the error code */ 540 static int quorum_vote_error(QuorumAIOCB *acb) 541 { 542 BDRVQuorumState *s = acb->common.bs->opaque; 543 QuorumVoteVersion *winner = NULL; 544 QuorumVotes error_votes; 545 QuorumVoteValue result_value; 546 int i, ret = 0; 547 bool error = false; 548 549 QLIST_INIT(&error_votes.vote_list); 550 error_votes.compare = quorum_64bits_compare; 551 552 for (i = 0; i < s->num_children; i++) { 553 ret = acb->qcrs[i].ret; 554 if (ret) { 555 error = true; 556 result_value.l = ret; 557 quorum_count_vote(&error_votes, &result_value, i); 558 } 559 } 560 561 if (error) { 562 winner = quorum_get_vote_winner(&error_votes); 563 ret = winner->value.l; 564 } 565 566 quorum_free_vote_list(&error_votes); 567 568 return ret; 569 } 570 571 static bool quorum_vote(QuorumAIOCB *acb) 572 { 573 bool quorum = true; 574 bool rewrite = false; 575 int i, j, ret; 576 QuorumVoteValue hash; 577 BDRVQuorumState *s = acb->common.bs->opaque; 578 QuorumVoteVersion *winner; 579 580 if (quorum_has_too_much_io_failed(acb)) { 581 return false; 582 } 583 584 /* get the index of the first successful read */ 585 for (i = 0; i < s->num_children; i++) { 586 if (!acb->qcrs[i].ret) { 587 break; 588 } 589 } 590 591 assert(i < s->num_children); 592 593 /* compare this read with all other successful reads stopping at quorum 594 * failure 595 */ 596 for (j = i + 1; j < s->num_children; j++) { 597 if (acb->qcrs[j].ret) { 598 continue; 599 } 600 quorum = quorum_compare(acb, &acb->qcrs[i].qiov, &acb->qcrs[j].qiov); 601 if (!quorum) { 602 break; 603 } 604 } 605 606 /* Every successful read agrees */ 607 if (quorum) { 608 quorum_copy_qiov(acb->qiov, &acb->qcrs[i].qiov); 609 return false; 610 } 611 612 /* compute hashes for each successful read, also store indexes */ 613 for (i = 0; i < s->num_children; i++) { 614 if (acb->qcrs[i].ret) { 615 continue; 616 } 617 ret = quorum_compute_hash(acb, i, &hash); 618 /* if ever the hash computation failed */ 619 if (ret < 0) { 620 acb->vote_ret = ret; 621 goto free_exit; 622 } 623 quorum_count_vote(&acb->votes, &hash, i); 624 } 625 626 /* vote to select the most represented version */ 627 winner = quorum_get_vote_winner(&acb->votes); 628 629 /* if the winner count is smaller than threshold the read fails */ 630 if (winner->vote_count < s->threshold) { 631 quorum_report_failure(acb); 632 acb->vote_ret = -EIO; 633 goto free_exit; 634 } 635 636 /* we have a winner: copy it */ 637 quorum_copy_qiov(acb->qiov, &acb->qcrs[winner->index].qiov); 638 639 /* some versions are bad print them */ 640 quorum_report_bad_versions(s, acb, &winner->value); 641 642 /* corruption correction is enabled */ 643 if (s->rewrite_corrupted) { 644 rewrite = quorum_rewrite_bad_versions(s, acb, &winner->value); 645 } 646 647 free_exit: 648 /* free lists */ 649 quorum_free_vote_list(&acb->votes); 650 return rewrite; 651 } 652 653 static BlockAIOCB *read_quorum_children(QuorumAIOCB *acb) 654 { 655 BDRVQuorumState *s = acb->common.bs->opaque; 656 int i; 657 658 acb->children_read = s->num_children; 659 for (i = 0; i < s->num_children; i++) { 660 acb->qcrs[i].buf = qemu_blockalign(s->children[i]->bs, acb->qiov->size); 661 qemu_iovec_init(&acb->qcrs[i].qiov, acb->qiov->niov); 662 qemu_iovec_clone(&acb->qcrs[i].qiov, acb->qiov, acb->qcrs[i].buf); 663 } 664 665 for (i = 0; i < s->num_children; i++) { 666 acb->qcrs[i].aiocb = bdrv_aio_readv(s->children[i], acb->sector_num, 667 &acb->qcrs[i].qiov, acb->nb_sectors, 668 quorum_aio_cb, &acb->qcrs[i]); 669 } 670 671 return &acb->common; 672 } 673 674 static BlockAIOCB *read_fifo_child(QuorumAIOCB *acb) 675 { 676 BDRVQuorumState *s = acb->common.bs->opaque; 677 int n = acb->children_read++; 678 679 acb->qcrs[n].aiocb = bdrv_aio_readv(s->children[n], acb->sector_num, 680 acb->qiov, acb->nb_sectors, 681 quorum_fifo_aio_cb, &acb->qcrs[n]); 682 683 return &acb->common; 684 } 685 686 static BlockAIOCB *quorum_aio_readv(BlockDriverState *bs, 687 int64_t sector_num, 688 QEMUIOVector *qiov, 689 int nb_sectors, 690 BlockCompletionFunc *cb, 691 void *opaque) 692 { 693 BDRVQuorumState *s = bs->opaque; 694 QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, 695 nb_sectors, cb, opaque); 696 acb->is_read = true; 697 acb->children_read = 0; 698 699 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 700 return read_quorum_children(acb); 701 } 702 703 return read_fifo_child(acb); 704 } 705 706 static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs, 707 int64_t sector_num, 708 QEMUIOVector *qiov, 709 int nb_sectors, 710 BlockCompletionFunc *cb, 711 void *opaque) 712 { 713 BDRVQuorumState *s = bs->opaque; 714 QuorumAIOCB *acb = quorum_aio_get(s, bs, qiov, sector_num, nb_sectors, 715 cb, opaque); 716 int i; 717 718 for (i = 0; i < s->num_children; i++) { 719 acb->qcrs[i].aiocb = bdrv_aio_writev(s->children[i], sector_num, 720 qiov, nb_sectors, &quorum_aio_cb, 721 &acb->qcrs[i]); 722 } 723 724 return &acb->common; 725 } 726 727 static int64_t quorum_getlength(BlockDriverState *bs) 728 { 729 BDRVQuorumState *s = bs->opaque; 730 int64_t result; 731 int i; 732 733 /* check that all file have the same length */ 734 result = bdrv_getlength(s->children[0]->bs); 735 if (result < 0) { 736 return result; 737 } 738 for (i = 1; i < s->num_children; i++) { 739 int64_t value = bdrv_getlength(s->children[i]->bs); 740 if (value < 0) { 741 return value; 742 } 743 if (value != result) { 744 return -EIO; 745 } 746 } 747 748 return result; 749 } 750 751 static coroutine_fn int quorum_co_flush(BlockDriverState *bs) 752 { 753 BDRVQuorumState *s = bs->opaque; 754 QuorumVoteVersion *winner = NULL; 755 QuorumVotes error_votes; 756 QuorumVoteValue result_value; 757 int i; 758 int result = 0; 759 int success_count = 0; 760 761 QLIST_INIT(&error_votes.vote_list); 762 error_votes.compare = quorum_64bits_compare; 763 764 for (i = 0; i < s->num_children; i++) { 765 result = bdrv_co_flush(s->children[i]->bs); 766 if (result) { 767 quorum_report_bad(QUORUM_OP_TYPE_FLUSH, 0, 768 bdrv_nb_sectors(s->children[i]->bs), 769 s->children[i]->bs->node_name, result); 770 result_value.l = result; 771 quorum_count_vote(&error_votes, &result_value, i); 772 } else { 773 success_count++; 774 } 775 } 776 777 if (success_count >= s->threshold) { 778 result = 0; 779 } else { 780 winner = quorum_get_vote_winner(&error_votes); 781 result = winner->value.l; 782 } 783 quorum_free_vote_list(&error_votes); 784 785 return result; 786 } 787 788 static bool quorum_recurse_is_first_non_filter(BlockDriverState *bs, 789 BlockDriverState *candidate) 790 { 791 BDRVQuorumState *s = bs->opaque; 792 int i; 793 794 for (i = 0; i < s->num_children; i++) { 795 bool perm = bdrv_recurse_is_first_non_filter(s->children[i]->bs, 796 candidate); 797 if (perm) { 798 return true; 799 } 800 } 801 802 return false; 803 } 804 805 static int quorum_valid_threshold(int threshold, int num_children, Error **errp) 806 { 807 808 if (threshold < 1) { 809 error_setg(errp, QERR_INVALID_PARAMETER_VALUE, 810 "vote-threshold", "value >= 1"); 811 return -ERANGE; 812 } 813 814 if (threshold > num_children) { 815 error_setg(errp, "threshold may not exceed children count"); 816 return -ERANGE; 817 } 818 819 return 0; 820 } 821 822 static QemuOptsList quorum_runtime_opts = { 823 .name = "quorum", 824 .head = QTAILQ_HEAD_INITIALIZER(quorum_runtime_opts.head), 825 .desc = { 826 { 827 .name = QUORUM_OPT_VOTE_THRESHOLD, 828 .type = QEMU_OPT_NUMBER, 829 .help = "The number of vote needed for reaching quorum", 830 }, 831 { 832 .name = QUORUM_OPT_BLKVERIFY, 833 .type = QEMU_OPT_BOOL, 834 .help = "Trigger block verify mode if set", 835 }, 836 { 837 .name = QUORUM_OPT_REWRITE, 838 .type = QEMU_OPT_BOOL, 839 .help = "Rewrite corrupted block on read quorum", 840 }, 841 { 842 .name = QUORUM_OPT_READ_PATTERN, 843 .type = QEMU_OPT_STRING, 844 .help = "Allowed pattern: quorum, fifo. Quorum is default", 845 }, 846 { /* end of list */ } 847 }, 848 }; 849 850 static int parse_read_pattern(const char *opt) 851 { 852 int i; 853 854 if (!opt) { 855 /* Set quorum as default */ 856 return QUORUM_READ_PATTERN_QUORUM; 857 } 858 859 for (i = 0; i < QUORUM_READ_PATTERN__MAX; i++) { 860 if (!strcmp(opt, QuorumReadPattern_lookup[i])) { 861 return i; 862 } 863 } 864 865 return -EINVAL; 866 } 867 868 static int quorum_open(BlockDriverState *bs, QDict *options, int flags, 869 Error **errp) 870 { 871 BDRVQuorumState *s = bs->opaque; 872 Error *local_err = NULL; 873 QemuOpts *opts = NULL; 874 bool *opened; 875 int i; 876 int ret = 0; 877 878 qdict_flatten(options); 879 880 /* count how many different children are present */ 881 s->num_children = qdict_array_entries(options, "children."); 882 if (s->num_children < 0) { 883 error_setg(&local_err, "Option children is not a valid array"); 884 ret = -EINVAL; 885 goto exit; 886 } 887 if (s->num_children < 1) { 888 error_setg(&local_err, 889 "Number of provided children must be 1 or more"); 890 ret = -EINVAL; 891 goto exit; 892 } 893 894 opts = qemu_opts_create(&quorum_runtime_opts, NULL, 0, &error_abort); 895 qemu_opts_absorb_qdict(opts, options, &local_err); 896 if (local_err) { 897 ret = -EINVAL; 898 goto exit; 899 } 900 901 s->threshold = qemu_opt_get_number(opts, QUORUM_OPT_VOTE_THRESHOLD, 0); 902 /* and validate it against s->num_children */ 903 ret = quorum_valid_threshold(s->threshold, s->num_children, &local_err); 904 if (ret < 0) { 905 goto exit; 906 } 907 908 ret = parse_read_pattern(qemu_opt_get(opts, QUORUM_OPT_READ_PATTERN)); 909 if (ret < 0) { 910 error_setg(&local_err, "Please set read-pattern as fifo or quorum"); 911 goto exit; 912 } 913 s->read_pattern = ret; 914 915 if (s->read_pattern == QUORUM_READ_PATTERN_QUORUM) { 916 /* is the driver in blkverify mode */ 917 if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false) && 918 s->num_children == 2 && s->threshold == 2) { 919 s->is_blkverify = true; 920 } else if (qemu_opt_get_bool(opts, QUORUM_OPT_BLKVERIFY, false)) { 921 fprintf(stderr, "blkverify mode is set by setting blkverify=on " 922 "and using two files with vote_threshold=2\n"); 923 } 924 925 s->rewrite_corrupted = qemu_opt_get_bool(opts, QUORUM_OPT_REWRITE, 926 false); 927 if (s->rewrite_corrupted && s->is_blkverify) { 928 error_setg(&local_err, 929 "rewrite-corrupted=on cannot be used with blkverify=on"); 930 ret = -EINVAL; 931 goto exit; 932 } 933 } 934 935 /* allocate the children array */ 936 s->children = g_new0(BdrvChild *, s->num_children); 937 opened = g_new0(bool, s->num_children); 938 939 for (i = 0; i < s->num_children; i++) { 940 char indexstr[32]; 941 ret = snprintf(indexstr, 32, "children.%d", i); 942 assert(ret < 32); 943 944 s->children[i] = bdrv_open_child(NULL, options, indexstr, bs, 945 &child_format, false, &local_err); 946 if (local_err) { 947 ret = -EINVAL; 948 goto close_exit; 949 } 950 951 opened[i] = true; 952 } 953 s->next_child_index = s->num_children; 954 955 g_free(opened); 956 goto exit; 957 958 close_exit: 959 /* cleanup on error */ 960 for (i = 0; i < s->num_children; i++) { 961 if (!opened[i]) { 962 continue; 963 } 964 bdrv_unref_child(bs, s->children[i]); 965 } 966 g_free(s->children); 967 g_free(opened); 968 exit: 969 qemu_opts_del(opts); 970 /* propagate error */ 971 error_propagate(errp, local_err); 972 return ret; 973 } 974 975 static void quorum_close(BlockDriverState *bs) 976 { 977 BDRVQuorumState *s = bs->opaque; 978 int i; 979 980 for (i = 0; i < s->num_children; i++) { 981 bdrv_unref_child(bs, s->children[i]); 982 } 983 984 g_free(s->children); 985 } 986 987 static void quorum_add_child(BlockDriverState *bs, BlockDriverState *child_bs, 988 Error **errp) 989 { 990 BDRVQuorumState *s = bs->opaque; 991 BdrvChild *child; 992 char indexstr[32]; 993 int ret; 994 995 assert(s->num_children <= INT_MAX / sizeof(BdrvChild *)); 996 if (s->num_children == INT_MAX / sizeof(BdrvChild *) || 997 s->next_child_index == UINT_MAX) { 998 error_setg(errp, "Too many children"); 999 return; 1000 } 1001 1002 ret = snprintf(indexstr, 32, "children.%u", s->next_child_index); 1003 if (ret < 0 || ret >= 32) { 1004 error_setg(errp, "cannot generate child name"); 1005 return; 1006 } 1007 s->next_child_index++; 1008 1009 bdrv_drained_begin(bs); 1010 1011 /* We can safely add the child now */ 1012 bdrv_ref(child_bs); 1013 child = bdrv_attach_child(bs, child_bs, indexstr, &child_format); 1014 s->children = g_renew(BdrvChild *, s->children, s->num_children + 1); 1015 s->children[s->num_children++] = child; 1016 1017 bdrv_drained_end(bs); 1018 } 1019 1020 static void quorum_del_child(BlockDriverState *bs, BdrvChild *child, 1021 Error **errp) 1022 { 1023 BDRVQuorumState *s = bs->opaque; 1024 int i; 1025 1026 for (i = 0; i < s->num_children; i++) { 1027 if (s->children[i] == child) { 1028 break; 1029 } 1030 } 1031 1032 /* we have checked it in bdrv_del_child() */ 1033 assert(i < s->num_children); 1034 1035 if (s->num_children <= s->threshold) { 1036 error_setg(errp, 1037 "The number of children cannot be lower than the vote threshold %d", 1038 s->threshold); 1039 return; 1040 } 1041 1042 bdrv_drained_begin(bs); 1043 1044 /* We can safely remove this child now */ 1045 memmove(&s->children[i], &s->children[i + 1], 1046 (s->num_children - i - 1) * sizeof(BdrvChild *)); 1047 s->children = g_renew(BdrvChild *, s->children, --s->num_children); 1048 bdrv_unref_child(bs, child); 1049 1050 bdrv_drained_end(bs); 1051 } 1052 1053 static void quorum_refresh_filename(BlockDriverState *bs, QDict *options) 1054 { 1055 BDRVQuorumState *s = bs->opaque; 1056 QDict *opts; 1057 QList *children; 1058 int i; 1059 1060 for (i = 0; i < s->num_children; i++) { 1061 bdrv_refresh_filename(s->children[i]->bs); 1062 if (!s->children[i]->bs->full_open_options) { 1063 return; 1064 } 1065 } 1066 1067 children = qlist_new(); 1068 for (i = 0; i < s->num_children; i++) { 1069 QINCREF(s->children[i]->bs->full_open_options); 1070 qlist_append_obj(children, 1071 QOBJECT(s->children[i]->bs->full_open_options)); 1072 } 1073 1074 opts = qdict_new(); 1075 qdict_put_obj(opts, "driver", QOBJECT(qstring_from_str("quorum"))); 1076 qdict_put_obj(opts, QUORUM_OPT_VOTE_THRESHOLD, 1077 QOBJECT(qint_from_int(s->threshold))); 1078 qdict_put_obj(opts, QUORUM_OPT_BLKVERIFY, 1079 QOBJECT(qbool_from_bool(s->is_blkverify))); 1080 qdict_put_obj(opts, QUORUM_OPT_REWRITE, 1081 QOBJECT(qbool_from_bool(s->rewrite_corrupted))); 1082 qdict_put_obj(opts, "children", QOBJECT(children)); 1083 1084 bs->full_open_options = opts; 1085 } 1086 1087 static BlockDriver bdrv_quorum = { 1088 .format_name = "quorum", 1089 .protocol_name = "quorum", 1090 1091 .instance_size = sizeof(BDRVQuorumState), 1092 1093 .bdrv_file_open = quorum_open, 1094 .bdrv_close = quorum_close, 1095 .bdrv_refresh_filename = quorum_refresh_filename, 1096 1097 .bdrv_co_flush_to_disk = quorum_co_flush, 1098 1099 .bdrv_getlength = quorum_getlength, 1100 1101 .bdrv_aio_readv = quorum_aio_readv, 1102 .bdrv_aio_writev = quorum_aio_writev, 1103 1104 .bdrv_add_child = quorum_add_child, 1105 .bdrv_del_child = quorum_del_child, 1106 1107 .is_filter = true, 1108 .bdrv_recurse_is_first_non_filter = quorum_recurse_is_first_non_filter, 1109 }; 1110 1111 static void bdrv_quorum_init(void) 1112 { 1113 if (!qcrypto_hash_supports(QCRYPTO_HASH_ALG_SHA256)) { 1114 /* SHA256 hash support is required for quorum device */ 1115 return; 1116 } 1117 bdrv_register(&bdrv_quorum); 1118 } 1119 1120 block_init(bdrv_quorum_init); 1121