xref: /openbmc/qemu/block/nbd.c (revision 2df1eb27)
1 /*
2  * QEMU Block driver for NBD
3  *
4  * Copyright (c) 2019 Virtuozzo International GmbH.
5  * Copyright Red Hat
6  * Copyright (C) 2008 Bull S.A.S.
7  *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
8  *
9  * Some parts:
10  *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
11  *
12  * Permission is hereby granted, free of charge, to any person obtaining a copy
13  * of this software and associated documentation files (the "Software"), to deal
14  * in the Software without restriction, including without limitation the rights
15  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16  * copies of the Software, and to permit persons to whom the Software is
17  * furnished to do so, subject to the following conditions:
18  *
19  * The above copyright notice and this permission notice shall be included in
20  * all copies or substantial portions of the Software.
21  *
22  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
25  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
28  * THE SOFTWARE.
29  */
30 
31 #include "qemu/osdep.h"
32 
33 #include "trace.h"
34 #include "qemu/uri.h"
35 #include "qemu/option.h"
36 #include "qemu/cutils.h"
37 #include "qemu/main-loop.h"
38 
39 #include "qapi/qapi-visit-sockets.h"
40 #include "qapi/qmp/qstring.h"
41 #include "qapi/clone-visitor.h"
42 
43 #include "block/qdict.h"
44 #include "block/nbd.h"
45 #include "block/block_int.h"
46 #include "block/coroutines.h"
47 
48 #include "qemu/yank.h"
49 
/* Export-name option marker (its use lies outside this section). */
#define EN_OPTSTR ":exportname="

/* Number of slots in BDRVNBDState.requests[], i.e. the in-flight limit. */
#define MAX_NBD_REQUESTS 16

/*
 * A request's cookie is its slot index plus one, so that a cookie of 0
 * never names a slot and can stand for "no reply being processed".
 */
#define INDEX_TO_COOKIE(index)  ((index) + 1)
#define COOKIE_TO_INDEX(cookie) ((cookie) - 1)
55 
56 typedef struct {
57     Coroutine *coroutine;
58     uint64_t offset;        /* original offset of the request */
59     bool receiving;         /* sleeping in the yield in nbd_receive_replies */
60 } NBDClientRequest;
61 
/* Connection state of the NBD client (BDRVNBDState.state). */
typedef enum NBDClientState {
    /* Reconnecting; the reconnect attempt is made in blocking mode */
    NBD_CLIENT_CONNECTING_WAIT = 0,
    /* Reconnecting; the reconnect attempt is made in non-blocking mode */
    NBD_CLIENT_CONNECTING_NOWAIT = 1,
    /* Connection established and negotiated */
    NBD_CLIENT_CONNECTED = 2,
    /* Set on teardown and on channel errors other than -EIO */
    NBD_CLIENT_QUIT = 3,
} NBDClientState;
68 
69 typedef struct BDRVNBDState {
70     QIOChannel *ioc; /* The current I/O channel */
71     NBDExportInfo info;
72 
73     /*
74      * Protects state, free_sema, in_flight, requests[].coroutine,
75      * reconnect_delay_timer.
76      */
77     QemuMutex requests_lock;
78     NBDClientState state;
79     CoQueue free_sema;
80     unsigned in_flight;
81     NBDClientRequest requests[MAX_NBD_REQUESTS];
82     QEMUTimer *reconnect_delay_timer;
83 
84     /* Protects sending data on the socket.  */
85     CoMutex send_mutex;
86 
87     /*
88      * Protects receiving reply headers from the socket, as well as the
89      * fields reply and requests[].receiving
90      */
91     CoMutex receive_mutex;
92     NBDReply reply;
93 
94     QEMUTimer *open_timer;
95 
96     BlockDriverState *bs;
97 
98     /* Connection parameters */
99     uint32_t reconnect_delay;
100     uint32_t open_timeout;
101     SocketAddress *saddr;
102     char *export;
103     char *tlscredsid;
104     QCryptoTLSCreds *tlscreds;
105     char *tlshostname;
106     char *x_dirty_bitmap;
107     bool alloc_depth;
108 
109     NBDClientConnection *conn;
110 } BDRVNBDState;
111 
112 static void nbd_yank(void *opaque);
113 
114 static void nbd_clear_bdrvstate(BlockDriverState *bs)
115 {
116     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
117 
118     nbd_client_connection_release(s->conn);
119     s->conn = NULL;
120 
121     yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));
122 
123     /* Must not leave timers behind that would access freed data */
124     assert(!s->reconnect_delay_timer);
125     assert(!s->open_timer);
126 
127     object_unref(OBJECT(s->tlscreds));
128     qapi_free_SocketAddress(s->saddr);
129     s->saddr = NULL;
130     g_free(s->export);
131     s->export = NULL;
132     g_free(s->tlscredsid);
133     s->tlscredsid = NULL;
134     g_free(s->tlshostname);
135     s->tlshostname = NULL;
136     g_free(s->x_dirty_bitmap);
137     s->x_dirty_bitmap = NULL;
138 }
139 
140 /* Called with s->receive_mutex taken.  */
141 static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
142 {
143     if (req->receiving) {
144         req->receiving = false;
145         aio_co_wake(req->coroutine);
146         return true;
147     }
148 
149     return false;
150 }
151 
152 static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
153 {
154     int i;
155 
156     QEMU_LOCK_GUARD(&s->receive_mutex);
157     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
158         if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
159             return;
160         }
161     }
162 }
163 
164 /* Called with s->requests_lock held.  */
165 static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
166 {
167     if (s->state == NBD_CLIENT_CONNECTED) {
168         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
169     }
170 
171     if (ret == -EIO) {
172         if (s->state == NBD_CLIENT_CONNECTED) {
173             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
174                                             NBD_CLIENT_CONNECTING_NOWAIT;
175         }
176     } else {
177         s->state = NBD_CLIENT_QUIT;
178     }
179 }
180 
181 static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
182 {
183     QEMU_LOCK_GUARD(&s->requests_lock);
184     nbd_channel_error_locked(s, ret);
185 }
186 
187 static void reconnect_delay_timer_del(BDRVNBDState *s)
188 {
189     if (s->reconnect_delay_timer) {
190         timer_free(s->reconnect_delay_timer);
191         s->reconnect_delay_timer = NULL;
192     }
193 }
194 
195 static void reconnect_delay_timer_cb(void *opaque)
196 {
197     BDRVNBDState *s = opaque;
198 
199     reconnect_delay_timer_del(s);
200     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
201         if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
202             return;
203         }
204         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
205     }
206     nbd_co_establish_connection_cancel(s->conn);
207 }
208 
209 static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
210 {
211     assert(!s->reconnect_delay_timer);
212     s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
213                                              QEMU_CLOCK_REALTIME,
214                                              SCALE_NS,
215                                              reconnect_delay_timer_cb, s);
216     timer_mod(s->reconnect_delay_timer, expire_time_ns);
217 }
218 
219 static void nbd_teardown_connection(BlockDriverState *bs)
220 {
221     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
222 
223     assert(!s->in_flight);
224 
225     if (s->ioc) {
226         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
227         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
228                                  nbd_yank, s->bs);
229         object_unref(OBJECT(s->ioc));
230         s->ioc = NULL;
231     }
232 
233     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
234         s->state = NBD_CLIENT_QUIT;
235     }
236 }
237 
238 static void open_timer_del(BDRVNBDState *s)
239 {
240     if (s->open_timer) {
241         timer_free(s->open_timer);
242         s->open_timer = NULL;
243     }
244 }
245 
246 static void open_timer_cb(void *opaque)
247 {
248     BDRVNBDState *s = opaque;
249 
250     nbd_co_establish_connection_cancel(s->conn);
251     open_timer_del(s);
252 }
253 
254 static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
255 {
256     assert(!s->open_timer);
257     s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
258                                   QEMU_CLOCK_REALTIME,
259                                   SCALE_NS,
260                                   open_timer_cb, s);
261     timer_mod(s->open_timer, expire_time_ns);
262 }
263 
264 static bool nbd_client_will_reconnect(BDRVNBDState *s)
265 {
266     /*
267      * Called only after a socket error, so this is not performance sensitive.
268      */
269     QEMU_LOCK_GUARD(&s->requests_lock);
270     return s->state == NBD_CLIENT_CONNECTING_WAIT;
271 }
272 
273 /*
274  * Update @bs with information learned during a completed negotiation process.
275  * Return failure if the server's advertised options are incompatible with the
276  * client's needs.
277  */
278 static int coroutine_fn GRAPH_RDLOCK
279 nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
280 {
281     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
282     int ret;
283 
284     if (s->x_dirty_bitmap) {
285         if (!s->info.base_allocation) {
286             error_setg(errp, "requested x-dirty-bitmap %s not found",
287                        s->x_dirty_bitmap);
288             return -EINVAL;
289         }
290         if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
291             s->alloc_depth = true;
292         }
293     }
294 
295     if (s->info.flags & NBD_FLAG_READ_ONLY) {
296         ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
297         if (ret < 0) {
298             return ret;
299         }
300     }
301 
302     if (s->info.flags & NBD_FLAG_SEND_FUA) {
303         bs->supported_write_flags = BDRV_REQ_FUA;
304         bs->supported_zero_flags |= BDRV_REQ_FUA;
305     }
306 
307     if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
308         bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
309         if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
310             bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
311         }
312     }
313 
314     trace_nbd_client_handshake_success(s->export);
315 
316     return 0;
317 }
318 
319 int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
320                                                 bool blocking, Error **errp)
321 {
322     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
323     int ret;
324     IO_CODE();
325 
326     assert_bdrv_graph_readable();
327     assert(!s->ioc);
328 
329     s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
330     if (!s->ioc) {
331         return -ECONNREFUSED;
332     }
333 
334     yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
335                            bs);
336 
337     ret = nbd_handle_updated_info(s->bs, NULL);
338     if (ret < 0) {
339         /*
340          * We have connected, but must fail for other reasons.
341          * Send NBD_CMD_DISC as a courtesy to the server.
342          */
343         NBDRequest request = { .type = NBD_CMD_DISC, .mode = s->info.mode };
344 
345         nbd_send_request(s->ioc, &request);
346 
347         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
348                                  nbd_yank, bs);
349         object_unref(OBJECT(s->ioc));
350         s->ioc = NULL;
351 
352         return ret;
353     }
354 
355     qio_channel_set_blocking(s->ioc, false, NULL);
356     qio_channel_set_follow_coroutine_ctx(s->ioc, true);
357 
358     /* successfully connected */
359     WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
360         s->state = NBD_CLIENT_CONNECTED;
361     }
362 
363     return 0;
364 }
365 
366 /* Called with s->requests_lock held.  */
367 static bool nbd_client_connecting(BDRVNBDState *s)
368 {
369     return s->state == NBD_CLIENT_CONNECTING_WAIT ||
370         s->state == NBD_CLIENT_CONNECTING_NOWAIT;
371 }
372 
373 /* Called with s->requests_lock taken.  */
374 static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
375 {
376     int ret;
377     bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;
378 
379     /*
380      * Now we are sure that nobody is accessing the channel, and no one will
381      * try until we set the state to CONNECTED.
382      */
383     assert(nbd_client_connecting(s));
384     assert(s->in_flight == 1);
385 
386     trace_nbd_reconnect_attempt(s->bs->in_flight);
387 
388     if (blocking && !s->reconnect_delay_timer) {
389         /*
390          * It's the first reconnect attempt after switching to
391          * NBD_CLIENT_CONNECTING_WAIT
392          */
393         g_assert(s->reconnect_delay);
394         reconnect_delay_timer_init(s,
395             qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
396             s->reconnect_delay * NANOSECONDS_PER_SECOND);
397     }
398 
399     /* Finalize previous connection if any */
400     if (s->ioc) {
401         yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
402                                  nbd_yank, s->bs);
403         object_unref(OBJECT(s->ioc));
404         s->ioc = NULL;
405     }
406 
407     qemu_mutex_unlock(&s->requests_lock);
408     ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
409     trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
410     qemu_mutex_lock(&s->requests_lock);
411 
412     /*
413      * The reconnect attempt is done (maybe successfully, maybe not), so
414      * we no longer need this timer.  Delete it so it will not outlive
415      * this I/O request (so draining removes all timers).
416      */
417     reconnect_delay_timer_del(s);
418 }
419 
420 static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t cookie,
421                                             Error **errp)
422 {
423     int ret;
424     uint64_t ind = COOKIE_TO_INDEX(cookie), ind2;
425     QEMU_LOCK_GUARD(&s->receive_mutex);
426 
427     while (true) {
428         if (s->reply.cookie == cookie) {
429             /* We are done */
430             return 0;
431         }
432 
433         if (s->reply.cookie != 0) {
434             /*
435              * Some other request is being handled now. It should already be
436              * woken by whoever set s->reply.cookie (or never wait in this
437              * yield). So, we should not wake it here.
438              */
439             ind2 = COOKIE_TO_INDEX(s->reply.cookie);
440             assert(!s->requests[ind2].receiving);
441 
442             s->requests[ind].receiving = true;
443             qemu_co_mutex_unlock(&s->receive_mutex);
444 
445             qemu_coroutine_yield();
446             /*
447              * We may be woken for 2 reasons:
448              * 1. From this function, executing in parallel coroutine, when our
449              *    cookie is received.
450              * 2. From nbd_co_receive_one_chunk(), when previous request is
451              *    finished and s->reply.cookie set to 0.
452              * Anyway, it's OK to lock the mutex and go to the next iteration.
453              */
454 
455             qemu_co_mutex_lock(&s->receive_mutex);
456             assert(!s->requests[ind].receiving);
457             continue;
458         }
459 
460         /* We are under mutex and cookie is 0. We have to do the dirty work. */
461         assert(s->reply.cookie == 0);
462         ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, s->info.mode, errp);
463         if (ret == 0) {
464             ret = -EIO;
465             error_setg(errp, "server dropped connection");
466         }
467         if (ret < 0) {
468             nbd_channel_error(s, ret);
469             return ret;
470         }
471         if (nbd_reply_is_structured(&s->reply) &&
472             s->info.mode < NBD_MODE_STRUCTURED) {
473             nbd_channel_error(s, -EINVAL);
474             error_setg(errp, "unexpected structured reply");
475             return -EINVAL;
476         }
477         ind2 = COOKIE_TO_INDEX(s->reply.cookie);
478         if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
479             nbd_channel_error(s, -EINVAL);
480             error_setg(errp, "unexpected cookie value");
481             return -EINVAL;
482         }
483         if (s->reply.cookie == cookie) {
484             /* We are done */
485             return 0;
486         }
487         nbd_recv_coroutine_wake_one(&s->requests[ind2]);
488     }
489 }
490 
491 static int coroutine_fn GRAPH_RDLOCK
492 nbd_co_send_request(BlockDriverState *bs, NBDRequest *request,
493                     QEMUIOVector *qiov)
494 {
495     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
496     int rc, i = -1;
497 
498     qemu_mutex_lock(&s->requests_lock);
499     while (s->in_flight == MAX_NBD_REQUESTS ||
500            (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
501         qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
502     }
503 
504     s->in_flight++;
505     if (s->state != NBD_CLIENT_CONNECTED) {
506         if (nbd_client_connecting(s)) {
507             nbd_reconnect_attempt(s);
508             qemu_co_queue_restart_all(&s->free_sema);
509         }
510         if (s->state != NBD_CLIENT_CONNECTED) {
511             rc = -EIO;
512             goto err;
513         }
514     }
515 
516     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
517         if (s->requests[i].coroutine == NULL) {
518             break;
519         }
520     }
521 
522     assert(i < MAX_NBD_REQUESTS);
523     s->requests[i].coroutine = qemu_coroutine_self();
524     s->requests[i].offset = request->from;
525     s->requests[i].receiving = false;
526     qemu_mutex_unlock(&s->requests_lock);
527 
528     qemu_co_mutex_lock(&s->send_mutex);
529     request->cookie = INDEX_TO_COOKIE(i);
530     request->mode = s->info.mode;
531 
532     assert(s->ioc);
533 
534     if (qiov) {
535         qio_channel_set_cork(s->ioc, true);
536         rc = nbd_send_request(s->ioc, request);
537         if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
538                                               NULL) < 0) {
539             rc = -EIO;
540         }
541         qio_channel_set_cork(s->ioc, false);
542     } else {
543         rc = nbd_send_request(s->ioc, request);
544     }
545     qemu_co_mutex_unlock(&s->send_mutex);
546 
547     if (rc < 0) {
548         qemu_mutex_lock(&s->requests_lock);
549 err:
550         nbd_channel_error_locked(s, rc);
551         if (i != -1) {
552             s->requests[i].coroutine = NULL;
553         }
554         s->in_flight--;
555         qemu_co_queue_next(&s->free_sema);
556         qemu_mutex_unlock(&s->requests_lock);
557     }
558     return rc;
559 }
560 
/* Read a big-endian 16-bit value at *payload and step past it. */
static inline uint16_t payload_advance16(uint8_t **payload)
{
    uint16_t value = lduw_be_p(*payload);

    *payload += 2;
    return value;
}
566 
/* Read a big-endian 32-bit value at *payload and step past it. */
static inline uint32_t payload_advance32(uint8_t **payload)
{
    uint32_t value = ldl_be_p(*payload);

    *payload += 4;
    return value;
}
572 
/* Read a big-endian 64-bit value at *payload and step past it. */
static inline uint64_t payload_advance64(uint8_t **payload)
{
    uint64_t value = ldq_be_p(*payload);

    *payload += 8;
    return value;
}
578 
579 static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
580                                          NBDStructuredReplyChunk *chunk,
581                                          uint8_t *payload, uint64_t orig_offset,
582                                          QEMUIOVector *qiov, Error **errp)
583 {
584     uint64_t offset;
585     uint32_t hole_size;
586 
587     if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
588         error_setg(errp, "Protocol error: invalid payload for "
589                          "NBD_REPLY_TYPE_OFFSET_HOLE");
590         return -EINVAL;
591     }
592 
593     offset = payload_advance64(&payload);
594     hole_size = payload_advance32(&payload);
595 
596     if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
597         offset > orig_offset + qiov->size - hole_size) {
598         error_setg(errp, "Protocol error: server sent chunk exceeding requested"
599                          " region");
600         return -EINVAL;
601     }
602     if (s->info.min_block &&
603         !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
604         trace_nbd_structured_read_compliance("hole");
605     }
606 
607     qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
608 
609     return 0;
610 }
611 
612 /*
613  * nbd_parse_blockstatus_payload
614  * Based on our request, we expect only one extent in reply, for the
615  * base:allocation context.
616  */
617 static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
618                                          NBDStructuredReplyChunk *chunk,
619                                          uint8_t *payload, bool wide,
620                                          uint64_t orig_length,
621                                          NBDExtent64 *extent, Error **errp)
622 {
623     uint32_t context_id;
624     uint32_t count;
625     size_t ext_len = wide ? sizeof(*extent) : sizeof(NBDExtent32);
626     size_t pay_len = sizeof(context_id) + wide * sizeof(count) + ext_len;
627 
628     /* The server succeeded, so it must have sent [at least] one extent */
629     if (chunk->length < pay_len) {
630         error_setg(errp, "Protocol error: invalid payload for "
631                          "NBD_REPLY_TYPE_BLOCK_STATUS");
632         return -EINVAL;
633     }
634 
635     context_id = payload_advance32(&payload);
636     if (s->info.context_id != context_id) {
637         error_setg(errp, "Protocol error: unexpected context id %d for "
638                          "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
639                          "id is %d", context_id,
640                          s->info.context_id);
641         return -EINVAL;
642     }
643 
644     if (wide) {
645         count = payload_advance32(&payload);
646         extent->length = payload_advance64(&payload);
647         extent->flags = payload_advance64(&payload);
648     } else {
649         count = 0;
650         extent->length = payload_advance32(&payload);
651         extent->flags = payload_advance32(&payload);
652     }
653 
654     if (extent->length == 0) {
655         error_setg(errp, "Protocol error: server sent status chunk with "
656                    "zero length");
657         return -EINVAL;
658     }
659 
660     /*
661      * A server sending unaligned block status is in violation of the
662      * protocol, but as qemu-nbd 3.1 is such a server (at least for
663      * POSIX files that are not a multiple of 512 bytes, since qemu
664      * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
665      * still sees an implicit hole beyond the real EOF), it's nicer to
666      * work around the misbehaving server. If the request included
667      * more than the final unaligned block, truncate it back to an
668      * aligned result; if the request was only the final block, round
669      * up to the full block and change the status to fully-allocated
670      * (always a safe status, even if it loses information).
671      */
672     if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
673                                               s->info.min_block)) {
674         trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
675         if (extent->length > s->info.min_block) {
676             extent->length = QEMU_ALIGN_DOWN(extent->length,
677                                              s->info.min_block);
678         } else {
679             extent->length = s->info.min_block;
680             extent->flags = 0;
681         }
682     }
683 
684     /*
685      * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
686      * sent us any more than one extent, nor should it have included
687      * status beyond our request in that extent. Furthermore, a wide
688      * server should have replied with an accurate count (we left
689      * count at 0 for a narrow server).  However, it's easy enough to
690      * ignore the server's noncompliance without killing the
691      * connection; just ignore trailing extents, and clamp things to
692      * the length of our request.
693      */
694     if (count != wide || chunk->length > pay_len) {
695         trace_nbd_parse_blockstatus_compliance("unexpected extent count");
696     }
697     if (extent->length > orig_length) {
698         extent->length = orig_length;
699         trace_nbd_parse_blockstatus_compliance("extent length too large");
700     }
701 
702     /*
703      * HACK: if we are using x-dirty-bitmaps to access
704      * qemu:allocation-depth, treat all depths > 2 the same as 2,
705      * since nbd_client_co_block_status is only expecting the low two
706      * bits to be set.
707      */
708     if (s->alloc_depth && extent->flags > 2) {
709         extent->flags = 2;
710     }
711 
712     return 0;
713 }
714 
715 /*
716  * nbd_parse_error_payload
717  * on success @errp contains message describing nbd error reply
718  */
719 static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
720                                    uint8_t *payload, int *request_ret,
721                                    Error **errp)
722 {
723     uint32_t error;
724     uint16_t message_size;
725 
726     assert(chunk->type & (1 << 15));
727 
728     if (chunk->length < sizeof(error) + sizeof(message_size)) {
729         error_setg(errp,
730                    "Protocol error: invalid payload for structured error");
731         return -EINVAL;
732     }
733 
734     error = nbd_errno_to_system_errno(payload_advance32(&payload));
735     if (error == 0) {
736         error_setg(errp, "Protocol error: server sent structured error chunk "
737                          "with error = 0");
738         return -EINVAL;
739     }
740 
741     *request_ret = -error;
742     message_size = payload_advance16(&payload);
743 
744     if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
745         error_setg(errp, "Protocol error: server sent structured error chunk "
746                          "with incorrect message size");
747         return -EINVAL;
748     }
749 
750     /* TODO: Add a trace point to mention the server complaint */
751 
752     /* TODO handle ERROR_OFFSET */
753 
754     return 0;
755 }
756 
757 static int coroutine_fn
758 nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
759                                    QEMUIOVector *qiov, Error **errp)
760 {
761     QEMUIOVector sub_qiov;
762     uint64_t offset;
763     size_t data_size;
764     int ret;
765     NBDStructuredReplyChunk *chunk = &s->reply.structured;
766 
767     assert(nbd_reply_is_structured(&s->reply));
768 
769     /* The NBD spec requires at least one byte of payload */
770     if (chunk->length <= sizeof(offset)) {
771         error_setg(errp, "Protocol error: invalid payload for "
772                          "NBD_REPLY_TYPE_OFFSET_DATA");
773         return -EINVAL;
774     }
775 
776     if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
777         return -EIO;
778     }
779 
780     data_size = chunk->length - sizeof(offset);
781     assert(data_size);
782     if (offset < orig_offset || data_size > qiov->size ||
783         offset > orig_offset + qiov->size - data_size) {
784         error_setg(errp, "Protocol error: server sent chunk exceeding requested"
785                          " region");
786         return -EINVAL;
787     }
788     if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
789         trace_nbd_structured_read_compliance("data");
790     }
791 
792     qemu_iovec_init(&sub_qiov, qiov->niov);
793     qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
794     ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
795     qemu_iovec_destroy(&sub_qiov);
796 
797     return ret < 0 ? -EIO : 0;
798 }
799 
800 #define NBD_MAX_MALLOC_PAYLOAD 1000
801 static coroutine_fn int nbd_co_receive_structured_payload(
802         BDRVNBDState *s, void **payload, Error **errp)
803 {
804     int ret;
805     uint32_t len;
806 
807     assert(nbd_reply_is_structured(&s->reply));
808 
809     len = s->reply.structured.length;
810 
811     if (len == 0) {
812         return 0;
813     }
814 
815     if (payload == NULL) {
816         error_setg(errp, "Unexpected structured payload");
817         return -EINVAL;
818     }
819 
820     if (len > NBD_MAX_MALLOC_PAYLOAD) {
821         error_setg(errp, "Payload too large");
822         return -EINVAL;
823     }
824 
825     *payload = g_new(char, len);
826     ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
827     if (ret < 0) {
828         g_free(*payload);
829         *payload = NULL;
830         return ret;
831     }
832 
833     return 0;
834 }
835 
836 /*
837  * nbd_co_do_receive_one_chunk
838  * for simple reply:
839  *   set request_ret to received reply error
840  *   if qiov is not NULL: read payload to @qiov
841  * for structured reply chunk:
842  *   if error chunk: read payload, set @request_ret, do not set @payload
843  *   else if offset_data chunk: read payload data to @qiov, do not set @payload
844  *   else: read payload to @payload
845  *
846  * If function fails, @errp contains corresponding error message, and the
847  * connection with the server is suspect.  If it returns 0, then the
848  * transaction succeeded (although @request_ret may be a negative errno
849  * corresponding to the server's error reply), and errp is unchanged.
850  */
851 static coroutine_fn int nbd_co_do_receive_one_chunk(
852         BDRVNBDState *s, uint64_t cookie, bool only_structured,
853         int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
854 {
855     int ret;
856     int i = COOKIE_TO_INDEX(cookie);
857     void *local_payload = NULL;
858     NBDStructuredReplyChunk *chunk;
859 
860     if (payload) {
861         *payload = NULL;
862     }
863     *request_ret = 0;
864 
865     ret = nbd_receive_replies(s, cookie, errp);
866     if (ret < 0) {
867         error_prepend(errp, "Connection closed: ");
868         return -EIO;
869     }
870     assert(s->ioc);
871 
872     assert(s->reply.cookie == cookie);
873 
874     if (nbd_reply_is_simple(&s->reply)) {
875         if (only_structured) {
876             error_setg(errp, "Protocol error: simple reply when structured "
877                              "reply chunk was expected");
878             return -EINVAL;
879         }
880 
881         *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
882         if (*request_ret < 0 || !qiov) {
883             return 0;
884         }
885 
886         return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
887                                      errp) < 0 ? -EIO : 0;
888     }
889 
890     /* handle structured reply chunk */
891     assert(s->info.mode >= NBD_MODE_STRUCTURED);
892     chunk = &s->reply.structured;
893 
894     if (chunk->type == NBD_REPLY_TYPE_NONE) {
895         if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
896             error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
897                        " NBD_REPLY_FLAG_DONE flag set");
898             return -EINVAL;
899         }
900         if (chunk->length) {
901             error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
902                        " nonzero length");
903             return -EINVAL;
904         }
905         return 0;
906     }
907 
908     if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
909         if (!qiov) {
910             error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
911             return -EINVAL;
912         }
913 
914         return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
915                                                   qiov, errp);
916     }
917 
918     if (nbd_reply_type_is_error(chunk->type)) {
919         payload = &local_payload;
920     }
921 
922     ret = nbd_co_receive_structured_payload(s, payload, errp);
923     if (ret < 0) {
924         return ret;
925     }
926 
927     if (nbd_reply_type_is_error(chunk->type)) {
928         ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
929         g_free(local_payload);
930         return ret;
931     }
932 
933     return 0;
934 }
935 
936 /*
937  * nbd_co_receive_one_chunk
938  * Read reply, wake up connection_co and set s->quit if needed.
939  * Return value is a fatal error code or normal nbd reply error code
940  */
941 static coroutine_fn int nbd_co_receive_one_chunk(
942         BDRVNBDState *s, uint64_t cookie, bool only_structured,
943         int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
944         Error **errp)
945 {
946     int ret = nbd_co_do_receive_one_chunk(s, cookie, only_structured,
947                                           request_ret, qiov, payload, errp);
948 
949     if (ret < 0) {
950         memset(reply, 0, sizeof(*reply));
951         nbd_channel_error(s, ret);
952     } else {
953         /* For assert at loop start in nbd_connection_entry */
954         *reply = s->reply;
955     }
956     s->reply.cookie = 0;
957 
958     nbd_recv_coroutines_wake(s);
959 
960     return ret;
961 }
962 
963 typedef struct NBDReplyChunkIter {
964     int ret;
965     int request_ret;
966     Error *err;
967     bool done, only_structured;
968 } NBDReplyChunkIter;
969 
970 static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
971                                    int ret, Error **local_err)
972 {
973     assert(local_err && *local_err);
974     assert(ret < 0);
975 
976     if (!iter->ret) {
977         iter->ret = ret;
978         error_propagate(&iter->err, *local_err);
979     } else {
980         error_free(*local_err);
981     }
982 
983     *local_err = NULL;
984 }
985 
986 static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
987 {
988     assert(ret < 0);
989 
990     if (!iter->request_ret) {
991         iter->request_ret = ret;
992     }
993 }
994 
/*
 * NBD_FOREACH_REPLY_CHUNK
 * Initialize @iter (with only_structured set from @structured) and run the
 * loop body for as long as nbd_reply_chunk_iter_receive() returns true.
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, structured, \
                                qiov, reply, payload) \
    for ((iter) = (NBDReplyChunkIter) { .only_structured = (structured) }; \
         nbd_reply_chunk_iter_receive((s), &(iter), (cookie), (qiov), \
                                      (reply), (payload));)
1003 
1004 /*
1005  * nbd_reply_chunk_iter_receive
1006  * The pointer stored in @payload requires g_free() to free it.
1007  */
1008 static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
1009                                                       NBDReplyChunkIter *iter,
1010                                                       uint64_t cookie,
1011                                                       QEMUIOVector *qiov,
1012                                                       NBDReply *reply,
1013                                                       void **payload)
1014 {
1015     int ret, request_ret;
1016     NBDReply local_reply;
1017     NBDStructuredReplyChunk *chunk;
1018     Error *local_err = NULL;
1019 
1020     if (iter->done) {
1021         /* Previous iteration was last. */
1022         goto break_loop;
1023     }
1024 
1025     if (reply == NULL) {
1026         reply = &local_reply;
1027     }
1028 
1029     ret = nbd_co_receive_one_chunk(s, cookie, iter->only_structured,
1030                                    &request_ret, qiov, reply, payload,
1031                                    &local_err);
1032     if (ret < 0) {
1033         nbd_iter_channel_error(iter, ret, &local_err);
1034     } else if (request_ret < 0) {
1035         nbd_iter_request_error(iter, request_ret);
1036     }
1037 
1038     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
1039     if (nbd_reply_is_simple(reply) || iter->ret < 0) {
1040         goto break_loop;
1041     }
1042 
1043     chunk = &reply->structured;
1044     iter->only_structured = true;
1045 
1046     if (chunk->type == NBD_REPLY_TYPE_NONE) {
1047         /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
1048         assert(chunk->flags & NBD_REPLY_FLAG_DONE);
1049         goto break_loop;
1050     }
1051 
1052     if (chunk->flags & NBD_REPLY_FLAG_DONE) {
1053         /* This iteration is last. */
1054         iter->done = true;
1055     }
1056 
1057     /* Execute the loop body */
1058     return true;
1059 
1060 break_loop:
1061     qemu_mutex_lock(&s->requests_lock);
1062     s->requests[COOKIE_TO_INDEX(cookie)].coroutine = NULL;
1063     s->in_flight--;
1064     qemu_co_queue_next(&s->free_sema);
1065     qemu_mutex_unlock(&s->requests_lock);
1066 
1067     return false;
1068 }
1069 
1070 static int coroutine_fn
1071 nbd_co_receive_return_code(BDRVNBDState *s, uint64_t cookie,
1072                            int *request_ret, Error **errp)
1073 {
1074     NBDReplyChunkIter iter;
1075 
1076     NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, false, NULL, NULL, NULL) {
1077         /* nbd_reply_chunk_iter_receive does all the work */
1078     }
1079 
1080     error_propagate(errp, iter.err);
1081     *request_ret = iter.request_ret;
1082     return iter.ret;
1083 }
1084 
1085 static int coroutine_fn
1086 nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t cookie,
1087                              uint64_t offset, QEMUIOVector *qiov,
1088                              int *request_ret, Error **errp)
1089 {
1090     NBDReplyChunkIter iter;
1091     NBDReply reply;
1092     void *payload = NULL;
1093     Error *local_err = NULL;
1094 
1095     NBD_FOREACH_REPLY_CHUNK(s, iter, cookie,
1096                             s->info.mode >= NBD_MODE_STRUCTURED,
1097                             qiov, &reply, &payload)
1098     {
1099         int ret;
1100         NBDStructuredReplyChunk *chunk = &reply.structured;
1101 
1102         assert(nbd_reply_is_structured(&reply));
1103 
1104         switch (chunk->type) {
1105         case NBD_REPLY_TYPE_OFFSET_DATA:
1106             /*
1107              * special cased in nbd_co_receive_one_chunk, data is already
1108              * in qiov
1109              */
1110             break;
1111         case NBD_REPLY_TYPE_OFFSET_HOLE:
1112             ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
1113                                                 offset, qiov, &local_err);
1114             if (ret < 0) {
1115                 nbd_channel_error(s, ret);
1116                 nbd_iter_channel_error(&iter, ret, &local_err);
1117             }
1118             break;
1119         default:
1120             if (!nbd_reply_type_is_error(chunk->type)) {
1121                 /* not allowed reply type */
1122                 nbd_channel_error(s, -EINVAL);
1123                 error_setg(&local_err,
1124                            "Unexpected reply type: %d (%s) for CMD_READ",
1125                            chunk->type, nbd_reply_type_lookup(chunk->type));
1126                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
1127             }
1128         }
1129 
1130         g_free(payload);
1131         payload = NULL;
1132     }
1133 
1134     error_propagate(errp, iter.err);
1135     *request_ret = iter.request_ret;
1136     return iter.ret;
1137 }
1138 
1139 static int coroutine_fn
1140 nbd_co_receive_blockstatus_reply(BDRVNBDState *s, uint64_t cookie,
1141                                  uint64_t length, NBDExtent64 *extent,
1142                                  int *request_ret, Error **errp)
1143 {
1144     NBDReplyChunkIter iter;
1145     NBDReply reply;
1146     void *payload = NULL;
1147     Error *local_err = NULL;
1148     bool received = false;
1149 
1150     assert(!extent->length);
1151     NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, false, NULL, &reply, &payload) {
1152         int ret;
1153         NBDStructuredReplyChunk *chunk = &reply.structured;
1154         bool wide;
1155 
1156         assert(nbd_reply_is_structured(&reply));
1157 
1158         switch (chunk->type) {
1159         case NBD_REPLY_TYPE_BLOCK_STATUS_EXT:
1160         case NBD_REPLY_TYPE_BLOCK_STATUS:
1161             wide = chunk->type == NBD_REPLY_TYPE_BLOCK_STATUS_EXT;
1162             if ((s->info.mode >= NBD_MODE_EXTENDED) != wide) {
1163                 trace_nbd_extended_headers_compliance("block_status");
1164             }
1165             if (received) {
1166                 nbd_channel_error(s, -EINVAL);
1167                 error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
1168                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
1169             }
1170             received = true;
1171 
1172             ret = nbd_parse_blockstatus_payload(
1173                 s, &reply.structured, payload, wide,
1174                 length, extent, &local_err);
1175             if (ret < 0) {
1176                 nbd_channel_error(s, ret);
1177                 nbd_iter_channel_error(&iter, ret, &local_err);
1178             }
1179             break;
1180         default:
1181             if (!nbd_reply_type_is_error(chunk->type)) {
1182                 nbd_channel_error(s, -EINVAL);
1183                 error_setg(&local_err,
1184                            "Unexpected reply type: %d (%s) "
1185                            "for CMD_BLOCK_STATUS",
1186                            chunk->type, nbd_reply_type_lookup(chunk->type));
1187                 nbd_iter_channel_error(&iter, -EINVAL, &local_err);
1188             }
1189         }
1190 
1191         g_free(payload);
1192         payload = NULL;
1193     }
1194 
1195     if (!extent->length && !iter.request_ret) {
1196         error_setg(&local_err, "Server did not reply with any status extents");
1197         nbd_iter_channel_error(&iter, -EIO, &local_err);
1198     }
1199 
1200     error_propagate(errp, iter.err);
1201     *request_ret = iter.request_ret;
1202     return iter.ret;
1203 }
1204 
1205 static int coroutine_fn GRAPH_RDLOCK
1206 nbd_co_request(BlockDriverState *bs, NBDRequest *request,
1207                QEMUIOVector *write_qiov)
1208 {
1209     int ret, request_ret;
1210     Error *local_err = NULL;
1211     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1212 
1213     assert(request->type != NBD_CMD_READ);
1214     if (write_qiov) {
1215         assert(request->type == NBD_CMD_WRITE);
1216         assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
1217     } else {
1218         assert(request->type != NBD_CMD_WRITE);
1219     }
1220 
1221     do {
1222         ret = nbd_co_send_request(bs, request, write_qiov);
1223         if (ret < 0) {
1224             continue;
1225         }
1226 
1227         ret = nbd_co_receive_return_code(s, request->cookie,
1228                                          &request_ret, &local_err);
1229         if (local_err) {
1230             trace_nbd_co_request_fail(request->from, request->len,
1231                                       request->cookie, request->flags,
1232                                       request->type,
1233                                       nbd_cmd_lookup(request->type),
1234                                       ret, error_get_pretty(local_err));
1235             error_free(local_err);
1236             local_err = NULL;
1237         }
1238     } while (ret < 0 && nbd_client_will_reconnect(s));
1239 
1240     return ret ? ret : request_ret;
1241 }
1242 
1243 static int coroutine_fn GRAPH_RDLOCK
1244 nbd_client_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1245                      QEMUIOVector *qiov, BdrvRequestFlags flags)
1246 {
1247     int ret, request_ret;
1248     Error *local_err = NULL;
1249     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1250     NBDRequest request = {
1251         .type = NBD_CMD_READ,
1252         .from = offset,
1253         .len = bytes,
1254     };
1255 
1256     assert(bytes <= NBD_MAX_BUFFER_SIZE);
1257 
1258     if (!bytes) {
1259         return 0;
1260     }
1261     /*
1262      * Work around the fact that the block layer doesn't do
1263      * byte-accurate sizing yet - if the read exceeds the server's
1264      * advertised size because the block layer rounded size up, then
1265      * truncate the request to the server and tail-pad with zero.
1266      */
1267     if (offset >= s->info.size) {
1268         assert(bytes < BDRV_SECTOR_SIZE);
1269         qemu_iovec_memset(qiov, 0, 0, bytes);
1270         return 0;
1271     }
1272     if (offset + bytes > s->info.size) {
1273         uint64_t slop = offset + bytes - s->info.size;
1274 
1275         assert(slop < BDRV_SECTOR_SIZE);
1276         qemu_iovec_memset(qiov, bytes - slop, 0, slop);
1277         request.len -= slop;
1278     }
1279 
1280     do {
1281         ret = nbd_co_send_request(bs, &request, NULL);
1282         if (ret < 0) {
1283             continue;
1284         }
1285 
1286         ret = nbd_co_receive_cmdread_reply(s, request.cookie, offset, qiov,
1287                                            &request_ret, &local_err);
1288         if (local_err) {
1289             trace_nbd_co_request_fail(request.from, request.len, request.cookie,
1290                                       request.flags, request.type,
1291                                       nbd_cmd_lookup(request.type),
1292                                       ret, error_get_pretty(local_err));
1293             error_free(local_err);
1294             local_err = NULL;
1295         }
1296     } while (ret < 0 && nbd_client_will_reconnect(s));
1297 
1298     return ret ? ret : request_ret;
1299 }
1300 
1301 static int coroutine_fn GRAPH_RDLOCK
1302 nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1303                       QEMUIOVector *qiov, BdrvRequestFlags flags)
1304 {
1305     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1306     NBDRequest request = {
1307         .type = NBD_CMD_WRITE,
1308         .from = offset,
1309         .len = bytes,
1310     };
1311 
1312     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
1313     if (flags & BDRV_REQ_FUA) {
1314         assert(s->info.flags & NBD_FLAG_SEND_FUA);
1315         request.flags |= NBD_CMD_FLAG_FUA;
1316     }
1317 
1318     assert(bytes <= NBD_MAX_BUFFER_SIZE);
1319 
1320     if (!bytes) {
1321         return 0;
1322     }
1323     return nbd_co_request(bs, &request, qiov);
1324 }
1325 
1326 static int coroutine_fn GRAPH_RDLOCK
1327 nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
1328                             BdrvRequestFlags flags)
1329 {
1330     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1331     NBDRequest request = {
1332         .type = NBD_CMD_WRITE_ZEROES,
1333         .from = offset,
1334         .len = bytes,
1335     };
1336 
1337     /* rely on max_pwrite_zeroes */
1338     assert(bytes <= UINT32_MAX || s->info.mode >= NBD_MODE_EXTENDED);
1339 
1340     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
1341     if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
1342         return -ENOTSUP;
1343     }
1344 
1345     if (flags & BDRV_REQ_FUA) {
1346         assert(s->info.flags & NBD_FLAG_SEND_FUA);
1347         request.flags |= NBD_CMD_FLAG_FUA;
1348     }
1349     if (!(flags & BDRV_REQ_MAY_UNMAP)) {
1350         request.flags |= NBD_CMD_FLAG_NO_HOLE;
1351     }
1352     if (flags & BDRV_REQ_NO_FALLBACK) {
1353         assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
1354         request.flags |= NBD_CMD_FLAG_FAST_ZERO;
1355     }
1356 
1357     if (!bytes) {
1358         return 0;
1359     }
1360     return nbd_co_request(bs, &request, NULL);
1361 }
1362 
1363 static int coroutine_fn GRAPH_RDLOCK nbd_client_co_flush(BlockDriverState *bs)
1364 {
1365     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1366     NBDRequest request = { .type = NBD_CMD_FLUSH };
1367 
1368     if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
1369         return 0;
1370     }
1371 
1372     request.from = 0;
1373     request.len = 0;
1374 
1375     return nbd_co_request(bs, &request, NULL);
1376 }
1377 
1378 static int coroutine_fn GRAPH_RDLOCK
1379 nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1380 {
1381     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1382     NBDRequest request = {
1383         .type = NBD_CMD_TRIM,
1384         .from = offset,
1385         .len = bytes,
1386     };
1387 
1388     /* rely on max_pdiscard */
1389     assert(bytes <= UINT32_MAX || s->info.mode >= NBD_MODE_EXTENDED);
1390 
1391     assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
1392     if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
1393         return 0;
1394     }
1395 
1396     return nbd_co_request(bs, &request, NULL);
1397 }
1398 
1399 static int coroutine_fn GRAPH_RDLOCK nbd_client_co_block_status(
1400         BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
1401         int64_t *pnum, int64_t *map, BlockDriverState **file)
1402 {
1403     int ret, request_ret;
1404     NBDExtent64 extent = { 0 };
1405     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1406     Error *local_err = NULL;
1407 
1408     NBDRequest request = {
1409         .type = NBD_CMD_BLOCK_STATUS,
1410         .from = offset,
1411         .len = MIN(bytes, s->info.size - offset),
1412         .flags = NBD_CMD_FLAG_REQ_ONE,
1413     };
1414 
1415     if (!s->info.base_allocation) {
1416         *pnum = bytes;
1417         *map = offset;
1418         *file = bs;
1419         return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
1420     }
1421     if (s->info.mode < NBD_MODE_EXTENDED) {
1422         request.len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
1423                           request.len);
1424     }
1425 
1426     /*
1427      * Work around the fact that the block layer doesn't do
1428      * byte-accurate sizing yet - if the status request exceeds the
1429      * server's advertised size because the block layer rounded size
1430      * up, we truncated the request to the server (above), or are
1431      * called on just the hole.
1432      */
1433     if (offset >= s->info.size) {
1434         *pnum = bytes;
1435         assert(bytes < BDRV_SECTOR_SIZE);
1436         /* Intentionally don't report offset_valid for the hole */
1437         return BDRV_BLOCK_ZERO;
1438     }
1439 
1440     if (s->info.min_block) {
1441         assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
1442     }
1443     do {
1444         ret = nbd_co_send_request(bs, &request, NULL);
1445         if (ret < 0) {
1446             continue;
1447         }
1448 
1449         ret = nbd_co_receive_blockstatus_reply(s, request.cookie, bytes,
1450                                                &extent, &request_ret,
1451                                                &local_err);
1452         if (local_err) {
1453             trace_nbd_co_request_fail(request.from, request.len, request.cookie,
1454                                       request.flags, request.type,
1455                                       nbd_cmd_lookup(request.type),
1456                                       ret, error_get_pretty(local_err));
1457             error_free(local_err);
1458             local_err = NULL;
1459         }
1460     } while (ret < 0 && nbd_client_will_reconnect(s));
1461 
1462     if (ret < 0 || request_ret < 0) {
1463         return ret ? ret : request_ret;
1464     }
1465 
1466     assert(extent.length);
1467     *pnum = extent.length;
1468     *map = offset;
1469     *file = bs;
1470     return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
1471         (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
1472         BDRV_BLOCK_OFFSET_VALID;
1473 }
1474 
1475 static int nbd_client_reopen_prepare(BDRVReopenState *state,
1476                                      BlockReopenQueue *queue, Error **errp)
1477 {
1478     BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;
1479 
1480     if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
1481         error_setg(errp, "Can't reopen read-only NBD mount as read/write");
1482         return -EACCES;
1483     }
1484     return 0;
1485 }
1486 
1487 static void nbd_yank(void *opaque)
1488 {
1489     BlockDriverState *bs = opaque;
1490     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1491 
1492     QEMU_LOCK_GUARD(&s->requests_lock);
1493     qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1494     s->state = NBD_CLIENT_QUIT;
1495 }
1496 
1497 static void nbd_client_close(BlockDriverState *bs)
1498 {
1499     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1500     NBDRequest request = { .type = NBD_CMD_DISC, .mode = s->info.mode };
1501 
1502     if (s->ioc) {
1503         nbd_send_request(s->ioc, &request);
1504     }
1505 
1506     nbd_teardown_connection(bs);
1507 }
1508 
1509 
1510 /*
1511  * Parse nbd_open options
1512  */
1513 
1514 static int nbd_parse_uri(const char *filename, QDict *options)
1515 {
1516     URI *uri;
1517     const char *p;
1518     QueryParams *qp = NULL;
1519     int ret = 0;
1520     bool is_unix;
1521 
1522     uri = uri_parse(filename);
1523     if (!uri) {
1524         return -EINVAL;
1525     }
1526 
1527     /* transport */
1528     if (!g_strcmp0(uri->scheme, "nbd")) {
1529         is_unix = false;
1530     } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
1531         is_unix = false;
1532     } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
1533         is_unix = true;
1534     } else {
1535         ret = -EINVAL;
1536         goto out;
1537     }
1538 
1539     p = uri->path ? uri->path : "";
1540     if (p[0] == '/') {
1541         p++;
1542     }
1543     if (p[0]) {
1544         qdict_put_str(options, "export", p);
1545     }
1546 
1547     qp = query_params_parse(uri->query);
1548     if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
1549         ret = -EINVAL;
1550         goto out;
1551     }
1552 
1553     if (is_unix) {
1554         /* nbd+unix:///export?socket=path */
1555         if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
1556             ret = -EINVAL;
1557             goto out;
1558         }
1559         qdict_put_str(options, "server.type", "unix");
1560         qdict_put_str(options, "server.path", qp->p[0].value);
1561     } else {
1562         QString *host;
1563         char *port_str;
1564 
1565         /* nbd[+tcp]://host[:port]/export */
1566         if (!uri->server) {
1567             ret = -EINVAL;
1568             goto out;
1569         }
1570 
1571         /* strip braces from literal IPv6 address */
1572         if (uri->server[0] == '[') {
1573             host = qstring_from_substr(uri->server, 1,
1574                                        strlen(uri->server) - 1);
1575         } else {
1576             host = qstring_from_str(uri->server);
1577         }
1578 
1579         qdict_put_str(options, "server.type", "inet");
1580         qdict_put(options, "server.host", host);
1581 
1582         port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
1583         qdict_put_str(options, "server.port", port_str);
1584         g_free(port_str);
1585     }
1586 
1587 out:
1588     if (qp) {
1589         query_params_free(qp);
1590     }
1591     uri_free(uri);
1592     return ret;
1593 }
1594 
1595 static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
1596 {
1597     const QDictEntry *e;
1598 
1599     for (e = qdict_first(options); e; e = qdict_next(options, e)) {
1600         if (!strcmp(e->key, "host") ||
1601             !strcmp(e->key, "port") ||
1602             !strcmp(e->key, "path") ||
1603             !strcmp(e->key, "export") ||
1604             strstart(e->key, "server.", NULL))
1605         {
1606             error_setg(errp, "Option '%s' cannot be used with a file name",
1607                        e->key);
1608             return true;
1609         }
1610     }
1611 
1612     return false;
1613 }
1614 
1615 static void nbd_parse_filename(const char *filename, QDict *options,
1616                                Error **errp)
1617 {
1618     g_autofree char *file = NULL;
1619     char *export_name;
1620     const char *host_spec;
1621     const char *unixpath;
1622 
1623     if (nbd_has_filename_options_conflict(options, errp)) {
1624         return;
1625     }
1626 
1627     if (strstr(filename, "://")) {
1628         int ret = nbd_parse_uri(filename, options);
1629         if (ret < 0) {
1630             error_setg(errp, "No valid URL specified");
1631         }
1632         return;
1633     }
1634 
1635     file = g_strdup(filename);
1636 
1637     export_name = strstr(file, EN_OPTSTR);
1638     if (export_name) {
1639         if (export_name[strlen(EN_OPTSTR)] == 0) {
1640             return;
1641         }
1642         export_name[0] = 0; /* truncate 'file' */
1643         export_name += strlen(EN_OPTSTR);
1644 
1645         qdict_put_str(options, "export", export_name);
1646     }
1647 
1648     /* extract the host_spec - fail if it's not nbd:... */
1649     if (!strstart(file, "nbd:", &host_spec)) {
1650         error_setg(errp, "File name string for NBD must start with 'nbd:'");
1651         return;
1652     }
1653 
1654     if (!*host_spec) {
1655         return;
1656     }
1657 
1658     /* are we a UNIX or TCP socket? */
1659     if (strstart(host_spec, "unix:", &unixpath)) {
1660         qdict_put_str(options, "server.type", "unix");
1661         qdict_put_str(options, "server.path", unixpath);
1662     } else {
1663         InetSocketAddress *addr = g_new(InetSocketAddress, 1);
1664 
1665         if (inet_parse(addr, host_spec, errp)) {
1666             goto out_inet;
1667         }
1668 
1669         qdict_put_str(options, "server.type", "inet");
1670         qdict_put_str(options, "server.host", addr->host);
1671         qdict_put_str(options, "server.port", addr->port);
1672     out_inet:
1673         qapi_free_InetSocketAddress(addr);
1674     }
1675 }
1676 
1677 static bool nbd_process_legacy_socket_options(QDict *output_options,
1678                                               QemuOpts *legacy_opts,
1679                                               Error **errp)
1680 {
1681     const char *path = qemu_opt_get(legacy_opts, "path");
1682     const char *host = qemu_opt_get(legacy_opts, "host");
1683     const char *port = qemu_opt_get(legacy_opts, "port");
1684     const QDictEntry *e;
1685 
1686     if (!path && !host && !port) {
1687         return true;
1688     }
1689 
1690     for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
1691     {
1692         if (strstart(e->key, "server.", NULL)) {
1693             error_setg(errp, "Cannot use 'server' and path/host/port at the "
1694                        "same time");
1695             return false;
1696         }
1697     }
1698 
1699     if (path && host) {
1700         error_setg(errp, "path and host may not be used at the same time");
1701         return false;
1702     } else if (path) {
1703         if (port) {
1704             error_setg(errp, "port may not be used without host");
1705             return false;
1706         }
1707 
1708         qdict_put_str(output_options, "server.type", "unix");
1709         qdict_put_str(output_options, "server.path", path);
1710     } else if (host) {
1711         qdict_put_str(output_options, "server.type", "inet");
1712         qdict_put_str(output_options, "server.host", host);
1713         qdict_put_str(output_options, "server.port",
1714                       port ?: stringify(NBD_DEFAULT_PORT));
1715     }
1716 
1717     return true;
1718 }
1719 
1720 static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
1721                                  Error **errp)
1722 {
1723     SocketAddress *saddr = NULL;
1724     QDict *addr = NULL;
1725     Visitor *iv = NULL;
1726 
1727     qdict_extract_subqdict(options, &addr, "server.");
1728     if (!qdict_size(addr)) {
1729         error_setg(errp, "NBD server address missing");
1730         goto done;
1731     }
1732 
1733     iv = qobject_input_visitor_new_flat_confused(addr, errp);
1734     if (!iv) {
1735         goto done;
1736     }
1737 
1738     if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
1739         goto done;
1740     }
1741 
1742     if (socket_address_parse_named_fd(saddr, errp) < 0) {
1743         qapi_free_SocketAddress(saddr);
1744         saddr = NULL;
1745         goto done;
1746     }
1747 
1748 done:
1749     qobject_unref(addr);
1750     visit_free(iv);
1751     return saddr;
1752 }
1753 
1754 static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
1755 {
1756     Object *obj;
1757     QCryptoTLSCreds *creds;
1758 
1759     obj = object_resolve_path_component(
1760         object_get_objects_root(), id);
1761     if (!obj) {
1762         error_setg(errp, "No TLS credentials with id '%s'",
1763                    id);
1764         return NULL;
1765     }
1766     creds = (QCryptoTLSCreds *)
1767         object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
1768     if (!creds) {
1769         error_setg(errp, "Object with id '%s' is not TLS credentials",
1770                    id);
1771         return NULL;
1772     }
1773 
1774     if (!qcrypto_tls_creds_check_endpoint(creds,
1775                                           QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
1776                                           errp)) {
1777         return NULL;
1778     }
1779     object_ref(obj);
1780     return creds;
1781 }
1782 
1783 
1784 static QemuOptsList nbd_runtime_opts = {
1785     .name = "nbd",
1786     .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
1787     .desc = {
1788         {
1789             .name = "host",
1790             .type = QEMU_OPT_STRING,
1791             .help = "TCP host to connect to",
1792         },
1793         {
1794             .name = "port",
1795             .type = QEMU_OPT_STRING,
1796             .help = "TCP port to connect to",
1797         },
1798         {
1799             .name = "path",
1800             .type = QEMU_OPT_STRING,
1801             .help = "Unix socket path to connect to",
1802         },
1803         {
1804             .name = "export",
1805             .type = QEMU_OPT_STRING,
1806             .help = "Name of the NBD export to open",
1807         },
1808         {
1809             .name = "tls-creds",
1810             .type = QEMU_OPT_STRING,
1811             .help = "ID of the TLS credentials to use",
1812         },
1813         {
1814             .name = "tls-hostname",
1815             .type = QEMU_OPT_STRING,
1816             .help = "Override hostname for validating TLS x509 certificate",
1817         },
1818         {
1819             .name = "x-dirty-bitmap",
1820             .type = QEMU_OPT_STRING,
1821             .help = "experimental: expose named dirty bitmap in place of "
1822                     "block status",
1823         },
1824         {
1825             .name = "reconnect-delay",
1826             .type = QEMU_OPT_NUMBER,
1827             .help = "On an unexpected disconnect, the nbd client tries to "
1828                     "connect again until succeeding or encountering a serious "
1829                     "error.  During the first @reconnect-delay seconds, all "
1830                     "requests are paused and will be rerun on a successful "
1831                     "reconnect. After that time, any delayed requests and all "
1832                     "future requests before a successful reconnect will "
1833                     "immediately fail. Default 0",
1834         },
1835         {
1836             .name = "open-timeout",
1837             .type = QEMU_OPT_NUMBER,
1838             .help = "In seconds. If zero, the nbd driver tries the connection "
1839                     "only once, and fails to open if the connection fails. "
1840                     "If non-zero, the nbd driver will repeat connection "
1841                     "attempts until successful or until @open-timeout seconds "
1842                     "have elapsed. Default 0",
1843         },
1844         { /* end of list */ }
1845     },
1846 };
1847 
1848 static int nbd_process_options(BlockDriverState *bs, QDict *options,
1849                                Error **errp)
1850 {
1851     BDRVNBDState *s = bs->opaque;
1852     QemuOpts *opts;
1853     int ret = -EINVAL;
1854 
1855     opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
1856     if (!qemu_opts_absorb_qdict(opts, options, errp)) {
1857         goto error;
1858     }
1859 
1860     /* Translate @host, @port, and @path to a SocketAddress */
1861     if (!nbd_process_legacy_socket_options(options, opts, errp)) {
1862         goto error;
1863     }
1864 
1865     /* Pop the config into our state object. Exit if invalid. */
1866     s->saddr = nbd_config(s, options, errp);
1867     if (!s->saddr) {
1868         goto error;
1869     }
1870 
1871     s->export = g_strdup(qemu_opt_get(opts, "export"));
1872     if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
1873         error_setg(errp, "export name too long to send to server");
1874         goto error;
1875     }
1876 
1877     s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
1878     if (s->tlscredsid) {
1879         s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
1880         if (!s->tlscreds) {
1881             goto error;
1882         }
1883 
1884         s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
1885         if (!s->tlshostname &&
1886             s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
1887             s->tlshostname = g_strdup(s->saddr->u.inet.host);
1888         }
1889     }
1890 
1891     s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
1892     if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
1893         error_setg(errp, "x-dirty-bitmap query too long to send to server");
1894         goto error;
1895     }
1896 
1897     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
1898     s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);
1899 
1900     ret = 0;
1901 
1902  error:
1903     qemu_opts_del(opts);
1904     return ret;
1905 }
1906 
1907 static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
1908                     Error **errp)
1909 {
1910     int ret;
1911     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1912 
1913     s->bs = bs;
1914     qemu_mutex_init(&s->requests_lock);
1915     qemu_co_queue_init(&s->free_sema);
1916     qemu_co_mutex_init(&s->send_mutex);
1917     qemu_co_mutex_init(&s->receive_mutex);
1918 
1919     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
1920         return -EEXIST;
1921     }
1922 
1923     ret = nbd_process_options(bs, options, errp);
1924     if (ret < 0) {
1925         goto fail;
1926     }
1927 
1928     s->conn = nbd_client_connection_new(s->saddr, true, s->export,
1929                                         s->x_dirty_bitmap, s->tlscreds,
1930                                         s->tlshostname);
1931 
1932     if (s->open_timeout) {
1933         nbd_client_connection_enable_retry(s->conn);
1934         open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
1935                         s->open_timeout * NANOSECONDS_PER_SECOND);
1936     }
1937 
1938     s->state = NBD_CLIENT_CONNECTING_WAIT;
1939     ret = nbd_do_establish_connection(bs, true, errp);
1940     if (ret < 0) {
1941         goto fail;
1942     }
1943 
1944     /*
1945      * The connect attempt is done, so we no longer need this timer.
1946      * Delete it, because we do not want it to be around when this node
1947      * is drained or closed.
1948      */
1949     open_timer_del(s);
1950 
1951     nbd_client_connection_enable_retry(s->conn);
1952 
1953     return 0;
1954 
1955 fail:
1956     open_timer_del(s);
1957     nbd_clear_bdrvstate(bs);
1958     return ret;
1959 }
1960 
1961 static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
1962 {
1963     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
1964     uint32_t min = s->info.min_block;
1965     uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);
1966 
1967     /*
1968      * If the server did not advertise an alignment:
1969      * - a size that is not sector-aligned implies that an alignment
1970      *   of 1 can be used to access those tail bytes
1971      * - advertisement of block status requires an alignment of 1, so
1972      *   that we don't violate block layer constraints that block
1973      *   status is always aligned (as we can't control whether the
1974      *   server will report sub-sector extents, such as a hole at EOF
1975      *   on an unaligned POSIX file)
1976      * - otherwise, assume the server is so old that we are safer avoiding
1977      *   sub-sector requests
1978      */
1979     if (!min) {
1980         min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
1981                s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
1982     }
1983 
1984     bs->bl.request_alignment = min;
1985     bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
1986     bs->bl.max_pwrite_zeroes = max;
1987     bs->bl.max_transfer = max;
1988 
1989     /*
1990      * Assume that if the server supports extended headers, it also
1991      * supports unlimited size zero and trim commands.
1992      */
1993     if (s->info.mode >= NBD_MODE_EXTENDED) {
1994         bs->bl.max_pdiscard = bs->bl.max_pwrite_zeroes = 0;
1995     }
1996 
1997     if (s->info.opt_block &&
1998         s->info.opt_block > bs->bl.opt_transfer) {
1999         bs->bl.opt_transfer = s->info.opt_block;
2000     }
2001 }
2002 
2003 static void nbd_close(BlockDriverState *bs)
2004 {
2005     nbd_client_close(bs);
2006     nbd_clear_bdrvstate(bs);
2007 }
2008 
2009 /*
2010  * NBD cannot truncate, but if the caller asks to truncate to the same size, or
2011  * to a smaller size with exact=false, there is no reason to fail the
2012  * operation.
2013  *
2014  * Preallocation mode is ignored since it does not seems useful to fail when
2015  * we never change anything.
2016  */
2017 static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
2018                                         bool exact, PreallocMode prealloc,
2019                                         BdrvRequestFlags flags, Error **errp)
2020 {
2021     BDRVNBDState *s = bs->opaque;
2022 
2023     if (offset != s->info.size && exact) {
2024         error_setg(errp, "Cannot resize NBD nodes");
2025         return -ENOTSUP;
2026     }
2027 
2028     if (offset > s->info.size) {
2029         error_setg(errp, "Cannot grow NBD nodes");
2030         return -EINVAL;
2031     }
2032 
2033     return 0;
2034 }
2035 
2036 static int64_t coroutine_fn nbd_co_getlength(BlockDriverState *bs)
2037 {
2038     BDRVNBDState *s = bs->opaque;
2039 
2040     return s->info.size;
2041 }
2042 
2043 static void nbd_refresh_filename(BlockDriverState *bs)
2044 {
2045     BDRVNBDState *s = bs->opaque;
2046     const char *host = NULL, *port = NULL, *path = NULL;
2047     size_t len = 0;
2048 
2049     if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
2050         const InetSocketAddress *inet = &s->saddr->u.inet;
2051         if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
2052             host = inet->host;
2053             port = inet->port;
2054         }
2055     } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
2056         path = s->saddr->u.q_unix.path;
2057     } /* else can't represent as pseudo-filename */
2058 
2059     if (path && s->export) {
2060         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2061                        "nbd+unix:///%s?socket=%s", s->export, path);
2062     } else if (path && !s->export) {
2063         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2064                        "nbd+unix://?socket=%s", path);
2065     } else if (host && s->export) {
2066         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2067                        "nbd://%s:%s/%s", host, port, s->export);
2068     } else if (host && !s->export) {
2069         len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
2070                        "nbd://%s:%s", host, port);
2071     }
2072     if (len >= sizeof(bs->exact_filename)) {
2073         /* Name is too long to represent exactly, so leave it empty. */
2074         bs->exact_filename[0] = '\0';
2075     }
2076 }
2077 
2078 static char *nbd_dirname(BlockDriverState *bs, Error **errp)
2079 {
2080     /* The generic bdrv_dirname() implementation is able to work out some
2081      * directory name for NBD nodes, but that would be wrong. So far there is no
2082      * specification for how "export paths" would work, so NBD does not have
2083      * directory names. */
2084     error_setg(errp, "Cannot generate a base directory for NBD nodes");
2085     return NULL;
2086 }
2087 
/*
 * NULL-terminated list of option names that the NBD BlockDriver
 * definitions in this file install as .strong_runtime_opts.
 */
static const char *const nbd_strong_runtime_opts[] = {
    /* legacy socket address options */
    "path", "host", "port",

    /* export selection */
    "export",

    /* TLS settings */
    "tls-creds", "tls-hostname",

    /* NOTE(review): trailing dot presumably matches every "server.*" key */
    "server.",

    NULL
};
2099 
2100 static void nbd_cancel_in_flight(BlockDriverState *bs)
2101 {
2102     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
2103 
2104     reconnect_delay_timer_del(s);
2105 
2106     qemu_mutex_lock(&s->requests_lock);
2107     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
2108         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
2109     }
2110     qemu_mutex_unlock(&s->requests_lock);
2111 
2112     nbd_co_establish_connection_cancel(s->conn);
2113 }
2114 
2115 static void nbd_attach_aio_context(BlockDriverState *bs,
2116                                    AioContext *new_context)
2117 {
2118     BDRVNBDState *s = bs->opaque;
2119 
2120     /* The open_timer is used only during nbd_open() */
2121     assert(!s->open_timer);
2122 
2123     /*
2124      * The reconnect_delay_timer is scheduled in I/O paths when the
2125      * connection is lost, to cancel the reconnection attempt after a
2126      * given time.  Once this attempt is done (successfully or not),
2127      * nbd_reconnect_attempt() ensures the timer is deleted before the
2128      * respective I/O request is resumed.
2129      * Since the AioContext can only be changed when a node is drained,
2130      * the reconnect_delay_timer cannot be active here.
2131      */
2132     assert(!s->reconnect_delay_timer);
2133 }
2134 
2135 static void nbd_detach_aio_context(BlockDriverState *bs)
2136 {
2137     BDRVNBDState *s = bs->opaque;
2138 
2139     assert(!s->open_timer);
2140     assert(!s->reconnect_delay_timer);
2141 }
2142 
2143 static BlockDriver bdrv_nbd = {
2144     .format_name                = "nbd",
2145     .protocol_name              = "nbd",
2146     .instance_size              = sizeof(BDRVNBDState),
2147     .bdrv_parse_filename        = nbd_parse_filename,
2148     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2149     .create_opts                = &bdrv_create_opts_simple,
2150     .bdrv_file_open             = nbd_open,
2151     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2152     .bdrv_co_preadv             = nbd_client_co_preadv,
2153     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2154     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2155     .bdrv_close                 = nbd_close,
2156     .bdrv_co_flush_to_os        = nbd_client_co_flush,
2157     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2158     .bdrv_refresh_limits        = nbd_refresh_limits,
2159     .bdrv_co_truncate           = nbd_co_truncate,
2160     .bdrv_co_getlength          = nbd_co_getlength,
2161     .bdrv_refresh_filename      = nbd_refresh_filename,
2162     .bdrv_co_block_status       = nbd_client_co_block_status,
2163     .bdrv_dirname               = nbd_dirname,
2164     .strong_runtime_opts        = nbd_strong_runtime_opts,
2165     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2166 
2167     .bdrv_attach_aio_context    = nbd_attach_aio_context,
2168     .bdrv_detach_aio_context    = nbd_detach_aio_context,
2169 };
2170 
2171 static BlockDriver bdrv_nbd_tcp = {
2172     .format_name                = "nbd",
2173     .protocol_name              = "nbd+tcp",
2174     .instance_size              = sizeof(BDRVNBDState),
2175     .bdrv_parse_filename        = nbd_parse_filename,
2176     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2177     .create_opts                = &bdrv_create_opts_simple,
2178     .bdrv_file_open             = nbd_open,
2179     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2180     .bdrv_co_preadv             = nbd_client_co_preadv,
2181     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2182     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2183     .bdrv_close                 = nbd_close,
2184     .bdrv_co_flush_to_os        = nbd_client_co_flush,
2185     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2186     .bdrv_refresh_limits        = nbd_refresh_limits,
2187     .bdrv_co_truncate           = nbd_co_truncate,
2188     .bdrv_co_getlength          = nbd_co_getlength,
2189     .bdrv_refresh_filename      = nbd_refresh_filename,
2190     .bdrv_co_block_status       = nbd_client_co_block_status,
2191     .bdrv_dirname               = nbd_dirname,
2192     .strong_runtime_opts        = nbd_strong_runtime_opts,
2193     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2194 
2195     .bdrv_attach_aio_context    = nbd_attach_aio_context,
2196     .bdrv_detach_aio_context    = nbd_detach_aio_context,
2197 };
2198 
2199 static BlockDriver bdrv_nbd_unix = {
2200     .format_name                = "nbd",
2201     .protocol_name              = "nbd+unix",
2202     .instance_size              = sizeof(BDRVNBDState),
2203     .bdrv_parse_filename        = nbd_parse_filename,
2204     .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
2205     .create_opts                = &bdrv_create_opts_simple,
2206     .bdrv_file_open             = nbd_open,
2207     .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
2208     .bdrv_co_preadv             = nbd_client_co_preadv,
2209     .bdrv_co_pwritev            = nbd_client_co_pwritev,
2210     .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
2211     .bdrv_close                 = nbd_close,
2212     .bdrv_co_flush_to_os        = nbd_client_co_flush,
2213     .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
2214     .bdrv_refresh_limits        = nbd_refresh_limits,
2215     .bdrv_co_truncate           = nbd_co_truncate,
2216     .bdrv_co_getlength          = nbd_co_getlength,
2217     .bdrv_refresh_filename      = nbd_refresh_filename,
2218     .bdrv_co_block_status       = nbd_client_co_block_status,
2219     .bdrv_dirname               = nbd_dirname,
2220     .strong_runtime_opts        = nbd_strong_runtime_opts,
2221     .bdrv_cancel_in_flight      = nbd_cancel_in_flight,
2222 
2223     .bdrv_attach_aio_context    = nbd_attach_aio_context,
2224     .bdrv_detach_aio_context    = nbd_detach_aio_context,
2225 };
2226 
2227 static void bdrv_nbd_init(void)
2228 {
2229     bdrv_register(&bdrv_nbd);
2230     bdrv_register(&bdrv_nbd_tcp);
2231     bdrv_register(&bdrv_nbd_unix);
2232 }
2233 
2234 block_init(bdrv_nbd_init);
2235