/*
 * QEMU Block driver for NBD
 *
 * Copyright (c) 2019 Virtuozzo International GmbH.
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qemu/uri.h"
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"
#include "qemu/atomic.h"

#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
#include "qapi/clone-visitor.h"

#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"

#include "qemu/yank.h"

#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS    16

#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))

typedef struct {
    Coroutine *coroutine;
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* waiting for connection_co? */
} NBDClientRequest;

typedef enum NBDClientState {
    NBD_CLIENT_CONNECTING_WAIT,
    NBD_CLIENT_CONNECTING_NOWAIT,
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;

typedef enum NBDConnectThreadState {
    /* No thread, no pending results */
    CONNECT_THREAD_NONE,

    /* Thread is running, no results for now */
    CONNECT_THREAD_RUNNING,

    /*
     * Thread is running, but requestor exited. Thread should close
     * the new socket and free the connect state on exit.
     */
    CONNECT_THREAD_RUNNING_DETACHED,

    /* Thread finished, results are stored in a state */
    CONNECT_THREAD_FAIL,
    CONNECT_THREAD_SUCCESS
} NBDConnectThreadState;

typedef struct NBDConnectThread {
    /* Initialization constants */
    SocketAddress *saddr; /* address to connect to */
    /*
     * Bottom half to schedule on completion. Scheduled only if bh_ctx is not
     * NULL
     */
    QEMUBHFunc *bh_func;
    void *bh_opaque;

    /*
     * Result of last attempt. Valid in FAIL and SUCCESS states.
     * If you want to steal error, don't forget to set pointer to NULL.
     */
    QIOChannelSocket *sioc;
    Error *err;

    /* state and bh_ctx are protected by mutex */
    QemuMutex mutex;
    NBDConnectThreadState state; /* current state of the thread */
    AioContext *bh_ctx; /* where to schedule bh (NULL means don't schedule) */
} NBDConnectThread;

typedef struct BDRVNBDState {
    QIOChannelSocket *sioc; /* The master data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
    NBDExportInfo info;

    CoMutex send_mutex;
    CoQueue free_sema;
    Coroutine *connection_co;
    Coroutine *teardown_co;
    QemuCoSleepState *connection_co_sleep_ns_state;
    bool drained;
    bool wait_drained_end;
    int in_flight;
    NBDClientState state;
    int connect_status;
    Error *connect_err;
    bool wait_in_flight;

    QEMUTimer *reconnect_delay_timer;

    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    BlockDriverState *bs;

    /* Connection parameters */
    uint32_t reconnect_delay;
    SocketAddress *saddr;
    char *export, *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    const char *hostname;
    char *x_dirty_bitmap;
    bool alloc_depth;

    bool wait_connect;
    NBDConnectThread *connect_thread;
} BDRVNBDState;

static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
                                    Error **errp);
static int nbd_co_establish_connection(BlockDriverState *bs, Error **errp);
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
                                               bool detach);
static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
static void nbd_yank(void *opaque);

static void nbd_clear_bdrvstate(BDRVNBDState *s)
{
    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    s->saddr = NULL;
    g_free(s->export);
    s->export = NULL;
    g_free(s->tlscredsid);
    s->tlscredsid = NULL;
    g_free(s->x_dirty_bitmap);
    s->x_dirty_bitmap = NULL;
}

static void nbd_channel_error(BDRVNBDState *s, int ret)
{
    if (ret == -EIO) {
        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                            NBD_CLIENT_CONNECTING_NOWAIT;
        }
    } else {
        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        s->state = NBD_CLIENT_QUIT;
    }
}

static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}

static void reconnect_delay_timer_del(BDRVNBDState *s)
{
    if (s->reconnect_delay_timer) {
        timer_free(s->reconnect_delay_timer);
        s->reconnect_delay_timer = NULL;
    }
}

static void reconnect_delay_timer_cb(void *opaque)
{
    BDRVNBDState *s = opaque;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        while (qemu_co_enter_next(&s->free_sema, NULL)) {
            /* Resume all queued requests */
        }
    }

    reconnect_delay_timer_del(s);
}

static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTING_WAIT) {
        return;
    }

    assert(!s->reconnect_delay_timer);
    s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                             QEMU_CLOCK_REALTIME,
                                             SCALE_NS,
                                             reconnect_delay_timer_cb, s);
    timer_mod(s->reconnect_delay_timer, expire_time_ns);
}

static void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /* Timer is deleted in nbd_client_co_drain_begin() */
    assert(!s->reconnect_delay_timer);
    qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * The node is still drained, so we know the coroutine has yielded in
     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
     * entered for the first time. Both places are safe for entering the
     * coroutine.
     */
    qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
    bdrv_dec_in_flight(bs);
}

static void nbd_client_attach_aio_context(BlockDriverState *bs,
                                          AioContext *new_context)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * s->connection_co is either yielded from nbd_receive_reply or from
     * nbd_co_reconnect_loop()
     */
    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED) {
        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
    }

    bdrv_inc_in_flight(bs);

    /*
     * Need to wait here for the BH to run because the BH must run while the
     * node is still drained.
     */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = true;
    if (s->connection_co_sleep_ns_state) {
        qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
    }

    nbd_co_establish_connection_cancel(bs, false);

    reconnect_delay_timer_del(s);

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
        qemu_co_queue_restart_all(&s->free_sema);
    }
}

static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = false;
    if (s->wait_drained_end) {
        s->wait_drained_end = false;
        aio_co_wake(s->connection_co);
    }
}


static void nbd_teardown_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->ioc) {
        /* finish any pending coroutines */
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    } else if (s->sioc) {
        /* abort negotiation */
        qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH,
                             NULL);
    }

    s->state = NBD_CLIENT_QUIT;
    if (s->connection_co) {
        if (s->connection_co_sleep_ns_state) {
            qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
        }
        nbd_co_establish_connection_cancel(bs, true);
    }
    if (qemu_in_coroutine()) {
        s->teardown_co = qemu_coroutine_self();
        /* connection_co resumes us when it terminates */
        qemu_coroutine_yield();
        s->teardown_co = NULL;
    } else {
        BDRV_POLL_WHILE(bs, s->connection_co);
    }
    assert(!s->connection_co);
}

static bool nbd_client_connecting(BDRVNBDState *s)
{
    NBDClientState state = qatomic_load_acquire(&s->state);
    return state == NBD_CLIENT_CONNECTING_WAIT ||
        state == NBD_CLIENT_CONNECTING_NOWAIT;
}

static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
    return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
}

static void connect_bh(void *opaque)
{
    BDRVNBDState *state = opaque;

    assert(state->wait_connect);
    state->wait_connect = false;
    aio_co_wake(state->connection_co);
}

static void nbd_init_connect_thread(BDRVNBDState *s)
{
    s->connect_thread = g_new(NBDConnectThread, 1);

    *s->connect_thread = (NBDConnectThread) {
        .saddr = QAPI_CLONE(SocketAddress, s->saddr),
        .state = CONNECT_THREAD_NONE,
        .bh_func = connect_bh,
        .bh_opaque = s,
    };

    qemu_mutex_init(&s->connect_thread->mutex);
}

static void nbd_free_connect_thread(NBDConnectThread *thr)
{
    if (thr->sioc) {
        qio_channel_close(QIO_CHANNEL(thr->sioc), NULL);
    }
    error_free(thr->err);
    qapi_free_SocketAddress(thr->saddr);
    g_free(thr);
}

static void *connect_thread_func(void *opaque)
{
    NBDConnectThread *thr = opaque;
    int ret;
    bool do_free = false;

    thr->sioc = qio_channel_socket_new();

    error_free(thr->err);
    thr->err = NULL;
    ret = qio_channel_socket_connect_sync(thr->sioc, thr->saddr, &thr->err);
    if (ret < 0) {
        object_unref(OBJECT(thr->sioc));
        thr->sioc = NULL;
    }

    qemu_mutex_lock(&thr->mutex);

    switch (thr->state) {
    case CONNECT_THREAD_RUNNING:
        thr->state = ret < 0 ? CONNECT_THREAD_FAIL : CONNECT_THREAD_SUCCESS;
        if (thr->bh_ctx) {
            aio_bh_schedule_oneshot(thr->bh_ctx, thr->bh_func, thr->bh_opaque);

            /* play safe, don't reuse bh_ctx on further connection attempts */
            thr->bh_ctx = NULL;
        }
        break;
    case CONNECT_THREAD_RUNNING_DETACHED:
        do_free = true;
        break;
    default:
        abort();
    }

    qemu_mutex_unlock(&thr->mutex);

    if (do_free) {
        nbd_free_connect_thread(thr);
    }

    return NULL;
}

static int coroutine_fn
nbd_co_establish_connection(BlockDriverState *bs, Error **errp)
{
    int ret;
    QemuThread thread;
    BDRVNBDState *s = bs->opaque;
    NBDConnectThread *thr = s->connect_thread;

    qemu_mutex_lock(&thr->mutex);

    switch (thr->state) {
    case CONNECT_THREAD_FAIL:
    case CONNECT_THREAD_NONE:
        error_free(thr->err);
        thr->err = NULL;
        thr->state = CONNECT_THREAD_RUNNING;
        qemu_thread_create(&thread, "nbd-connect",
                           connect_thread_func, thr, QEMU_THREAD_DETACHED);
        break;
    case CONNECT_THREAD_SUCCESS:
        /* Previous attempt finally succeeded in background */
        thr->state = CONNECT_THREAD_NONE;
        s->sioc = thr->sioc;
        thr->sioc = NULL;
        yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
                               nbd_yank, bs);
        qemu_mutex_unlock(&thr->mutex);
        return 0;
    case CONNECT_THREAD_RUNNING:
        /* Already running, will wait */
        break;
    default:
        abort();
    }

    thr->bh_ctx = qemu_get_current_aio_context();

    qemu_mutex_unlock(&thr->mutex);


    /*
     * We are going to wait for the connect thread to finish, but
     * nbd_client_co_drain_begin() can interrupt.
     *
     * Note that the wait_connect variable is not visible to the connect
     * thread. It doesn't need mutex protection; it is used only inside the
     * home aio context of bs.
     */
    s->wait_connect = true;
    qemu_coroutine_yield();

    qemu_mutex_lock(&thr->mutex);

    switch (thr->state) {
    case CONNECT_THREAD_SUCCESS:
    case CONNECT_THREAD_FAIL:
        thr->state = CONNECT_THREAD_NONE;
        error_propagate(errp, thr->err);
        thr->err = NULL;
        s->sioc = thr->sioc;
        thr->sioc = NULL;
        if (s->sioc) {
            yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
                                   nbd_yank, bs);
        }
        ret = (s->sioc ? 0 : -1);
        break;
    case CONNECT_THREAD_RUNNING:
    case CONNECT_THREAD_RUNNING_DETACHED:
        /*
         * Obviously, a drained section wants to start. Report the attempt as
         * failed. Still, the connect thread keeps executing in the background,
         * and its result may be used for the next connection attempt.
         */
        ret = -1;
        error_setg(errp, "Connection attempt cancelled by other operation");
        break;

    case CONNECT_THREAD_NONE:
        /*
         * Impossible. We've seen this thread running. So it should be
         * running or at least give some results.
         */
        abort();

    default:
        abort();
    }

    qemu_mutex_unlock(&thr->mutex);

    return ret;
}

/*
 * nbd_co_establish_connection_cancel
 * Cancel nbd_co_establish_connection asynchronously: it will finish soon, to
 * allow drained section to begin.
 *
 * If detach is true, also clean up the state (or if thread is running, move it
 * to CONNECT_THREAD_RUNNING_DETACHED state). s->connect_thread becomes NULL if
 * detach is true.
 */
static void nbd_co_establish_connection_cancel(BlockDriverState *bs,
                                               bool detach)
{
    BDRVNBDState *s = bs->opaque;
    NBDConnectThread *thr = s->connect_thread;
    bool wake = false;
    bool do_free = false;

    qemu_mutex_lock(&thr->mutex);

    if (thr->state == CONNECT_THREAD_RUNNING) {
        /* We can cancel only in running state, when bh is not yet scheduled */
        thr->bh_ctx = NULL;
        if (s->wait_connect) {
            s->wait_connect = false;
            wake = true;
        }
        if (detach) {
            thr->state = CONNECT_THREAD_RUNNING_DETACHED;
            s->connect_thread = NULL;
        }
    } else if (detach) {
        do_free = true;
    }

    qemu_mutex_unlock(&thr->mutex);

    if (do_free) {
        nbd_free_connect_thread(thr);
        s->connect_thread = NULL;
    }

    if (wake) {
        aio_co_wake(s->connection_co);
    }
}

static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
    int ret;
    Error *local_err = NULL;

    if (!nbd_client_connecting(s)) {
        return;
    }

    /* Wait for completion of all in-flight requests */

    qemu_co_mutex_lock(&s->send_mutex);

    while (s->in_flight > 0) {
        qemu_co_mutex_unlock(&s->send_mutex);
        nbd_recv_coroutines_wake_all(s);
        s->wait_in_flight = true;
        qemu_coroutine_yield();
        s->wait_in_flight = false;
        qemu_co_mutex_lock(&s->send_mutex);
    }

    qemu_co_mutex_unlock(&s->send_mutex);

    if (!nbd_client_connecting(s)) {
        return;
    }

    /*
     * Now we are sure that nobody is accessing the channel, and no one will
     * try until we set the state to CONNECTED.
     */

    /* Finalize previous connection if any */
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    if (nbd_co_establish_connection(s->bs, &local_err) < 0) {
        ret = -ECONNREFUSED;
        goto out;
    }

    bdrv_dec_in_flight(s->bs);

    ret = nbd_client_handshake(s->bs, &local_err);

    if (s->drained) {
        s->wait_drained_end = true;
        while (s->drained) {
            /*
             * We may be entered once from nbd_client_attach_aio_context_bh
             * and then from nbd_client_co_drain_end. So here is a loop.
             */
            qemu_coroutine_yield();
        }
    }
    bdrv_inc_in_flight(s->bs);

out:
    s->connect_status = ret;
    error_free(s->connect_err);
    s->connect_err = NULL;
    error_propagate(&s->connect_err, local_err);

    if (ret >= 0) {
        /* successfully connected */
        s->state = NBD_CLIENT_CONNECTED;
        qemu_co_queue_restart_all(&s->free_sema);
    }
}

static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
{
    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;

    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
    }

    nbd_reconnect_attempt(s);

    while (nbd_client_connecting(s)) {
        if (s->drained) {
            bdrv_dec_in_flight(s->bs);
            s->wait_drained_end = true;
            while (s->drained) {
                /*
                 * We may be entered once from nbd_client_attach_aio_context_bh
                 * and then from nbd_client_co_drain_end. So here is a loop.
                 */
                qemu_coroutine_yield();
            }
            bdrv_inc_in_flight(s->bs);
        } else {
            qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
                                      &s->connection_co_sleep_ns_state);
            if (s->drained) {
                continue;
            }
            if (timeout < max_timeout) {
                timeout *= 2;
            }
        }

        nbd_reconnect_attempt(s);
    }

    reconnect_delay_timer_del(s);
}

static coroutine_fn void nbd_connection_entry(void *opaque)
{
    BDRVNBDState *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */

        if (nbd_client_connecting(s)) {
            nbd_co_reconnect_loop(s);
        }

        if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
            continue;
        }

        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            continue;
        }

        /*
         * There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            nbd_channel_error(s, -EINVAL);
            continue;
        }

        /*
         * We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    qemu_co_queue_restart_all(&s->free_sema);
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    if (s->ioc) {
        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    if (s->teardown_co) {
        aio_co_wake(s->teardown_co);
    }
    aio_wait_kick();
}

static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i = -1;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }

    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
        rc = -EIO;
        goto err;
    }

    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTED &&
            rc >= 0) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        nbd_channel_error(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
            s->in_flight--;
        }
        if (s->in_flight == 0 && s->wait_in_flight) {
            aio_co_wake(s->connection_co);
        } else {
            qemu_co_queue_next(&s->free_sema);
        }
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}

static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/*
 * nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (s->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                   s->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                              s->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > s->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             s->info.min_block);
        } else {
            extent->length = s->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    /*
     * HACK: if we are using x-dirty-bitmaps to access
     * qemu:allocation-depth, treat all depths > 2 the same as 2,
     * since nbd_client_co_block_status is only expecting the low two
     * bits to be set.
     */
    if (s->alloc_depth && extent->flags > 2) {
        extent->flags = 2;
    }

    return 0;
}

/*
 * nbd_parse_error_payload
 * on success @errp contains message describing nbd error reply
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

#define NBD_MAX_MALLOC_PAYLOAD 1000
static coroutine_fn int nbd_co_receive_structured_payload(
        BDRVNBDState *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/*
 * nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/*
 * nbd_co_receive_one_chunk
 * Read reply, wake up connection_co and set s->quit if needed.
 * Return value is a fatal error code or normal nbd reply error code
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        nbd_channel_error(s, ret);
    } else {
        /* For assert at loop start in nbd_connection_entry */
        *reply = s->reply;
    }
    s->reply.handle = 0;

    if (s->connection_co && !s->wait_in_flight) {
        /*
         * We must check s->wait_in_flight, because we may have been entered
         * by nbd_recv_coroutines_wake_all(); in this case we should not
         * wake connection_co here, it will be woken by the last request.
         */
        aio_co_wake(s->connection_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(local_err && *local_err);
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/*
 * NBD_FOREACH_REPLY_CHUNK
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)

/*
 * nbd_reply_chunk_iter_receive
 * The pointer stored in @payload requires g_free() to free it.
 */
static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;
    if (qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) ||
        qatomic_load_acquire(&s->state) != NBD_CLIENT_CONNECTED) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    if (s->in_flight == 0 && s->wait_in_flight) {
        aio_co_wake(s->connection_co);
    } else {
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /*
             * special cased in nbd_co_receive_one_chunk, data is already
             * in qiov
             */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }

    do {
        ret = nbd_co_send_request(bs, request, write_qiov);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_return_code(s, request->handle,
                                         &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request->from, request->len,
                                      request->handle, request->flags,
                                      request->type,
                                      nbd_cmd_lookup(request->type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                                uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= s->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > s->info.size) {
        uint64_t slop = offset + bytes - s->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
                                           &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                                 uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int bytes, BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }
    if (flags & BDRV_REQ_NO_FALLBACK) {
        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
                                  int bytes)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

static int coroutine_fn nbd_client_co_block_status(
        BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
        int64_t *pnum, int64_t *map, BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
                   MIN(bytes, s->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!s->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= s->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (s->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
    }
    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
                                               &extent, &request_ret,
                                               &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
           BDRV_BLOCK_OFFSET_VALID;
}

static int nbd_client_reopen_prepare(BDRVReopenState *state,
                                     BlockReopenQueue *queue, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;

    if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
        error_setg(errp, "Can't reopen read-only NBD mount as read/write");
        return -EACCES;
    }
    return 0;
}

static void nbd_yank(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    qatomic_store_release(&s->state, NBD_CLIENT_QUIT);
    qio_channel_shutdown(QIO_CHANNEL(s->sioc), QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}

static void nbd_client_close(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (s->ioc) {
        nbd_send_request(s->ioc, &request);
    }

    nbd_teardown_connection(bs);
}

static int nbd_establish_connection(BlockDriverState *bs,
                                    SocketAddress *saddr,
                                    Error **errp)
{
    ERRP_GUARD();
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(s->sioc), "nbd-client");

    qio_channel_socket_connect_sync(s->sioc, saddr, errp);
    if (*errp) {
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        return -1;
    }

    yank_register_function(BLOCKDEV_YANK_INSTANCE(bs->node_name), nbd_yank, bs);
    qio_channel_set_delay(QIO_CHANNEL(s->sioc), false);

    return 0;
}

/* nbd_client_handshake takes ownership of s->sioc. On failure it's unref'ed. */
static int nbd_client_handshake(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    AioContext *aio_context = bdrv_get_aio_context(bs);
    int ret;

    trace_nbd_client_handshake(s->export);
    qio_channel_set_blocking(QIO_CHANNEL(s->sioc), false, NULL);
    qio_channel_attach_aio_context(QIO_CHANNEL(s->sioc), aio_context);

    s->info.request_sizes = true;
    s->info.structured_reply = true;
    s->info.base_allocation = true;
    s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
    s->info.name = g_strdup(s->export ?: "");
    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(s->sioc), s->tlscreds,
                                s->hostname, &s->ioc, &s->info, errp);
    g_free(s->info.x_dirty_bitmap);
    g_free(s->info.name);
    if (ret < 0) {
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
                                 nbd_yank, bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        return ret;
    }
    if (s->x_dirty_bitmap) {
        if (!s->info.base_allocation) {
            error_setg(errp, "requested x-dirty-bitmap %s not found",
                       s->x_dirty_bitmap);
            ret = -EINVAL;
            goto fail;
        }
        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
            s->alloc_depth = true;
        }
    }
    if (s->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (s->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
        }
    }

    if (!s->ioc) {
        s->ioc = QIO_CHANNEL(s->sioc);
        object_ref(OBJECT(s->ioc));
    }

    trace_nbd_client_handshake_success(s->export);

    return 0;

fail:
    /*
     * We have connected, but must fail for other reasons.
     * Send NBD_CMD_DISC as a courtesy to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(s->ioc ?: QIO_CHANNEL(s->sioc), &request);

        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(bs->node_name),
                                 nbd_yank, bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;

        return ret;
    }
}

/*
 * Parse nbd_open options
 */

static int nbd_parse_uri(const char *filename, QDict *options)
{
    URI *uri;
    const char *p;
    QueryParams *qp = NULL;
    int ret = 0;
    bool is_unix;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport */
    if (!g_strcmp0(uri->scheme, "nbd")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
        is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    p = uri->path ? uri->path : "";
    if (p[0] == '/') {
        p++;
    }
    if (p[0]) {
        qdict_put_str(options, "export", p);
    }

    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (is_unix) {
        /* nbd+unix:///export?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", qp->p[0].value);
    } else {
        QString *host;
        char *port_str;

        /* nbd[+tcp]://host[:port]/export */
        if (!uri->server) {
            ret = -EINVAL;
            goto out;
        }

        /* strip braces from literal IPv6 address */
        if (uri->server[0] == '[') {
            host = qstring_from_substr(uri->server, 1,
                                       strlen(uri->server) - 1);
        } else {
            host = qstring_from_str(uri->server);
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put(options, "server.host", host);

        port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
        qdict_put_str(options, "server.port", port_str);
        g_free(port_str);
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}

static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
{
    const QDictEntry *e;

    for (e = qdict_first(options); e; e = qdict_next(options, e)) {
        if (!strcmp(e->key, "host") ||
            !strcmp(e->key, "port") ||
            !strcmp(e->key, "path") ||
            !strcmp(e->key, "export") ||
            strstart(e->key, "server.", NULL))
        {
            error_setg(errp, "Option '%s' cannot be used with a file name",
                       e->key);
            return true;
        }
    }

    return false;
}

static void nbd_parse_filename(const char *filename, QDict *options,
                               Error **errp)
{
    g_autofree char *file = NULL;
    char *export_name;
    const char *host_spec;
    const char *unixpath;

    if (nbd_has_filename_options_conflict(options, errp)) {
        return;
    }

    if (strstr(filename, "://")) {
        int ret = nbd_parse_uri(filename, options);
        if (ret < 0) {
            error_setg(errp, "No valid URL specified");
        }
        return;
    }

    file = g_strdup(filename);

    export_name = strstr(file, EN_OPTSTR);
    if (export_name) {
        if (export_name[strlen(EN_OPTSTR)] == 0) {
            return;
        }
        export_name[0] = 0; /* truncate 'file' */
        export_name += strlen(EN_OPTSTR);

        qdict_put_str(options, "export", export_name);
    }

    /* extract the host_spec - fail if it's not nbd:... */
    if (!strstart(file, "nbd:", &host_spec)) {
        error_setg(errp, "File name string for NBD must start with 'nbd:'");
        return;
    }

    if (!*host_spec) {
        return;
    }

    /* are we a UNIX or TCP socket? */
    if (strstart(host_spec, "unix:", &unixpath)) {
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", unixpath);
    } else {
        InetSocketAddress *addr = g_new(InetSocketAddress, 1);

        if (inet_parse(addr, host_spec, errp)) {
            goto out_inet;
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put_str(options, "server.host", addr->host);
        qdict_put_str(options, "server.port", addr->port);
    out_inet:
        qapi_free_InetSocketAddress(addr);
    }
}

static bool nbd_process_legacy_socket_options(QDict *output_options,
                                              QemuOpts *legacy_opts,
                                              Error **errp)
{
    const char *path = qemu_opt_get(legacy_opts, "path");
    const char *host = qemu_opt_get(legacy_opts, "host");
    const char *port = qemu_opt_get(legacy_opts, "port");
    const QDictEntry *e;

    if (!path && !host && !port) {
        return true;
    }

    for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
    {
        if (strstart(e->key, "server.", NULL)) {
            error_setg(errp, "Cannot use 'server' and path/host/port at the "
                       "same time");
            return false;
        }
    }

    if (path && host) {
        error_setg(errp, "path and host may not be used at the same time");
        return false;
    } else if (path) {
        if (port) {
            error_setg(errp, "port may not be used without host");
            return false;
        }

        qdict_put_str(output_options, "server.type", "unix");
        qdict_put_str(output_options, "server.path", path);
    } else if (host) {
        qdict_put_str(output_options, "server.type", "inet");
        qdict_put_str(output_options, "server.host", host);
        qdict_put_str(output_options, "server.port",
                      port ?: stringify(NBD_DEFAULT_PORT));
    }

    return true;
}

static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
                                 Error **errp)
{
    SocketAddress *saddr = NULL;
    QDict *addr = NULL;
    Visitor *iv = NULL;

    qdict_extract_subqdict(options, &addr, "server.");
    if (!qdict_size(addr)) {
        error_setg(errp, "NBD server address missing");
        goto done;
    }

    iv = qobject_input_visitor_new_flat_confused(addr, errp);
    if (!iv) {
        goto done;
    }

    if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
        goto done;
    }

done:
    qobject_unref(addr);
    visit_free(iv);
    return saddr;
}

static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
{
    Object *obj;
    QCryptoTLSCreds *creds;

    obj = object_resolve_path_component(
        object_get_objects_root(), id);
    if (!obj) {
        error_setg(errp, "No TLS credentials with id '%s'", id);
        return NULL;
    }
    creds = (QCryptoTLSCreds *)
        object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
    if (!creds) {
        error_setg(errp, "Object with id '%s' is not TLS credentials",
                   id);
        return NULL;
    }

    if (creds->endpoint != QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT) {
        error_setg(errp,
                   "Expecting TLS credentials with a client endpoint");
        return NULL;
    }
    object_ref(obj);
    return creds;
}


static QemuOptsList nbd_runtime_opts = {
    .name = "nbd",
    .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
    .desc = {
        {
            .name = "host",
            .type = QEMU_OPT_STRING,
            .help = "TCP host to connect to",
        },
        {
            .name = "port",
            .type = QEMU_OPT_STRING,
            .help = "TCP port to connect to",
        },
        {
            .name = "path",
            .type = QEMU_OPT_STRING,
            .help = "Unix socket path to connect to",
        },
        {
            .name = "export",
            .type = QEMU_OPT_STRING,
            .help = "Name of the NBD export to open",
        },
        {
            .name = "tls-creds",
            .type = QEMU_OPT_STRING,
            .help = "ID of the TLS credentials to use",
        },
        {
            .name = "x-dirty-bitmap",
            .type = QEMU_OPT_STRING,
            .help = "experimental: expose named dirty bitmap in place of "
                    "block status",
        },
        {
            .name = "reconnect-delay",
            .type = QEMU_OPT_NUMBER,
            .help = "On an unexpected disconnect, the nbd client tries to "
                    "connect again until succeeding or encountering a serious "
                    "error. During the first @reconnect-delay seconds, all "
                    "requests are paused and will be rerun on a successful "
                    "reconnect. After that time, any delayed requests and all "
                    "future requests before a successful reconnect will "
                    "immediately fail. Default 0",
        },
        { /* end of list */ }
    },
};

static int nbd_process_options(BlockDriverState *bs, QDict *options,
                               Error **errp)
{
    BDRVNBDState *s = bs->opaque;
    QemuOpts *opts;
    int ret = -EINVAL;

    opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
        goto error;
    }

    /* Translate @host, @port, and @path to a SocketAddress */
    if (!nbd_process_legacy_socket_options(options, opts, errp)) {
        goto error;
    }

    /* Pop the config into our state object. Exit if invalid. */
    s->saddr = nbd_config(s, options, errp);
    if (!s->saddr) {
        goto error;
    }

    s->export = g_strdup(qemu_opt_get(opts, "export"));
    if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "export name too long to send to server");
        goto error;
    }

    s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
    if (s->tlscredsid) {
        s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
        if (!s->tlscreds) {
            goto error;
        }

        /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
        if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
            error_setg(errp, "TLS only supported over IP sockets");
            goto error;
        }
        s->hostname = s->saddr->u.inet.host;
    }

    s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
    if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "x-dirty-bitmap query too long to send to server");
        goto error;
    }

    s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);

    ret = 0;

error:
    if (ret < 0) {
        nbd_clear_bdrvstate(s);
    }
    qemu_opts_del(opts);
    return ret;
}

static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                    Error **errp)
{
    int ret;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    ret = nbd_process_options(bs, options, errp);
    if (ret < 0) {
        return ret;
    }

    s->bs = bs;
    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_queue_init(&s->free_sema);

    if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
        return -EEXIST;
    }

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    if (nbd_establish_connection(bs, s->saddr, errp) < 0) {
        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
        return -ECONNREFUSED;
    }

    ret = nbd_client_handshake(bs, errp);
    if (ret < 0) {
        yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
        nbd_clear_bdrvstate(s);
        return ret;
    }
    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;

    nbd_init_connect_thread(s);

    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
    bdrv_inc_in_flight(bs);
    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);

    return 0;
}

static int nbd_co_flush(BlockDriverState *bs)
{
    return nbd_client_co_flush(bs);
}

static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    uint32_t min = s->info.min_block;
    uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);

    /*
     * If the server did not advertise an alignment:
     * - a size that is not sector-aligned implies that an alignment
     *   of 1 can be used to access those tail bytes
     * - advertisement of block status requires an alignment of 1, so
     *   that we don't violate block layer constraints that block
     *   status is always aligned (as we can't control whether the
     *   server will report sub-sector extents, such as a hole at EOF
     *   on an unaligned POSIX file)
     * - otherwise, assume the server is so old that we are safer avoiding
     *   sub-sector requests
     */
    if (!min) {
        min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
               s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
    }

    bs->bl.request_alignment = min;
    bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
    bs->bl.max_pwrite_zeroes = max;
    bs->bl.max_transfer = max;

    if (s->info.opt_block &&
        s->info.opt_block > bs->bl.opt_transfer) {
        bs->bl.opt_transfer = s->info.opt_block;
    }
}

static void nbd_close(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    nbd_client_close(bs);
    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
    nbd_clear_bdrvstate(s);
}

/*
 * NBD cannot truncate, but if the caller asks to truncate to the same size, or
 * to a smaller size with exact=false, there is no reason to fail the
 * operation.
 *
 * Preallocation mode is ignored since it does not seem useful to fail when
 * we never change anything.
 */
static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
                                        bool exact, PreallocMode prealloc,
                                        BdrvRequestFlags flags, Error **errp)
{
    BDRVNBDState *s = bs->opaque;

    if (offset != s->info.size && exact) {
        error_setg(errp, "Cannot resize NBD nodes");
        return -ENOTSUP;
    }

    if (offset > s->info.size) {
        error_setg(errp, "Cannot grow NBD nodes");
        return -EINVAL;
    }

    return 0;
}

static int64_t nbd_getlength(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    return s->info.size;
}

static void nbd_refresh_filename(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;
    const char *host = NULL, *port = NULL, *path = NULL;
    size_t len = 0;

    if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
        const InetSocketAddress *inet = &s->saddr->u.inet;
        if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
            host = inet->host;
            port = inet->port;
        }
    } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
        path = s->saddr->u.q_unix.path;
    } /* else can't represent as pseudo-filename */

    if (path && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix:///%s?socket=%s", s->export, path);
    } else if (path && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix://?socket=%s", path);
    } else if (host && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s/%s", host, port, s->export);
    } else if (host && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s", host, port);
    }
    if (len >= sizeof(bs->exact_filename)) {
        /* Name is too long to represent exactly, so leave it empty. */
        bs->exact_filename[0] = '\0';
    }
}

static char *nbd_dirname(BlockDriverState *bs, Error **errp)
{
    /*
     * The generic bdrv_dirname() implementation is able to work out some
     * directory name for NBD nodes, but that would be wrong. So far there is
     * no specification for how "export paths" would work, so NBD does not
     * have directory names.
     */
    error_setg(errp, "Cannot generate a base directory for NBD nodes");
    return NULL;
}

static const char *const nbd_strong_runtime_opts[] = {
    "path",
    "host",
    "port",
    "export",
    "tls-creds",
    "server.",

    NULL
};

static BlockDriver bdrv_nbd = {
    .format_name = "nbd",
    .protocol_name = "nbd",
    .instance_size = sizeof(BDRVNBDState),
    .bdrv_parse_filename = nbd_parse_filename,
    .bdrv_co_create_opts = bdrv_co_create_opts_simple,
    .create_opts = &bdrv_create_opts_simple,
    .bdrv_file_open = nbd_open,
    .bdrv_reopen_prepare = nbd_client_reopen_prepare,
    .bdrv_co_preadv = nbd_client_co_preadv,
    .bdrv_co_pwritev = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes = nbd_client_co_pwrite_zeroes,
    .bdrv_close = nbd_close,
    .bdrv_co_flush_to_os = nbd_co_flush,
    .bdrv_co_pdiscard = nbd_client_co_pdiscard,
    .bdrv_refresh_limits = nbd_refresh_limits,
    .bdrv_co_truncate = nbd_co_truncate,
    .bdrv_getlength = nbd_getlength,
    .bdrv_detach_aio_context = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin = nbd_client_co_drain_begin,
    .bdrv_co_drain_end = nbd_client_co_drain_end,
    .bdrv_refresh_filename = nbd_refresh_filename,
    .bdrv_co_block_status = nbd_client_co_block_status,
    .bdrv_dirname = nbd_dirname,
    .strong_runtime_opts = nbd_strong_runtime_opts,
};

static BlockDriver bdrv_nbd_tcp = {
    .format_name = "nbd",
    .protocol_name = "nbd+tcp",
    .instance_size = sizeof(BDRVNBDState),
    .bdrv_parse_filename = nbd_parse_filename,
    .bdrv_co_create_opts = bdrv_co_create_opts_simple,
    .create_opts = &bdrv_create_opts_simple,
    .bdrv_file_open = nbd_open,
    .bdrv_reopen_prepare = nbd_client_reopen_prepare,
    .bdrv_co_preadv = nbd_client_co_preadv,
    .bdrv_co_pwritev = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes = nbd_client_co_pwrite_zeroes,
    .bdrv_close = nbd_close,
    .bdrv_co_flush_to_os = nbd_co_flush,
    .bdrv_co_pdiscard = nbd_client_co_pdiscard,
    .bdrv_refresh_limits = nbd_refresh_limits,
    .bdrv_co_truncate = nbd_co_truncate,
    .bdrv_getlength = nbd_getlength,
    .bdrv_detach_aio_context = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin = nbd_client_co_drain_begin,
    .bdrv_co_drain_end = nbd_client_co_drain_end,
    .bdrv_refresh_filename = nbd_refresh_filename,
    .bdrv_co_block_status = nbd_client_co_block_status,
    .bdrv_dirname = nbd_dirname,
    .strong_runtime_opts = nbd_strong_runtime_opts,
};

static BlockDriver bdrv_nbd_unix = {
    .format_name = "nbd",
    .protocol_name = "nbd+unix",
    .instance_size = sizeof(BDRVNBDState),
    .bdrv_parse_filename = nbd_parse_filename,
    .bdrv_co_create_opts = bdrv_co_create_opts_simple,
    .create_opts = &bdrv_create_opts_simple,
    .bdrv_file_open = nbd_open,
    .bdrv_reopen_prepare = nbd_client_reopen_prepare,
    .bdrv_co_preadv = nbd_client_co_preadv,
    .bdrv_co_pwritev = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes = nbd_client_co_pwrite_zeroes,
    .bdrv_close = nbd_close,
    .bdrv_co_flush_to_os = nbd_co_flush,
    .bdrv_co_pdiscard = nbd_client_co_pdiscard,
    .bdrv_refresh_limits = nbd_refresh_limits,
    .bdrv_co_truncate = nbd_co_truncate,
    .bdrv_getlength = nbd_getlength,
    .bdrv_detach_aio_context = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin = nbd_client_co_drain_begin,
    .bdrv_co_drain_end = nbd_client_co_drain_end,
    .bdrv_refresh_filename = nbd_refresh_filename,
    .bdrv_co_block_status = nbd_client_co_block_status,
    .bdrv_dirname = nbd_dirname,
    .strong_runtime_opts = nbd_strong_runtime_opts,
};

static void bdrv_nbd_init(void)
{
    bdrv_register(&bdrv_nbd);
    bdrv_register(&bdrv_nbd_tcp);
    bdrv_register(&bdrv_nbd_unix);
}

block_init(bdrv_nbd_init);
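/*
 * Added note: illustrative filename forms accepted by nbd_parse_filename()
 * and nbd_parse_uri(); hosts, socket paths and export names below are
 * placeholders, not values taken from this file:
 *
 *   nbd://localhost:10809/myexport               URI over TCP
 *   nbd+unix:///myexport?socket=/tmp/nbd.sock    URI over a Unix socket
 *   nbd:localhost:10809:exportname=myexport      legacy TCP syntax
 *   nbd:unix:/tmp/nbd.sock:exportname=myexport   legacy Unix-socket syntax
 *
 * For the URI forms, NBD_DEFAULT_PORT is assumed when no port is given.
 */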