xref: /openbmc/qemu/nbd/server.c (revision 13b5ae94)
1 /*
2  *  Copyright Red Hat
3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
4  *
5  *  Network Block Device Server Side
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; under version 2 of the License.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 
22 #include "block/block_int.h"
23 #include "block/export.h"
24 #include "block/dirty-bitmap.h"
25 #include "qapi/error.h"
26 #include "qemu/queue.h"
27 #include "trace.h"
28 #include "nbd-internal.h"
29 #include "qemu/units.h"
30 #include "qemu/memalign.h"
31 
/*
 * Numeric ids for the meta contexts handled in this file. NOTE(review):
 * presumably these are the context ids reported to the client; the users
 * of these macros are outside this section -- confirm.
 */
32 #define NBD_META_ID_BASE_ALLOCATION 0
33 #define NBD_META_ID_ALLOCATION_DEPTH 1
34 /* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
35 #define NBD_META_ID_DIRTY_BITMAP 2
36 
37 /*
38  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
39  * constant. If an increase is needed, note that the NBD protocol
40  * recommends no larger than 32 MiB, so that the client won't consider
41  * the reply as a denial of service attack.
42  */
43 #define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
44 
45 static int system_errno_to_nbd_errno(int err)
46 {
47     switch (err) {
48     case 0:
49         return NBD_SUCCESS;
50     case EPERM:
51     case EROFS:
52         return NBD_EPERM;
53     case EIO:
54         return NBD_EIO;
55     case ENOMEM:
56         return NBD_ENOMEM;
57 #ifdef EDQUOT
58     case EDQUOT:
59 #endif
60     case EFBIG:
61     case ENOSPC:
62         return NBD_ENOSPC;
63     case EOVERFLOW:
64         return NBD_EOVERFLOW;
65     case ENOTSUP:
66 #if ENOTSUP != EOPNOTSUPP
67     case EOPNOTSUPP:
68 #endif
69         return NBD_ENOTSUP;
70     case ESHUTDOWN:
71         return NBD_ESHUTDOWN;
72     case EINVAL:
73     default:
74         return NBD_EINVAL;
75     }
76 }
77 
78 /* Definitions for opaque data types */
79 
/*
 * Per-request bookkeeping. Only the field layout is visible in this part
 * of the file; the code that fills these fields in lives elsewhere.
 */
80 typedef struct NBDRequestData NBDRequestData;
81 
82 struct NBDRequestData {
83     NBDClient *client; /* presumably the client that issued the request */
84     uint8_t *data; /* presumably the request's payload buffer -- confirm */
85     bool complete; /* presumably set once the request is done -- confirm */
86 };
87 
/*
 * Server-side state of one named NBD export. All exports are chained on
 * the file-level 'exports' list through @next.
 */
88 struct NBDExport {
89     BlockExport common; /* common.blk is queried for alignment/max transfer */
90 
91     char *name; /* advertised by NBD_OPT_LIST; NULL is sent as "" */
92     char *description; /* optional; NULL is sent as "" / not sent at all */
93     uint64_t size; /* export size reported to the client */
94     uint16_t nbdflags; /* base flags; mode-dependent bits are OR-ed in */
95     QTAILQ_HEAD(, NBDClient) clients; /* clients bound to this export */
96     QTAILQ_ENTRY(NBDExport) next; /* linkage in the 'exports' list */
97 
98     BlockBackend *eject_notifier_blk; /* NOTE(review): users not visible here */
99     Notifier eject_notifier; /* NOTE(review): users not visible here */
100 
101     bool allocation_depth; /* qemu:allocation-depth may be selected */
102     BdrvDirtyBitmap **export_bitmaps; /* matched by name for qemu:dirty-bitmap: */
103     size_t nr_export_bitmaps; /* number of entries in export_bitmaps */
104 };
105 
/* All known NBD exports; walked when answering NBD_OPT_LIST. */
106 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
107 
108 /*
109  * NBDMetaContexts represents a list of meta contexts in use,
110  * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
111  * NBD_OPT_LIST_META_CONTEXT.
112  */
113 struct NBDMetaContexts {
114     const NBDExport *exp; /* associated export */
    /*
     * nbd_check_meta_export() resets @count to 0 when the client binds to
     * an export other than @exp.
     */
115     size_t count; /* number of negotiated contexts */
116     bool base_allocation; /* export base:allocation context (block status) */
117     bool allocation_depth; /* export qemu:allocation-depth */
118     bool *bitmaps; /*
119                     * export qemu:dirty-bitmap:<export bitmap name>,
120                     * sized by exp->nr_export_bitmaps
121                     */
122 };
123 
/*
 * State of one client connection, covering both option negotiation
 * (@opt, @optlen, @mode, @contexts) and the later transmission phase.
 */
124 struct NBDClient {
125     int refcount; /* atomic */
126     void (*close_fn)(NBDClient *client, bool negotiated);
127 
128     QemuMutex lock;
129 
    /* Export the client is bound to (set by NBD_OPT_EXPORT_NAME / NBD_OPT_GO) */
130     NBDExport *exp;
    /* Both are handed to qio_channel_tls_new_server() on NBD_OPT_STARTTLS */
131     QCryptoTLSCreds *tlscreds;
132     char *tlsauthz;
133     QIOChannelSocket *sioc; /* The underlying data channel */
134     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
135 
136     Coroutine *recv_coroutine; /* protected by lock */
137 
138     CoMutex send_lock;
139     Coroutine *send_coroutine;
140 
141     bool read_yielding; /* protected by lock */
142     bool quiescing; /* protected by lock */
143 
    /* Linkage in NBDExport.clients */
144     QTAILQ_ENTRY(NBDClient) next;
145     int nb_requests; /* protected by lock */
146     bool closing; /* protected by lock */
147 
148     uint32_t check_align; /* If non-zero, check for aligned client requests */
149 
    /* Compared against NBD_MODE_STRUCTURED/NBD_MODE_EXTENDED to pick flags */
150     NBDMode mode;
151     NBDMetaContexts contexts; /* Negotiated meta contexts */
152 
153     uint32_t opt; /* Current option being negotiated */
154     uint32_t optlen; /* remaining length of data in ioc for the option being
155                         negotiated now */
156 };
157 
158 static void nbd_client_receive_next_request(NBDClient *client);
159 
160 /* Basic flow for negotiation
161 
162    Server         Client
163    Negotiate
164 
165    or
166 
167    Server         Client
168    Negotiate #1
169                   Option
170    Negotiate #2
171 
172    ----
173 
174    followed by
175 
176    Server         Client
177                   Request
178    Response
179                   Request
180    Response
181                   ...
182    ...
183                   Request (type == 2)
184 
185 */
186 
187 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
188                                      uint32_t type, uint32_t length)
189 {
190     stq_be_p(&rep->magic, NBD_REP_MAGIC);
191     stl_be_p(&rep->option, option);
192     stl_be_p(&rep->type, type);
193     stl_be_p(&rep->length, length);
194 }
195 
196 /* Send a reply header, including length, but no payload.
197  * Return -errno on error, 0 on success. */
198 static coroutine_fn int
199 nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
200                            uint32_t len, Error **errp)
201 {
202     NBDOptionReply rep;
203 
204     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
205                                      type, nbd_rep_lookup(type), len);
206 
207     assert(len < NBD_MAX_BUFFER_SIZE);
208 
209     set_be_option_rep(&rep, client->opt, type, len);
210     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
211 }
212 
213 /* Send a reply header with default 0 length.
214  * Return -errno on error, 0 on success. */
215 static coroutine_fn int
216 nbd_negotiate_send_rep(NBDClient *client, uint32_t type, Error **errp)
217 {
218     return nbd_negotiate_send_rep_len(client, type, 0, errp);
219 }
220 
221 /* Send an error reply.
222  * Return -errno on error, 0 on success. */
223 static coroutine_fn int G_GNUC_PRINTF(4, 0)
224 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
225                             Error **errp, const char *fmt, va_list va)
226 {
227     ERRP_GUARD();
228     g_autofree char *msg = NULL;
229     int ret;
230     size_t len;
231 
232     msg = g_strdup_vprintf(fmt, va);
233     len = strlen(msg);
234     assert(len < NBD_MAX_STRING_SIZE);
235     trace_nbd_negotiate_send_rep_err(msg);
236     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
237     if (ret < 0) {
238         return ret;
239     }
240     if (nbd_write(client->ioc, msg, len, errp) < 0) {
241         error_prepend(errp, "write failed (error message): ");
242         return -EIO;
243     }
244 
245     return 0;
246 }
247 
/*
 * Return a newly allocated copy of @name that is safe to embed in an
 * error reply: names of 80 bytes or more are cut to their first 80 bytes
 * followed by "...". The caller owns the returned string.
 */
static char *
nbd_sanitize_name(const char *name)
{
    const size_t limit = 80;

    /* XXX Should we also try to sanitize any control characters? */
    return strnlen(name, limit) >= limit
        ? g_strdup_printf("%.80s...", name)
        : g_strdup(name);
}
260 
261 /* Send an error reply.
262  * Return -errno on error, 0 on success. */
263 static coroutine_fn int G_GNUC_PRINTF(4, 5)
264 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
265                            Error **errp, const char *fmt, ...)
266 {
267     va_list va;
268     int ret;
269 
270     va_start(va, fmt);
271     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
272     va_end(va);
273     return ret;
274 }
275 
276 /* Drop remainder of the current option, and send a reply with the
277  * given error type and message. Return -errno on read or write
278  * failure; or 0 if connection is still live. */
279 static coroutine_fn int G_GNUC_PRINTF(4, 0)
280 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
281               const char *fmt, va_list va)
282 {
283     int ret = nbd_drop(client->ioc, client->optlen, errp);
284 
285     client->optlen = 0;
286     if (!ret) {
287         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
288     }
289     return ret;
290 }
291 
292 static coroutine_fn int G_GNUC_PRINTF(4, 5)
293 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
294              const char *fmt, ...)
295 {
296     int ret;
297     va_list va;
298 
299     va_start(va, fmt);
300     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
301     va_end(va);
302 
303     return ret;
304 }
305 
306 static coroutine_fn int G_GNUC_PRINTF(3, 4)
307 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
308 {
309     int ret;
310     va_list va;
311 
312     va_start(va, fmt);
313     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
314     va_end(va);
315 
316     return ret;
317 }
318 
319 /* Read size bytes from the unparsed payload of the current option.
320  * If @check_nul, require that no NUL bytes appear in buffer.
321  * Return -errno on I/O error, 0 if option was completely handled by
322  * sending a reply about inconsistent lengths, or 1 on success. */
323 static coroutine_fn int
324 nbd_opt_read(NBDClient *client, void *buffer, size_t size,
325              bool check_nul, Error **errp)
326 {
327     if (size > client->optlen) {
328         return nbd_opt_invalid(client, errp,
329                                "Inconsistent lengths in option %s",
330                                nbd_opt_lookup(client->opt));
331     }
332     client->optlen -= size;
333     if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
334         return -EIO;
335     }
336 
337     if (check_nul && strnlen(buffer, size) != size) {
338         return nbd_opt_invalid(client, errp,
339                                "Unexpected embedded NUL in option %s",
340                                nbd_opt_lookup(client->opt));
341     }
342     return 1;
343 }
344 
345 /* Drop size bytes from the unparsed payload of the current option.
346  * Return -errno on I/O error, 0 if option was completely handled by
347  * sending a reply about inconsistent lengths, or 1 on success. */
348 static coroutine_fn int
349 nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
350 {
351     if (size > client->optlen) {
352         return nbd_opt_invalid(client, errp,
353                                "Inconsistent lengths in option %s",
354                                nbd_opt_lookup(client->opt));
355     }
356     client->optlen -= size;
357     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
358 }
359 
360 /* nbd_opt_read_name
361  *
362  * Read a string with the format:
363  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
364  *   len bytes string (not 0-terminated)
365  *
366  * On success, @name will be allocated.
367  * If @length is non-null, it will be set to the actual string length.
368  *
369  * Return -errno on I/O error, 0 if option was completely handled by
370  * sending a reply about inconsistent lengths, or 1 on success.
371  */
372 static coroutine_fn int
373 nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
374                   Error **errp)
375 {
376     int ret;
377     uint32_t len;
378     g_autofree char *local_name = NULL;
379 
380     *name = NULL;
381     ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
382     if (ret <= 0) {
383         return ret;
384     }
385     len = cpu_to_be32(len);
386 
387     if (len > NBD_MAX_STRING_SIZE) {
388         return nbd_opt_invalid(client, errp,
389                                "Invalid name length: %" PRIu32, len);
390     }
391 
392     local_name = g_malloc(len + 1);
393     ret = nbd_opt_read(client, local_name, len, true, errp);
394     if (ret <= 0) {
395         return ret;
396     }
397     local_name[len] = '\0';
398 
399     if (length) {
400         *length = len;
401     }
402     *name = g_steal_pointer(&local_name);
403 
404     return 1;
405 }
406 
407 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
408  * Return -errno on error, 0 on success. */
409 static coroutine_fn int
410 nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp, Error **errp)
411 {
412     ERRP_GUARD();
413     size_t name_len, desc_len;
414     uint32_t len;
415     const char *name = exp->name ? exp->name : "";
416     const char *desc = exp->description ? exp->description : "";
417     QIOChannel *ioc = client->ioc;
418     int ret;
419 
420     trace_nbd_negotiate_send_rep_list(name, desc);
421     name_len = strlen(name);
422     desc_len = strlen(desc);
423     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
424     len = name_len + desc_len + sizeof(len);
425     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
426     if (ret < 0) {
427         return ret;
428     }
429 
430     len = cpu_to_be32(name_len);
431     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
432         error_prepend(errp, "write failed (name length): ");
433         return -EINVAL;
434     }
435 
436     if (nbd_write(ioc, name, name_len, errp) < 0) {
437         error_prepend(errp, "write failed (name buffer): ");
438         return -EINVAL;
439     }
440 
441     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
442         error_prepend(errp, "write failed (description buffer): ");
443         return -EINVAL;
444     }
445 
446     return 0;
447 }
448 
449 /* Process the NBD_OPT_LIST command, with a potential series of replies.
450  * Return -errno on error, 0 on success. */
451 static coroutine_fn int
452 nbd_negotiate_handle_list(NBDClient *client, Error **errp)
453 {
454     NBDExport *exp;
455     assert(client->opt == NBD_OPT_LIST);
456 
457     /* For each export, send a NBD_REP_SERVER reply. */
458     QTAILQ_FOREACH(exp, &exports, next) {
459         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
460             return -EINVAL;
461         }
462     }
463     /* Finish with a NBD_REP_ACK. */
464     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
465 }
466 
467 static coroutine_fn void
468 nbd_check_meta_export(NBDClient *client, NBDExport *exp)
469 {
470     if (exp != client->contexts.exp) {
471         client->contexts.count = 0;
472     }
473 }
474 
475 /* Send a reply to NBD_OPT_EXPORT_NAME.
476  * Return -errno on error, 0 on success. */
477 static coroutine_fn int
478 nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
479                                  Error **errp)
480 {
481     ERRP_GUARD();
482     g_autofree char *name = NULL;
483     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
484     size_t len;
485     int ret;
486     uint16_t myflags;
487 
488     /* Client sends:
489         [20 ..  xx]   export name (length bytes)
490        Server replies:
491         [ 0 ..   7]   size
492         [ 8 ..   9]   export flags
493         [10 .. 133]   reserved     (0) [unless no_zeroes]
494      */
495     trace_nbd_negotiate_handle_export_name();
496     if (client->mode >= NBD_MODE_EXTENDED) {
497         error_setg(errp, "Extended headers already negotiated");
498         return -EINVAL;
499     }
500     if (client->optlen > NBD_MAX_STRING_SIZE) {
501         error_setg(errp, "Bad length received");
502         return -EINVAL;
503     }
504     name = g_malloc(client->optlen + 1);
505     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
506         return -EIO;
507     }
508     name[client->optlen] = '\0';
509     client->optlen = 0;
510 
511     trace_nbd_negotiate_handle_export_name_request(name);
512 
513     client->exp = nbd_export_find(name);
514     if (!client->exp) {
515         error_setg(errp, "export not found");
516         return -EINVAL;
517     }
518     nbd_check_meta_export(client, client->exp);
519 
520     myflags = client->exp->nbdflags;
521     if (client->mode >= NBD_MODE_STRUCTURED) {
522         myflags |= NBD_FLAG_SEND_DF;
523     }
524     if (client->mode >= NBD_MODE_EXTENDED && client->contexts.count) {
525         myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD;
526     }
527     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
528     stq_be_p(buf, client->exp->size);
529     stw_be_p(buf + 8, myflags);
530     len = no_zeroes ? 10 : sizeof(buf);
531     ret = nbd_write(client->ioc, buf, len, errp);
532     if (ret < 0) {
533         error_prepend(errp, "write failed: ");
534         return ret;
535     }
536 
537     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
538     blk_exp_ref(&client->exp->common);
539 
540     return 0;
541 }
542 
543 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
544  * The buffer does NOT include the info type prefix.
545  * Return -errno on error, 0 if ready to send more. */
546 static coroutine_fn int
547 nbd_negotiate_send_info(NBDClient *client, uint16_t info, uint32_t length,
548                         void *buf, Error **errp)
549 {
550     int rc;
551 
552     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
553     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
554                                     sizeof(info) + length, errp);
555     if (rc < 0) {
556         return rc;
557     }
558     info = cpu_to_be16(info);
559     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
560         return -EIO;
561     }
562     if (nbd_write(client->ioc, buf, length, errp) < 0) {
563         return -EIO;
564     }
565     return 0;
566 }
567 
568 /* nbd_reject_length: Handle any unexpected payload.
569  * @fatal requests that we quit talking to the client, even if we are able
570  * to successfully send an error reply.
571  * Return:
572  * -errno  transmission error occurred or @fatal was requested, errp is set
573  * 0       error message successfully sent to client, errp is not set
574  */
575 static coroutine_fn int
576 nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
577 {
578     int ret;
579 
580     assert(client->optlen);
581     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
582                           nbd_opt_lookup(client->opt));
583     if (fatal && !ret) {
584         error_setg(errp, "option '%s' has unexpected length",
585                    nbd_opt_lookup(client->opt));
586         return -EINVAL;
587     }
588     return ret;
589 }
590 
591 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
592  * Return -errno on error, 0 if ready for next option, and 1 to move
593  * into transmission phase.  */
594 static coroutine_fn int
595 nbd_negotiate_handle_info(NBDClient *client, Error **errp)
596 {
597     int rc;
598     g_autofree char *name = NULL;
599     NBDExport *exp;
600     uint16_t requests;
601     uint16_t request;
602     uint32_t namelen = 0;
603     bool sendname = false;
604     bool blocksize = false;
605     uint32_t sizes[3];
606     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
607     uint32_t check_align = 0;
608     uint16_t myflags;
609 
610     /* Client sends:
611         4 bytes: L, name length (can be 0)
612         L bytes: export name
613         2 bytes: N, number of requests (can be 0)
614         N * 2 bytes: N requests
615     */
616     rc = nbd_opt_read_name(client, &name, &namelen, errp);
617     if (rc <= 0) {
618         return rc;
619     }
620     trace_nbd_negotiate_handle_export_name_request(name);
621 
622     rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
623     if (rc <= 0) {
624         return rc;
625     }
626     requests = be16_to_cpu(requests);
627     trace_nbd_negotiate_handle_info_requests(requests);
628     while (requests--) {
629         rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
630         if (rc <= 0) {
631             return rc;
632         }
633         request = be16_to_cpu(request);
634         trace_nbd_negotiate_handle_info_request(request,
635                                                 nbd_info_lookup(request));
636         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
637          * everything else is either a request we don't know or
638          * something we send regardless of request */
639         switch (request) {
640         case NBD_INFO_NAME:
641             sendname = true;
642             break;
643         case NBD_INFO_BLOCK_SIZE:
644             blocksize = true;
645             break;
646         }
647     }
648     if (client->optlen) {
649         return nbd_reject_length(client, false, errp);
650     }
651 
652     exp = nbd_export_find(name);
653     if (!exp) {
654         g_autofree char *sane_name = nbd_sanitize_name(name);
655 
656         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
657                                           errp, "export '%s' not present",
658                                           sane_name);
659     }
660     if (client->opt == NBD_OPT_GO) {
661         nbd_check_meta_export(client, exp);
662     }
663 
664     /* Don't bother sending NBD_INFO_NAME unless client requested it */
665     if (sendname) {
666         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
667                                      errp);
668         if (rc < 0) {
669             return rc;
670         }
671     }
672 
673     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
674      * client request */
675     if (exp->description) {
676         size_t len = strlen(exp->description);
677 
678         assert(len <= NBD_MAX_STRING_SIZE);
679         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
680                                      len, exp->description, errp);
681         if (rc < 0) {
682             return rc;
683         }
684     }
685 
686     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
687      * according to whether the client requested it, and according to
688      * whether this is OPT_INFO or OPT_GO. */
689     /* minimum - 1 for back-compat, or actual if client will obey it. */
690     if (client->opt == NBD_OPT_INFO || blocksize) {
691         check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
692     } else {
693         sizes[0] = 1;
694     }
695     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
696     /* preferred - Hard-code to 4096 for now.
697      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
698     sizes[1] = MAX(4096, sizes[0]);
699     /* maximum - At most 32M, but smaller as appropriate. */
700     sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
701     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
702     sizes[0] = cpu_to_be32(sizes[0]);
703     sizes[1] = cpu_to_be32(sizes[1]);
704     sizes[2] = cpu_to_be32(sizes[2]);
705     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
706                                  sizeof(sizes), sizes, errp);
707     if (rc < 0) {
708         return rc;
709     }
710 
711     /* Send NBD_INFO_EXPORT always */
712     myflags = exp->nbdflags;
713     if (client->mode >= NBD_MODE_STRUCTURED) {
714         myflags |= NBD_FLAG_SEND_DF;
715     }
716     if (client->mode >= NBD_MODE_EXTENDED &&
717         (client->contexts.count || client->opt == NBD_OPT_INFO)) {
718         myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD;
719     }
720     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
721     stq_be_p(buf, exp->size);
722     stw_be_p(buf + 8, myflags);
723     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
724                                  sizeof(buf), buf, errp);
725     if (rc < 0) {
726         return rc;
727     }
728 
729     /*
730      * If the client is just asking for NBD_OPT_INFO, but forgot to
731      * request block sizes in a situation that would impact
732      * performance, then return an error. But for NBD_OPT_GO, we
733      * tolerate all clients, regardless of alignments.
734      */
735     if (client->opt == NBD_OPT_INFO && !blocksize &&
736         blk_get_request_alignment(exp->common.blk) > 1) {
737         return nbd_negotiate_send_rep_err(client,
738                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
739                                           errp,
740                                           "request NBD_INFO_BLOCK_SIZE to "
741                                           "use this export");
742     }
743 
744     /* Final reply */
745     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
746     if (rc < 0) {
747         return rc;
748     }
749 
750     if (client->opt == NBD_OPT_GO) {
751         client->exp = exp;
752         client->check_align = check_align;
753         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
754         blk_exp_ref(&client->exp->common);
755         rc = 1;
756     }
757     return rc;
758 }
759 
760 /* Callback to learn when QIO TLS upgrade is complete */
/*
 * Shared between nbd_negotiate_handle_starttls(), which owns it on its
 * stack, and the nbd_server_tls_handshake() completion callback.
 */
761 struct NBDTLSServerHandshakeData {
762     bool complete; /* set by the callback once the handshake task ended */
763     Error *error; /* error propagated from the task, NULL on success */
764     Coroutine *co; /* coroutine to wake, unless it is currently entered */
765 };
766 
767 static void
768 nbd_server_tls_handshake(QIOTask *task, void *opaque)
769 {
770     struct NBDTLSServerHandshakeData *data = opaque;
771 
772     qio_task_propagate_error(task, &data->error);
773     data->complete = true;
774     if (!qemu_coroutine_entered(data->co)) {
775         aio_co_wake(data->co);
776     }
777 }
778 
779 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
780  * new channel for all further (now-encrypted) communication. */
781 static coroutine_fn QIOChannel *
782 nbd_negotiate_handle_starttls(NBDClient *client, Error **errp)
783 {
784     QIOChannel *ioc;
785     QIOChannelTLS *tioc;
786     struct NBDTLSServerHandshakeData data = { 0 };
787 
788     assert(client->opt == NBD_OPT_STARTTLS);
789 
790     trace_nbd_negotiate_handle_starttls();
791     ioc = client->ioc;
792 
793     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
794         return NULL;
795     }
796 
797     tioc = qio_channel_tls_new_server(ioc,
798                                       client->tlscreds,
799                                       client->tlsauthz,
800                                       errp);
801     if (!tioc) {
802         return NULL;
803     }
804 
805     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
806     trace_nbd_negotiate_handle_starttls_handshake();
807     data.co = qemu_coroutine_self();
808     qio_channel_tls_handshake(tioc,
809                               nbd_server_tls_handshake,
810                               &data,
811                               NULL,
812                               NULL);
813 
814     if (!data.complete) {
815         qemu_coroutine_yield();
816         assert(data.complete);
817     }
818 
819     if (data.error) {
820         object_unref(OBJECT(tioc));
821         error_propagate(errp, data.error);
822         return NULL;
823     }
824 
825     return QIO_CHANNEL(tioc);
826 }
827 
828 /* nbd_negotiate_send_meta_context
829  *
830  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
831  *
832  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
833  */
834 static coroutine_fn int
835 nbd_negotiate_send_meta_context(NBDClient *client, const char *context,
836                                 uint32_t context_id, Error **errp)
837 {
838     NBDOptionReplyMetaContext opt;
839     struct iovec iov[] = {
840         {.iov_base = &opt, .iov_len = sizeof(opt)},
841         {.iov_base = (void *)context, .iov_len = strlen(context)}
842     };
843 
844     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
845     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
846         context_id = 0;
847     }
848 
849     trace_nbd_negotiate_meta_query_reply(context, context_id);
850     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
851                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
852     stl_be_p(&opt.context_id, context_id);
853 
854     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
855 }
856 
857 /*
858  * Return true if @query matches @pattern, or if @query is empty when
859  * the @client is performing _LIST_.
860  */
861 static coroutine_fn bool
862 nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
863                           const char *query)
864 {
865     if (!*query) {
866         trace_nbd_negotiate_meta_query_parse("empty");
867         return client->opt == NBD_OPT_LIST_META_CONTEXT;
868     }
869     if (strcmp(query, pattern) == 0) {
870         trace_nbd_negotiate_meta_query_parse(pattern);
871         return true;
872     }
873     trace_nbd_negotiate_meta_query_skip("pattern not matched");
874     return false;
875 }
876 
/*
 * If *@str starts with @prefix, advance *@str past the prefix and return
 * true. Otherwise leave *@str untouched and return false.
 */
static coroutine_fn bool
nbd_strshift(const char **str, const char *prefix)
{
    size_t prefix_len = strlen(prefix);

    if (strncmp(*str, prefix, prefix_len) != 0) {
        return false;
    }

    *str += prefix_len;
    return true;
}
891 
892 /* nbd_meta_base_query
893  *
894  * Handle queries to 'base' namespace. For now, only the base:allocation
895  * context is available.  Return true if @query has been handled.
896  */
897 static coroutine_fn bool
898 nbd_meta_base_query(NBDClient *client, NBDMetaContexts *meta,
899                     const char *query)
900 {
901     if (!nbd_strshift(&query, "base:")) {
902         return false;
903     }
904     trace_nbd_negotiate_meta_query_parse("base:");
905 
906     if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
907         meta->base_allocation = true;
908     }
909     return true;
910 }
911 
912 /* nbd_meta_qemu_query
913  *
914  * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
915  * and qemu:allocation-depth contexts are available.  Return true if @query
916  * has been handled.
917  */
918 static coroutine_fn bool
919 nbd_meta_qemu_query(NBDClient *client, NBDMetaContexts *meta,
920                     const char *query)
921 {
922     size_t i;
923 
924     if (!nbd_strshift(&query, "qemu:")) {
925         return false;
926     }
927     trace_nbd_negotiate_meta_query_parse("qemu:");
928 
929     if (!*query) {
930         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
931             meta->allocation_depth = meta->exp->allocation_depth;
932             if (meta->exp->nr_export_bitmaps) {
933                 memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
934             }
935         }
936         trace_nbd_negotiate_meta_query_parse("empty");
937         return true;
938     }
939 
940     if (strcmp(query, "allocation-depth") == 0) {
941         trace_nbd_negotiate_meta_query_parse("allocation-depth");
942         meta->allocation_depth = meta->exp->allocation_depth;
943         return true;
944     }
945 
946     if (nbd_strshift(&query, "dirty-bitmap:")) {
947         trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
948         if (!*query) {
949             if (client->opt == NBD_OPT_LIST_META_CONTEXT &&
950                 meta->exp->nr_export_bitmaps) {
951                 memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
952             }
953             trace_nbd_negotiate_meta_query_parse("empty");
954             return true;
955         }
956 
957         for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
958             const char *bm_name;
959 
960             bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
961             if (strcmp(bm_name, query) == 0) {
962                 meta->bitmaps[i] = true;
963                 trace_nbd_negotiate_meta_query_parse(query);
964                 return true;
965             }
966         }
967         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
968         return true;
969     }
970 
971     trace_nbd_negotiate_meta_query_skip("unknown qemu context");
972     return true;
973 }
974 
975 /* nbd_negotiate_meta_query
976  *
977  * Parse namespace name and call corresponding function to parse body of the
978  * query.
979  *
980  * The only supported namespaces are 'base' and 'qemu'.
981  *
982  * Return -errno on I/O error, 0 if option was completely handled by
983  * sending a reply about inconsistent lengths, or 1 on success. */
984 static coroutine_fn int
985 nbd_negotiate_meta_query(NBDClient *client,
986                          NBDMetaContexts *meta, Error **errp)
987 {
988     int ret;
989     g_autofree char *query = NULL;
990     uint32_t len;
991 
992     ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
993     if (ret <= 0) {
994         return ret;
995     }
996     len = cpu_to_be32(len);
997 
998     if (len > NBD_MAX_STRING_SIZE) {
999         trace_nbd_negotiate_meta_query_skip("length too long");
1000         return nbd_opt_skip(client, len, errp);
1001     }
1002 
1003     query = g_malloc(len + 1);
1004     ret = nbd_opt_read(client, query, len, true, errp);
1005     if (ret <= 0) {
1006         return ret;
1007     }
1008     query[len] = '\0';
1009 
1010     if (nbd_meta_base_query(client, meta, query)) {
1011         return 1;
1012     }
1013     if (nbd_meta_qemu_query(client, meta, query)) {
1014         return 1;
1015     }
1016 
1017     trace_nbd_negotiate_meta_query_skip("unknown namespace");
1018     return 1;
1019 }
1020 
1021 /* nbd_negotiate_meta_queries
1022  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
1023  *
1024  * Return -errno on I/O error, or 0 if option was completely handled. */
1025 static coroutine_fn int
1026 nbd_negotiate_meta_queries(NBDClient *client, Error **errp)
1027 {
1028     int ret;
1029     g_autofree char *export_name = NULL;
1030     /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */
1031     g_autofree G_GNUC_UNUSED bool *bitmaps = NULL;
1032     NBDMetaContexts local_meta = {0};
1033     NBDMetaContexts *meta;
1034     uint32_t nb_queries;
1035     size_t i;
1036     size_t count = 0;
1037 
1038     if (client->opt == NBD_OPT_SET_META_CONTEXT &&
1039         client->mode < NBD_MODE_STRUCTURED) {
1040         return nbd_opt_invalid(client, errp,
1041                                "request option '%s' when structured reply "
1042                                "is not negotiated",
1043                                nbd_opt_lookup(client->opt));
1044     }
1045 
1046     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1047         /* Only change the caller's meta on SET. */
1048         meta = &local_meta;
1049     } else {
1050         meta = &client->contexts;
1051     }
1052 
1053     g_free(meta->bitmaps);
1054     memset(meta, 0, sizeof(*meta));
1055 
1056     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1057     if (ret <= 0) {
1058         return ret;
1059     }
1060 
1061     meta->exp = nbd_export_find(export_name);
1062     if (meta->exp == NULL) {
1063         g_autofree char *sane_name = nbd_sanitize_name(export_name);
1064 
1065         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1066                             "export '%s' not present", sane_name);
1067     }
1068     meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
1069     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1070         bitmaps = meta->bitmaps;
1071     }
1072 
1073     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
1074     if (ret <= 0) {
1075         return ret;
1076     }
1077     nb_queries = cpu_to_be32(nb_queries);
1078     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1079                                      export_name, nb_queries);
1080 
1081     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1082         /* enable all known contexts */
1083         meta->base_allocation = true;
1084         meta->allocation_depth = meta->exp->allocation_depth;
1085         if (meta->exp->nr_export_bitmaps) {
1086             memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
1087         }
1088     } else {
1089         for (i = 0; i < nb_queries; ++i) {
1090             ret = nbd_negotiate_meta_query(client, meta, errp);
1091             if (ret <= 0) {
1092                 return ret;
1093             }
1094         }
1095     }
1096 
1097     if (meta->base_allocation) {
1098         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1099                                               NBD_META_ID_BASE_ALLOCATION,
1100                                               errp);
1101         if (ret < 0) {
1102             return ret;
1103         }
1104         count++;
1105     }
1106 
1107     if (meta->allocation_depth) {
1108         ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
1109                                               NBD_META_ID_ALLOCATION_DEPTH,
1110                                               errp);
1111         if (ret < 0) {
1112             return ret;
1113         }
1114         count++;
1115     }
1116 
1117     for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
1118         const char *bm_name;
1119         g_autofree char *context = NULL;
1120 
1121         if (!meta->bitmaps[i]) {
1122             continue;
1123         }
1124 
1125         bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
1126         context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
1127 
1128         ret = nbd_negotiate_send_meta_context(client, context,
1129                                               NBD_META_ID_DIRTY_BITMAP + i,
1130                                               errp);
1131         if (ret < 0) {
1132             return ret;
1133         }
1134         count++;
1135     }
1136 
1137     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1138     if (ret == 0) {
1139         meta->count = count;
1140     }
1141 
1142     return ret;
1143 }
1144 
1145 /* nbd_negotiate_options
1146  * Process all NBD_OPT_* client option commands, during fixed newstyle
1147  * negotiation.
1148  * Return:
1149  * -errno  on error, errp is set
1150  * 0       on successful negotiation, errp is not set
1151  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1152  *         errp is not set
1153  */
1154 static coroutine_fn int
1155 nbd_negotiate_options(NBDClient *client, Error **errp)
1156 {
1157     uint32_t flags;
1158     bool fixedNewstyle = false;
1159     bool no_zeroes = false;
1160 
1161     /* Client sends:
1162         [ 0 ..   3]   client flags
1163 
1164        Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1165         [ 0 ..   7]   NBD_OPTS_MAGIC
1166         [ 8 ..  11]   NBD option
1167         [12 ..  15]   Data length
1168         ...           Rest of request
1169 
1170         [ 0 ..   7]   NBD_OPTS_MAGIC
1171         [ 8 ..  11]   Second NBD option
1172         [12 ..  15]   Data length
1173         ...           Rest of request
1174     */
1175 
1176     if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
1177         return -EIO;
1178     }
1179     client->mode = NBD_MODE_EXPORT_NAME;
1180     trace_nbd_negotiate_options_flags(flags);
1181     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1182         fixedNewstyle = true;
1183         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1184         client->mode = NBD_MODE_SIMPLE;
1185     }
1186     if (flags & NBD_FLAG_C_NO_ZEROES) {
1187         no_zeroes = true;
1188         flags &= ~NBD_FLAG_C_NO_ZEROES;
1189     }
1190     if (flags != 0) {
1191         error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1192         return -EINVAL;
1193     }
1194 
1195     while (1) {
1196         int ret;
1197         uint32_t option, length;
1198         uint64_t magic;
1199 
1200         if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1201             return -EINVAL;
1202         }
1203         trace_nbd_negotiate_options_check_magic(magic);
1204         if (magic != NBD_OPTS_MAGIC) {
1205             error_setg(errp, "Bad magic received");
1206             return -EINVAL;
1207         }
1208 
1209         if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1210             return -EINVAL;
1211         }
1212         client->opt = option;
1213 
1214         if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1215             return -EINVAL;
1216         }
1217         assert(!client->optlen);
1218         client->optlen = length;
1219 
1220         if (length > NBD_MAX_BUFFER_SIZE) {
1221             error_setg(errp, "len (%" PRIu32 ") is larger than max len (%u)",
1222                        length, NBD_MAX_BUFFER_SIZE);
1223             return -EINVAL;
1224         }
1225 
1226         trace_nbd_negotiate_options_check_option(option,
1227                                                  nbd_opt_lookup(option));
1228         if (client->tlscreds &&
1229             client->ioc == (QIOChannel *)client->sioc) {
1230             QIOChannel *tioc;
1231             if (!fixedNewstyle) {
1232                 error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1233                 return -EINVAL;
1234             }
1235             switch (option) {
1236             case NBD_OPT_STARTTLS:
1237                 if (length) {
1238                     /* Unconditionally drop the connection if the client
1239                      * can't start a TLS negotiation correctly */
1240                     return nbd_reject_length(client, true, errp);
1241                 }
1242                 tioc = nbd_negotiate_handle_starttls(client, errp);
1243                 if (!tioc) {
1244                     return -EIO;
1245                 }
1246                 ret = 0;
1247                 object_unref(OBJECT(client->ioc));
1248                 client->ioc = tioc;
1249                 break;
1250 
1251             case NBD_OPT_EXPORT_NAME:
1252                 /* No way to return an error to client, so drop connection */
1253                 error_setg(errp, "Option 0x%x not permitted before TLS",
1254                            option);
1255                 return -EINVAL;
1256 
1257             default:
1258                 /* Let the client keep trying, unless they asked to
1259                  * quit. Always try to give an error back to the
1260                  * client; but when replying to OPT_ABORT, be aware
1261                  * that the client may hang up before receiving the
1262                  * error, in which case we are fine ignoring the
1263                  * resulting EPIPE. */
1264                 ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1265                                    option == NBD_OPT_ABORT ? NULL : errp,
1266                                    "Option 0x%" PRIx32
1267                                    " not permitted before TLS", option);
1268                 if (option == NBD_OPT_ABORT) {
1269                     return 1;
1270                 }
1271                 break;
1272             }
1273         } else if (fixedNewstyle) {
1274             switch (option) {
1275             case NBD_OPT_LIST:
1276                 if (length) {
1277                     ret = nbd_reject_length(client, false, errp);
1278                 } else {
1279                     ret = nbd_negotiate_handle_list(client, errp);
1280                 }
1281                 break;
1282 
1283             case NBD_OPT_ABORT:
1284                 /* NBD spec says we must try to reply before
1285                  * disconnecting, but that we must also tolerate
1286                  * guests that don't wait for our reply. */
1287                 nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
1288                 return 1;
1289 
1290             case NBD_OPT_EXPORT_NAME:
1291                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1292                                                         errp);
1293 
1294             case NBD_OPT_INFO:
1295             case NBD_OPT_GO:
1296                 ret = nbd_negotiate_handle_info(client, errp);
1297                 if (ret == 1) {
1298                     assert(option == NBD_OPT_GO);
1299                     return 0;
1300                 }
1301                 break;
1302 
1303             case NBD_OPT_STARTTLS:
1304                 if (length) {
1305                     ret = nbd_reject_length(client, false, errp);
1306                 } else if (client->tlscreds) {
1307                     ret = nbd_negotiate_send_rep_err(client,
1308                                                      NBD_REP_ERR_INVALID, errp,
1309                                                      "TLS already enabled");
1310                 } else {
1311                     ret = nbd_negotiate_send_rep_err(client,
1312                                                      NBD_REP_ERR_POLICY, errp,
1313                                                      "TLS not configured");
1314                 }
1315                 break;
1316 
1317             case NBD_OPT_STRUCTURED_REPLY:
1318                 if (length) {
1319                     ret = nbd_reject_length(client, false, errp);
1320                 } else if (client->mode >= NBD_MODE_EXTENDED) {
1321                     ret = nbd_negotiate_send_rep_err(
1322                         client, NBD_REP_ERR_EXT_HEADER_REQD, errp,
1323                         "extended headers already negotiated");
1324                 } else if (client->mode >= NBD_MODE_STRUCTURED) {
1325                     ret = nbd_negotiate_send_rep_err(
1326                         client, NBD_REP_ERR_INVALID, errp,
1327                         "structured reply already negotiated");
1328                 } else {
1329                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1330                     client->mode = NBD_MODE_STRUCTURED;
1331                 }
1332                 break;
1333 
1334             case NBD_OPT_LIST_META_CONTEXT:
1335             case NBD_OPT_SET_META_CONTEXT:
1336                 ret = nbd_negotiate_meta_queries(client, errp);
1337                 break;
1338 
1339             case NBD_OPT_EXTENDED_HEADERS:
1340                 if (length) {
1341                     ret = nbd_reject_length(client, false, errp);
1342                 } else if (client->mode >= NBD_MODE_EXTENDED) {
1343                     ret = nbd_negotiate_send_rep_err(
1344                         client, NBD_REP_ERR_INVALID, errp,
1345                         "extended headers already negotiated");
1346                 } else {
1347                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1348                     client->mode = NBD_MODE_EXTENDED;
1349                 }
1350                 break;
1351 
1352             default:
1353                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1354                                    "Unsupported option %" PRIu32 " (%s)",
1355                                    option, nbd_opt_lookup(option));
1356                 break;
1357             }
1358         } else {
1359             /*
1360              * If broken new-style we should drop the connection
1361              * for anything except NBD_OPT_EXPORT_NAME
1362              */
1363             switch (option) {
1364             case NBD_OPT_EXPORT_NAME:
1365                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1366                                                         errp);
1367 
1368             default:
1369                 error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1370                            option, nbd_opt_lookup(option));
1371                 return -EINVAL;
1372             }
1373         }
1374         if (ret < 0) {
1375             return ret;
1376         }
1377     }
1378 }
1379 
1380 /* nbd_negotiate
1381  * Return:
1382  * -errno  on error, errp is set
1383  * 0       on successful negotiation, errp is not set
1384  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1385  *         errp is not set
1386  */
1387 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1388 {
1389     ERRP_GUARD();
1390     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1391     int ret;
1392 
1393     /* Old style negotiation header, no room for options
1394         [ 0 ..   7]   passwd       ("NBDMAGIC")
1395         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1396         [16 ..  23]   size
1397         [24 ..  27]   export flags (zero-extended)
1398         [28 .. 151]   reserved     (0)
1399 
1400        New style negotiation header, client can send options
1401         [ 0 ..   7]   passwd       ("NBDMAGIC")
1402         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1403         [16 ..  17]   server flags (0)
1404         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1405      */
1406 
1407     qio_channel_set_blocking(client->ioc, false, NULL);
1408     qio_channel_set_follow_coroutine_ctx(client->ioc, true);
1409 
1410     trace_nbd_negotiate_begin();
1411     memcpy(buf, "NBDMAGIC", 8);
1412 
1413     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1414     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1415 
1416     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1417         error_prepend(errp, "write failed: ");
1418         return -EINVAL;
1419     }
1420     ret = nbd_negotiate_options(client, errp);
1421     if (ret != 0) {
1422         if (ret < 0) {
1423             error_prepend(errp, "option negotiation failed: ");
1424         }
1425         return ret;
1426     }
1427 
1428     assert(!client->optlen);
1429     trace_nbd_negotiate_success();
1430 
1431     return 0;
1432 }
1433 
1434 /* nbd_read_eof
1435  * Tries to read @size bytes from @ioc. This is a local implementation of
1436  * qio_channel_readv_all_eof. We have it here because we need it to be
1437  * interruptible and to know when the coroutine is yielding.
1438  * Returns 1 on success
1439  *         0 on eof, when no data was read (errp is not set)
1440  *         negative errno on failure (errp is set)
1441  */
1442 static inline int coroutine_fn
1443 nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
1444 {
1445     bool partial = false;
1446 
1447     assert(size);
1448     while (size > 0) {
1449         struct iovec iov = { .iov_base = buffer, .iov_len = size };
1450         ssize_t len;
1451 
1452         len = qio_channel_readv(client->ioc, &iov, 1, errp);
1453         if (len == QIO_CHANNEL_ERR_BLOCK) {
1454             WITH_QEMU_LOCK_GUARD(&client->lock) {
1455                 client->read_yielding = true;
1456 
1457                 /* Prompt main loop thread to re-run nbd_drained_poll() */
1458                 aio_wait_kick();
1459             }
1460             qio_channel_yield(client->ioc, G_IO_IN);
1461             WITH_QEMU_LOCK_GUARD(&client->lock) {
1462                 client->read_yielding = false;
1463                 if (client->quiescing) {
1464                     return -EAGAIN;
1465                 }
1466             }
1467             continue;
1468         } else if (len < 0) {
1469             return -EIO;
1470         } else if (len == 0) {
1471             if (partial) {
1472                 error_setg(errp,
1473                            "Unexpected end-of-file before all bytes were read");
1474                 return -EIO;
1475             } else {
1476                 return 0;
1477             }
1478         }
1479 
1480         partial = true;
1481         size -= len;
1482         buffer = (uint8_t *) buffer + len;
1483     }
1484     return 1;
1485 }
1486 
1487 static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *request,
1488                                             Error **errp)
1489 {
1490     uint8_t buf[NBD_EXTENDED_REQUEST_SIZE];
1491     uint32_t magic, expect;
1492     int ret;
1493     size_t size = client->mode >= NBD_MODE_EXTENDED ?
1494         NBD_EXTENDED_REQUEST_SIZE : NBD_REQUEST_SIZE;
1495 
1496     ret = nbd_read_eof(client, buf, size, errp);
1497     if (ret < 0) {
1498         return ret;
1499     }
1500     if (ret == 0) {
1501         return -EIO;
1502     }
1503 
1504     /*
1505      * Compact request
1506      *  [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1507      *  [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1508      *  [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1509      *  [ 8 .. 15]   cookie
1510      *  [16 .. 23]   from
1511      *  [24 .. 27]   len
1512      * Extended request
1513      *  [ 0 ..  3]   magic   (NBD_EXTENDED_REQUEST_MAGIC)
1514      *  [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, NBD_CMD_FLAG_PAYLOAD_LEN, ...)
1515      *  [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1516      *  [ 8 .. 15]   cookie
1517      *  [16 .. 23]   from
1518      *  [24 .. 31]   len
1519      */
1520 
1521     magic = ldl_be_p(buf);
1522     request->flags  = lduw_be_p(buf + 4);
1523     request->type   = lduw_be_p(buf + 6);
1524     request->cookie = ldq_be_p(buf + 8);
1525     request->from   = ldq_be_p(buf + 16);
1526     if (client->mode >= NBD_MODE_EXTENDED) {
1527         request->len = ldq_be_p(buf + 24);
1528         expect = NBD_EXTENDED_REQUEST_MAGIC;
1529     } else {
1530         request->len = (uint32_t)ldl_be_p(buf + 24); /* widen 32 to 64 bits */
1531         expect = NBD_REQUEST_MAGIC;
1532     }
1533 
1534     trace_nbd_receive_request(magic, request->flags, request->type,
1535                               request->from, request->len);
1536 
1537     if (magic != expect) {
1538         error_setg(errp, "invalid magic (got 0x%" PRIx32 ", expected 0x%"
1539                    PRIx32 ")", magic, expect);
1540         return -EINVAL;
1541     }
1542     return 0;
1543 }
1544 
1545 #define MAX_NBD_REQUESTS 16
1546 
1547 /* Runs in export AioContext and main loop thread */
1548 void nbd_client_get(NBDClient *client)
1549 {
1550     qatomic_inc(&client->refcount);
1551 }
1552 
1553 void nbd_client_put(NBDClient *client)
1554 {
1555     assert(qemu_in_main_thread());
1556 
1557     if (qatomic_fetch_dec(&client->refcount) == 1) {
1558         /* The last reference should be dropped by client->close,
1559          * which is called by client_close.
1560          */
1561         assert(client->closing);
1562 
1563         object_unref(OBJECT(client->sioc));
1564         object_unref(OBJECT(client->ioc));
1565         if (client->tlscreds) {
1566             object_unref(OBJECT(client->tlscreds));
1567         }
1568         g_free(client->tlsauthz);
1569         if (client->exp) {
1570             QTAILQ_REMOVE(&client->exp->clients, client, next);
1571             blk_exp_unref(&client->exp->common);
1572         }
1573         g_free(client->contexts.bitmaps);
1574         qemu_mutex_destroy(&client->lock);
1575         g_free(client);
1576     }
1577 }
1578 
1579 /*
1580  * Tries to release the reference to @client, but only if other references
1581  * remain. This is an optimization for the common case where we want to avoid
1582  * the expense of scheduling nbd_client_put() in the main loop thread.
1583  *
1584  * Returns true upon success or false if the reference was not released because
1585  * it is the last reference.
1586  */
1587 static bool nbd_client_put_nonzero(NBDClient *client)
1588 {
1589     int old = qatomic_read(&client->refcount);
1590     int expected;
1591 
1592     do {
1593         if (old == 1) {
1594             return false;
1595         }
1596 
1597         expected = old;
1598         old = qatomic_cmpxchg(&client->refcount, expected, expected - 1);
1599     } while (old != expected);
1600 
1601     return true;
1602 }
1603 
1604 static void client_close(NBDClient *client, bool negotiated)
1605 {
1606     assert(qemu_in_main_thread());
1607 
1608     WITH_QEMU_LOCK_GUARD(&client->lock) {
1609         if (client->closing) {
1610             return;
1611         }
1612 
1613         client->closing = true;
1614     }
1615 
1616     /* Force requests to finish.  They will drop their own references,
1617      * then we'll close the socket and free the NBDClient.
1618      */
1619     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1620                          NULL);
1621 
1622     /* Also tell the client, so that they release their reference.  */
1623     if (client->close_fn) {
1624         client->close_fn(client, negotiated);
1625     }
1626 }
1627 
1628 /* Runs in export AioContext with client->lock held */
1629 static NBDRequestData *nbd_request_get(NBDClient *client)
1630 {
1631     NBDRequestData *req;
1632 
1633     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1634     client->nb_requests++;
1635 
1636     req = g_new0(NBDRequestData, 1);
1637     req->client = client;
1638     return req;
1639 }
1640 
1641 /* Runs in export AioContext with client->lock held */
1642 static void nbd_request_put(NBDRequestData *req)
1643 {
1644     NBDClient *client = req->client;
1645 
1646     if (req->data) {
1647         qemu_vfree(req->data);
1648     }
1649     g_free(req);
1650 
1651     client->nb_requests--;
1652 
1653     if (client->quiescing && client->nb_requests == 0) {
1654         aio_wait_kick();
1655     }
1656 
1657     nbd_client_receive_next_request(client);
1658 }
1659 
1660 static void blk_aio_attached(AioContext *ctx, void *opaque)
1661 {
1662     NBDExport *exp = opaque;
1663     NBDClient *client;
1664 
1665     assert(qemu_in_main_thread());
1666 
1667     trace_nbd_blk_aio_attached(exp->name, ctx);
1668 
1669     exp->common.ctx = ctx;
1670 
1671     QTAILQ_FOREACH(client, &exp->clients, next) {
1672         WITH_QEMU_LOCK_GUARD(&client->lock) {
1673             assert(client->nb_requests == 0);
1674             assert(client->recv_coroutine == NULL);
1675             assert(client->send_coroutine == NULL);
1676         }
1677     }
1678 }
1679 
1680 static void blk_aio_detach(void *opaque)
1681 {
1682     NBDExport *exp = opaque;
1683 
1684     assert(qemu_in_main_thread());
1685 
1686     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
1687 
1688     exp->common.ctx = NULL;
1689 }
1690 
1691 static void nbd_drained_begin(void *opaque)
1692 {
1693     NBDExport *exp = opaque;
1694     NBDClient *client;
1695 
1696     assert(qemu_in_main_thread());
1697 
1698     QTAILQ_FOREACH(client, &exp->clients, next) {
1699         WITH_QEMU_LOCK_GUARD(&client->lock) {
1700             client->quiescing = true;
1701         }
1702     }
1703 }
1704 
1705 static void nbd_drained_end(void *opaque)
1706 {
1707     NBDExport *exp = opaque;
1708     NBDClient *client;
1709 
1710     assert(qemu_in_main_thread());
1711 
1712     QTAILQ_FOREACH(client, &exp->clients, next) {
1713         WITH_QEMU_LOCK_GUARD(&client->lock) {
1714             client->quiescing = false;
1715             nbd_client_receive_next_request(client);
1716         }
1717     }
1718 }
1719 
1720 /* Runs in export AioContext */
1721 static void nbd_wake_read_bh(void *opaque)
1722 {
1723     NBDClient *client = opaque;
1724     qio_channel_wake_read(client->ioc);
1725 }
1726 
1727 static bool nbd_drained_poll(void *opaque)
1728 {
1729     NBDExport *exp = opaque;
1730     NBDClient *client;
1731 
1732     assert(qemu_in_main_thread());
1733 
1734     QTAILQ_FOREACH(client, &exp->clients, next) {
1735         WITH_QEMU_LOCK_GUARD(&client->lock) {
1736             if (client->nb_requests != 0) {
1737                 /*
1738                  * If there's a coroutine waiting for a request on nbd_read_eof()
1739                  * enter it here so we don't depend on the client to wake it up.
1740                  *
1741                  * Schedule a BH in the export AioContext to avoid missing the
1742                  * wake up due to the race between qio_channel_wake_read() and
1743                  * qio_channel_yield().
1744                  */
1745                 if (client->recv_coroutine != NULL && client->read_yielding) {
1746                     aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp),
1747                                             nbd_wake_read_bh, client);
1748                 }
1749 
1750                 return true;
1751             }
1752         }
1753     }
1754 
1755     return false;
1756 }
1757 
1758 static void nbd_eject_notifier(Notifier *n, void *data)
1759 {
1760     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1761 
1762     assert(qemu_in_main_thread());
1763 
1764     blk_exp_request_shutdown(&exp->common);
1765 }
1766 
1767 void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
1768 {
1769     NBDExport *nbd_exp = container_of(exp, NBDExport, common);
1770     assert(exp->drv == &blk_exp_nbd);
1771     assert(nbd_exp->eject_notifier_blk == NULL);
1772 
1773     blk_ref(blk);
1774     nbd_exp->eject_notifier_blk = blk;
1775     nbd_exp->eject_notifier.notify = nbd_eject_notifier;
1776     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
1777 }
1778 
1779 static const BlockDevOps nbd_block_ops = {
1780     .drained_begin = nbd_drained_begin,
1781     .drained_end = nbd_drained_end,
1782     .drained_poll = nbd_drained_poll,
1783 };
1784 
1785 static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
1786                              Error **errp)
1787 {
1788     NBDExport *exp = container_of(blk_exp, NBDExport, common);
1789     BlockExportOptionsNbd *arg = &exp_args->u.nbd;
1790     const char *name = arg->name ?: exp_args->node_name;
1791     BlockBackend *blk = blk_exp->blk;
1792     int64_t size;
1793     uint64_t perm, shared_perm;
1794     bool readonly = !exp_args->writable;
1795     BlockDirtyBitmapOrStrList *bitmaps;
1796     size_t i;
1797     int ret;
1798 
1799     GLOBAL_STATE_CODE();
1800     assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
1801 
1802     if (!nbd_server_is_running()) {
1803         error_setg(errp, "NBD server not running");
1804         return -EINVAL;
1805     }
1806 
1807     if (strlen(name) > NBD_MAX_STRING_SIZE) {
1808         error_setg(errp, "export name '%s' too long", name);
1809         return -EINVAL;
1810     }
1811 
1812     if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
1813         error_setg(errp, "description '%s' too long", arg->description);
1814         return -EINVAL;
1815     }
1816 
1817     if (nbd_export_find(name)) {
1818         error_setg(errp, "NBD server already has export named '%s'", name);
1819         return -EEXIST;
1820     }
1821 
1822     size = blk_getlength(blk);
1823     if (size < 0) {
1824         error_setg_errno(errp, -size,
1825                          "Failed to determine the NBD export's length");
1826         return size;
1827     }
1828 
1829     /* Don't allow resize while the NBD server is running, otherwise we don't
1830      * care what happens with the node. */
1831     blk_get_perm(blk, &perm, &shared_perm);
1832     ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
1833     if (ret < 0) {
1834         return ret;
1835     }
1836 
1837     QTAILQ_INIT(&exp->clients);
1838     exp->name = g_strdup(name);
1839     exp->description = g_strdup(arg->description);
1840     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1841                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1842 
1843     if (nbd_server_max_connections() != 1) {
1844         exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1845     }
1846     if (readonly) {
1847         exp->nbdflags |= NBD_FLAG_READ_ONLY;
1848     } else {
1849         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1850                           NBD_FLAG_SEND_FAST_ZERO);
1851     }
1852     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1853 
1854     bdrv_graph_rdlock_main_loop();
1855 
1856     for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
1857         exp->nr_export_bitmaps++;
1858     }
1859     exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
1860     for (i = 0, bitmaps = arg->bitmaps; bitmaps;
1861          i++, bitmaps = bitmaps->next)
1862     {
1863         const char *bitmap;
1864         BlockDriverState *bs = blk_bs(blk);
1865         BdrvDirtyBitmap *bm = NULL;
1866 
1867         switch (bitmaps->value->type) {
1868         case QTYPE_QSTRING:
1869             bitmap = bitmaps->value->u.local;
1870             while (bs) {
1871                 bm = bdrv_find_dirty_bitmap(bs, bitmap);
1872                 if (bm != NULL) {
1873                     break;
1874                 }
1875 
1876                 bs = bdrv_filter_or_cow_bs(bs);
1877             }
1878 
1879             if (bm == NULL) {
1880                 ret = -ENOENT;
1881                 error_setg(errp, "Bitmap '%s' is not found",
1882                            bitmaps->value->u.local);
1883                 goto fail;
1884             }
1885 
1886             if (readonly && bdrv_is_writable(bs) &&
1887                 bdrv_dirty_bitmap_enabled(bm)) {
1888                 ret = -EINVAL;
1889                 error_setg(errp, "Enabled bitmap '%s' incompatible with "
1890                            "readonly export", bitmap);
1891                 goto fail;
1892             }
1893             break;
1894         case QTYPE_QDICT:
1895             bitmap = bitmaps->value->u.external.name;
1896             bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
1897                                            bitmap, NULL, errp);
1898             if (!bm) {
1899                 ret = -ENOENT;
1900                 goto fail;
1901             }
1902             break;
1903         default:
1904             abort();
1905         }
1906 
1907         assert(bm);
1908 
1909         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1910             ret = -EINVAL;
1911             goto fail;
1912         }
1913 
1914         exp->export_bitmaps[i] = bm;
1915         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1916     }
1917 
1918     /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
1919     for (i = 0; i < exp->nr_export_bitmaps; i++) {
1920         bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
1921     }
1922 
1923     exp->allocation_depth = arg->allocation_depth;
1924 
1925     /*
1926      * We need to inhibit request queuing in the block layer to ensure we can
1927      * be properly quiesced when entering a drained section, as our coroutines
1928      * servicing pending requests might enter blk_pread().
1929      */
1930     blk_set_disable_request_queuing(blk, true);
1931 
1932     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1933 
1934     blk_set_dev_ops(blk, &nbd_block_ops, exp);
1935 
1936     QTAILQ_INSERT_TAIL(&exports, exp, next);
1937 
1938     bdrv_graph_rdunlock_main_loop();
1939 
1940     return 0;
1941 
1942 fail:
1943     bdrv_graph_rdunlock_main_loop();
1944     g_free(exp->export_bitmaps);
1945     g_free(exp->name);
1946     g_free(exp->description);
1947     return ret;
1948 }
1949 
1950 NBDExport *nbd_export_find(const char *name)
1951 {
1952     NBDExport *exp;
1953     QTAILQ_FOREACH(exp, &exports, next) {
1954         if (strcmp(name, exp->name) == 0) {
1955             return exp;
1956         }
1957     }
1958 
1959     return NULL;
1960 }
1961 
1962 AioContext *
1963 nbd_export_aio_context(NBDExport *exp)
1964 {
1965     return exp->common.ctx;
1966 }
1967 
1968 static void nbd_export_request_shutdown(BlockExport *blk_exp)
1969 {
1970     NBDExport *exp = container_of(blk_exp, NBDExport, common);
1971     NBDClient *client, *next;
1972 
1973     blk_exp_ref(&exp->common);
1974     /*
1975      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1976      * close mode that stops advertising the export to new clients but
1977      * still permits existing clients to run to completion? Because of
1978      * that possibility, nbd_export_close() can be called more than
1979      * once on an export.
1980      */
1981     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1982         client_close(client, true);
1983     }
1984     if (exp->name) {
1985         g_free(exp->name);
1986         exp->name = NULL;
1987         QTAILQ_REMOVE(&exports, exp, next);
1988     }
1989     blk_exp_unref(&exp->common);
1990 }
1991 
1992 static void nbd_export_delete(BlockExport *blk_exp)
1993 {
1994     size_t i;
1995     NBDExport *exp = container_of(blk_exp, NBDExport, common);
1996 
1997     assert(exp->name == NULL);
1998     assert(QTAILQ_EMPTY(&exp->clients));
1999 
2000     g_free(exp->description);
2001     exp->description = NULL;
2002 
2003     if (exp->eject_notifier_blk) {
2004         notifier_remove(&exp->eject_notifier);
2005         blk_unref(exp->eject_notifier_blk);
2006     }
2007     blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
2008                                     blk_aio_detach, exp);
2009     blk_set_disable_request_queuing(exp->common.blk, false);
2010 
2011     for (i = 0; i < exp->nr_export_bitmaps; i++) {
2012         bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
2013     }
2014 }
2015 
2016 const BlockExportDriver blk_exp_nbd = {
2017     .type               = BLOCK_EXPORT_TYPE_NBD,
2018     .instance_size      = sizeof(NBDExport),
2019     .create             = nbd_export_create,
2020     .delete             = nbd_export_delete,
2021     .request_shutdown   = nbd_export_request_shutdown,
2022 };
2023 
2024 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
2025                                         unsigned niov, Error **errp)
2026 {
2027     int ret;
2028 
2029     g_assert(qemu_in_coroutine());
2030     qemu_co_mutex_lock(&client->send_lock);
2031     client->send_coroutine = qemu_coroutine_self();
2032 
2033     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
2034 
2035     client->send_coroutine = NULL;
2036     qemu_co_mutex_unlock(&client->send_lock);
2037 
2038     return ret;
2039 }
2040 
2041 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
2042                                        uint64_t cookie)
2043 {
2044     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
2045     stl_be_p(&reply->error, error);
2046     stq_be_p(&reply->cookie, cookie);
2047 }
2048 
2049 static int coroutine_fn nbd_co_send_simple_reply(NBDClient *client,
2050                                                  NBDRequest *request,
2051                                                  uint32_t error,
2052                                                  void *data,
2053                                                  uint64_t len,
2054                                                  Error **errp)
2055 {
2056     NBDSimpleReply reply;
2057     int nbd_err = system_errno_to_nbd_errno(error);
2058     struct iovec iov[] = {
2059         {.iov_base = &reply, .iov_len = sizeof(reply)},
2060         {.iov_base = data, .iov_len = len}
2061     };
2062 
2063     assert(!len || !nbd_err);
2064     assert(len <= NBD_MAX_BUFFER_SIZE);
2065     assert(client->mode < NBD_MODE_STRUCTURED ||
2066            (client->mode == NBD_MODE_STRUCTURED &&
2067             request->type != NBD_CMD_READ));
2068     trace_nbd_co_send_simple_reply(request->cookie, nbd_err,
2069                                    nbd_err_lookup(nbd_err), len);
2070     set_be_simple_reply(&reply, nbd_err, request->cookie);
2071 
2072     return nbd_co_send_iov(client, iov, 2, errp);
2073 }
2074 
2075 /*
2076  * Prepare the header of a reply chunk for network transmission.
2077  *
2078  * On input, @iov is partially initialized: iov[0].iov_base must point
2079  * to an uninitialized NBDReply, while the remaining @niov elements
2080  * (if any) must be ready for transmission.  This function then
2081  * populates iov[0] for transmission.
2082  */
2083 static inline void set_be_chunk(NBDClient *client, struct iovec *iov,
2084                                 size_t niov, uint16_t flags, uint16_t type,
2085                                 NBDRequest *request)
2086 {
2087     size_t i, length = 0;
2088 
2089     for (i = 1; i < niov; i++) {
2090         length += iov[i].iov_len;
2091     }
2092     assert(length <= NBD_MAX_BUFFER_SIZE + sizeof(NBDStructuredReadData));
2093 
2094     if (client->mode >= NBD_MODE_EXTENDED) {
2095         NBDExtendedReplyChunk *chunk = iov->iov_base;
2096 
2097         iov[0].iov_len = sizeof(*chunk);
2098         stl_be_p(&chunk->magic, NBD_EXTENDED_REPLY_MAGIC);
2099         stw_be_p(&chunk->flags, flags);
2100         stw_be_p(&chunk->type, type);
2101         stq_be_p(&chunk->cookie, request->cookie);
2102         stq_be_p(&chunk->offset, request->from);
2103         stq_be_p(&chunk->length, length);
2104     } else {
2105         NBDStructuredReplyChunk *chunk = iov->iov_base;
2106 
2107         iov[0].iov_len = sizeof(*chunk);
2108         stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
2109         stw_be_p(&chunk->flags, flags);
2110         stw_be_p(&chunk->type, type);
2111         stq_be_p(&chunk->cookie, request->cookie);
2112         stl_be_p(&chunk->length, length);
2113     }
2114 }
2115 
2116 static int coroutine_fn nbd_co_send_chunk_done(NBDClient *client,
2117                                                NBDRequest *request,
2118                                                Error **errp)
2119 {
2120     NBDReply hdr;
2121     struct iovec iov[] = {
2122         {.iov_base = &hdr},
2123     };
2124 
2125     trace_nbd_co_send_chunk_done(request->cookie);
2126     set_be_chunk(client, iov, 1, NBD_REPLY_FLAG_DONE,
2127                  NBD_REPLY_TYPE_NONE, request);
2128     return nbd_co_send_iov(client, iov, 1, errp);
2129 }
2130 
2131 static int coroutine_fn nbd_co_send_chunk_read(NBDClient *client,
2132                                                NBDRequest *request,
2133                                                uint64_t offset,
2134                                                void *data,
2135                                                uint64_t size,
2136                                                bool final,
2137                                                Error **errp)
2138 {
2139     NBDReply hdr;
2140     NBDStructuredReadData chunk;
2141     struct iovec iov[] = {
2142         {.iov_base = &hdr},
2143         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2144         {.iov_base = data, .iov_len = size}
2145     };
2146 
2147     assert(size && size <= NBD_MAX_BUFFER_SIZE);
2148     trace_nbd_co_send_chunk_read(request->cookie, offset, data, size);
2149     set_be_chunk(client, iov, 3, final ? NBD_REPLY_FLAG_DONE : 0,
2150                  NBD_REPLY_TYPE_OFFSET_DATA, request);
2151     stq_be_p(&chunk.offset, offset);
2152 
2153     return nbd_co_send_iov(client, iov, 3, errp);
2154 }
2155 
2156 static int coroutine_fn nbd_co_send_chunk_error(NBDClient *client,
2157                                                 NBDRequest *request,
2158                                                 uint32_t error,
2159                                                 const char *msg,
2160                                                 Error **errp)
2161 {
2162     NBDReply hdr;
2163     NBDStructuredError chunk;
2164     int nbd_err = system_errno_to_nbd_errno(error);
2165     struct iovec iov[] = {
2166         {.iov_base = &hdr},
2167         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2168         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
2169     };
2170 
2171     assert(nbd_err);
2172     trace_nbd_co_send_chunk_error(request->cookie, nbd_err,
2173                                   nbd_err_lookup(nbd_err), msg ? msg : "");
2174     set_be_chunk(client, iov, 3, NBD_REPLY_FLAG_DONE,
2175                  NBD_REPLY_TYPE_ERROR, request);
2176     stl_be_p(&chunk.error, nbd_err);
2177     stw_be_p(&chunk.message_length, iov[2].iov_len);
2178 
2179     return nbd_co_send_iov(client, iov, 3, errp);
2180 }
2181 
2182 /* Do a sparse read and send the structured reply to the client.
2183  * Returns -errno if sending fails. blk_co_block_status_above() failure is
2184  * reported to the client, at which point this function succeeds.
2185  */
2186 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
2187                                                 NBDRequest *request,
2188                                                 uint64_t offset,
2189                                                 uint8_t *data,
2190                                                 uint64_t size,
2191                                                 Error **errp)
2192 {
2193     int ret = 0;
2194     NBDExport *exp = client->exp;
2195     size_t progress = 0;
2196 
2197     assert(size <= NBD_MAX_BUFFER_SIZE);
2198     while (progress < size) {
2199         int64_t pnum;
2200         int status = blk_co_block_status_above(exp->common.blk, NULL,
2201                                                offset + progress,
2202                                                size - progress, &pnum, NULL,
2203                                                NULL);
2204         bool final;
2205 
2206         if (status < 0) {
2207             char *msg = g_strdup_printf("unable to check for holes: %s",
2208                                         strerror(-status));
2209 
2210             ret = nbd_co_send_chunk_error(client, request, -status, msg, errp);
2211             g_free(msg);
2212             return ret;
2213         }
2214         assert(pnum && pnum <= size - progress);
2215         final = progress + pnum == size;
2216         if (status & BDRV_BLOCK_ZERO) {
2217             NBDReply hdr;
2218             NBDStructuredReadHole chunk;
2219             struct iovec iov[] = {
2220                 {.iov_base = &hdr},
2221                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2222             };
2223 
2224             trace_nbd_co_send_chunk_read_hole(request->cookie,
2225                                               offset + progress, pnum);
2226             set_be_chunk(client, iov, 2,
2227                          final ? NBD_REPLY_FLAG_DONE : 0,
2228                          NBD_REPLY_TYPE_OFFSET_HOLE, request);
2229             stq_be_p(&chunk.offset, offset + progress);
2230             stl_be_p(&chunk.length, pnum);
2231             ret = nbd_co_send_iov(client, iov, 2, errp);
2232         } else {
2233             ret = blk_co_pread(exp->common.blk, offset + progress, pnum,
2234                                data + progress, 0);
2235             if (ret < 0) {
2236                 error_setg_errno(errp, -ret, "reading from file failed");
2237                 break;
2238             }
2239             ret = nbd_co_send_chunk_read(client, request, offset + progress,
2240                                          data + progress, pnum, final, errp);
2241         }
2242 
2243         if (ret < 0) {
2244             break;
2245         }
2246         progress += pnum;
2247     }
2248     return ret;
2249 }
2250 
2251 typedef struct NBDExtentArray {
2252     NBDExtent64 *extents;
2253     unsigned int nb_alloc;
2254     unsigned int count;
2255     uint64_t total_length;
2256     bool extended;
2257     bool can_add;
2258     bool converted_to_be;
2259 } NBDExtentArray;
2260 
2261 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc,
2262                                             NBDMode mode)
2263 {
2264     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
2265 
2266     assert(mode >= NBD_MODE_STRUCTURED);
2267     ea->nb_alloc = nb_alloc;
2268     ea->extents = g_new(NBDExtent64, nb_alloc);
2269     ea->extended = mode >= NBD_MODE_EXTENDED;
2270     ea->can_add = true;
2271 
2272     return ea;
2273 }
2274 
2275 static void nbd_extent_array_free(NBDExtentArray *ea)
2276 {
2277     g_free(ea->extents);
2278     g_free(ea);
2279 }
2280 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free)
2281 
2282 /* Further modifications of the array after conversion are abandoned */
2283 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
2284 {
2285     int i;
2286 
2287     assert(!ea->converted_to_be);
2288     assert(ea->extended);
2289     ea->can_add = false;
2290     ea->converted_to_be = true;
2291 
2292     for (i = 0; i < ea->count; i++) {
2293         ea->extents[i].length = cpu_to_be64(ea->extents[i].length);
2294         ea->extents[i].flags = cpu_to_be64(ea->extents[i].flags);
2295     }
2296 }
2297 
2298 /* Further modifications of the array after conversion are abandoned */
2299 static NBDExtent32 *nbd_extent_array_convert_to_narrow(NBDExtentArray *ea)
2300 {
2301     int i;
2302     NBDExtent32 *extents = g_new(NBDExtent32, ea->count);
2303 
2304     assert(!ea->converted_to_be);
2305     assert(!ea->extended);
2306     ea->can_add = false;
2307     ea->converted_to_be = true;
2308 
2309     for (i = 0; i < ea->count; i++) {
2310         assert((ea->extents[i].length | ea->extents[i].flags) <= UINT32_MAX);
2311         extents[i].length = cpu_to_be32(ea->extents[i].length);
2312         extents[i].flags = cpu_to_be32(ea->extents[i].flags);
2313     }
2314 
2315     return extents;
2316 }
2317 
2318 /*
2319  * Add extent to NBDExtentArray. If extent can't be added (no available space),
2320  * return -1.
2321  * For safety, when returning -1 for the first time, .can_add is set to false,
2322  * and further calls to nbd_extent_array_add() will crash.
2323  * (this avoids the situation where a caller ignores failure to add one extent,
2324  * where adding another extent that would squash into the last array entry
2325  * would result in an incorrect range reported to the client)
2326  */
2327 static int nbd_extent_array_add(NBDExtentArray *ea,
2328                                 uint64_t length, uint32_t flags)
2329 {
2330     assert(ea->can_add);
2331 
2332     if (!length) {
2333         return 0;
2334     }
2335     if (!ea->extended) {
2336         assert(length <= UINT32_MAX);
2337     }
2338 
2339     /* Extend previous extent if flags are the same */
2340     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
2341         uint64_t sum = length + ea->extents[ea->count - 1].length;
2342 
2343         /*
2344          * sum cannot overflow: the block layer bounds image size at
2345          * 2^63, and ea->extents[].length comes from the block layer.
2346          */
2347         assert(sum >= length);
2348         if (sum <= UINT32_MAX || ea->extended) {
2349             ea->extents[ea->count - 1].length = sum;
2350             ea->total_length += length;
2351             return 0;
2352         }
2353     }
2354 
2355     if (ea->count >= ea->nb_alloc) {
2356         ea->can_add = false;
2357         return -1;
2358     }
2359 
2360     ea->total_length += length;
2361     ea->extents[ea->count] = (NBDExtent64) {.length = length, .flags = flags};
2362     ea->count++;
2363 
2364     return 0;
2365 }
2366 
2367 static int coroutine_fn blockstatus_to_extents(BlockBackend *blk,
2368                                                uint64_t offset, uint64_t bytes,
2369                                                NBDExtentArray *ea)
2370 {
2371     while (bytes) {
2372         uint32_t flags;
2373         int64_t num;
2374         int ret = blk_co_block_status_above(blk, NULL, offset, bytes, &num,
2375                                             NULL, NULL);
2376 
2377         if (ret < 0) {
2378             return ret;
2379         }
2380 
2381         flags = (ret & BDRV_BLOCK_DATA ? 0 : NBD_STATE_HOLE) |
2382                 (ret & BDRV_BLOCK_ZERO ? NBD_STATE_ZERO : 0);
2383 
2384         if (nbd_extent_array_add(ea, num, flags) < 0) {
2385             return 0;
2386         }
2387 
2388         offset += num;
2389         bytes -= num;
2390     }
2391 
2392     return 0;
2393 }
2394 
2395 static int coroutine_fn blockalloc_to_extents(BlockBackend *blk,
2396                                               uint64_t offset, uint64_t bytes,
2397                                               NBDExtentArray *ea)
2398 {
2399     while (bytes) {
2400         int64_t num;
2401         int ret = blk_co_is_allocated_above(blk, NULL, false, offset, bytes,
2402                                             &num);
2403 
2404         if (ret < 0) {
2405             return ret;
2406         }
2407 
2408         if (nbd_extent_array_add(ea, num, ret) < 0) {
2409             return 0;
2410         }
2411 
2412         offset += num;
2413         bytes -= num;
2414     }
2415 
2416     return 0;
2417 }
2418 
2419 /*
2420  * nbd_co_send_extents
2421  *
2422  * @ea is converted to BE by the function
2423  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2424  */
2425 static int coroutine_fn
2426 nbd_co_send_extents(NBDClient *client, NBDRequest *request, NBDExtentArray *ea,
2427                     bool last, uint32_t context_id, Error **errp)
2428 {
2429     NBDReply hdr;
2430     NBDStructuredMeta meta;
2431     NBDExtendedMeta meta_ext;
2432     g_autofree NBDExtent32 *extents = NULL;
2433     uint16_t type;
2434     struct iovec iov[] = { {.iov_base = &hdr}, {0}, {0} };
2435 
2436     if (client->mode >= NBD_MODE_EXTENDED) {
2437         type = NBD_REPLY_TYPE_BLOCK_STATUS_EXT;
2438 
2439         iov[1].iov_base = &meta_ext;
2440         iov[1].iov_len = sizeof(meta_ext);
2441         stl_be_p(&meta_ext.context_id, context_id);
2442         stl_be_p(&meta_ext.count, ea->count);
2443 
2444         nbd_extent_array_convert_to_be(ea);
2445         iov[2].iov_base = ea->extents;
2446         iov[2].iov_len = ea->count * sizeof(ea->extents[0]);
2447     } else {
2448         type = NBD_REPLY_TYPE_BLOCK_STATUS;
2449 
2450         iov[1].iov_base = &meta;
2451         iov[1].iov_len = sizeof(meta);
2452         stl_be_p(&meta.context_id, context_id);
2453 
2454         extents = nbd_extent_array_convert_to_narrow(ea);
2455         iov[2].iov_base = extents;
2456         iov[2].iov_len = ea->count * sizeof(extents[0]);
2457     }
2458 
2459     trace_nbd_co_send_extents(request->cookie, ea->count, context_id,
2460                               ea->total_length, last);
2461     set_be_chunk(client, iov, 3, last ? NBD_REPLY_FLAG_DONE : 0, type,
2462                  request);
2463 
2464     return nbd_co_send_iov(client, iov, 3, errp);
2465 }
2466 
2467 /* Get block status from the exported device and send it to the client */
2468 static int
2469 coroutine_fn nbd_co_send_block_status(NBDClient *client, NBDRequest *request,
2470                                       BlockBackend *blk, uint64_t offset,
2471                                       uint64_t length, bool dont_fragment,
2472                                       bool last, uint32_t context_id,
2473                                       Error **errp)
2474 {
2475     int ret;
2476     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2477     g_autoptr(NBDExtentArray) ea =
2478         nbd_extent_array_new(nb_extents, client->mode);
2479 
2480     if (context_id == NBD_META_ID_BASE_ALLOCATION) {
2481         ret = blockstatus_to_extents(blk, offset, length, ea);
2482     } else {
2483         ret = blockalloc_to_extents(blk, offset, length, ea);
2484     }
2485     if (ret < 0) {
2486         return nbd_co_send_chunk_error(client, request, -ret,
2487                                        "can't get block status", errp);
2488     }
2489 
2490     return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2491 }
2492 
2493 /* Populate @ea from a dirty bitmap. */
2494 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2495                               uint64_t offset, uint64_t length,
2496                               NBDExtentArray *es)
2497 {
2498     int64_t start, dirty_start, dirty_count;
2499     int64_t end = offset + length;
2500     bool full = false;
2501     int64_t bound = es->extended ? INT64_MAX : INT32_MAX;
2502 
2503     bdrv_dirty_bitmap_lock(bitmap);
2504 
2505     for (start = offset;
2506          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, bound,
2507                                            &dirty_start, &dirty_count);
2508          start = dirty_start + dirty_count)
2509     {
2510         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2511             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2512         {
2513             full = true;
2514             break;
2515         }
2516     }
2517 
2518     if (!full) {
2519         /* last non dirty extent, nothing to do if array is now full */
2520         (void) nbd_extent_array_add(es, end - start, 0);
2521     }
2522 
2523     bdrv_dirty_bitmap_unlock(bitmap);
2524 }
2525 
2526 static int coroutine_fn nbd_co_send_bitmap(NBDClient *client,
2527                                            NBDRequest *request,
2528                                            BdrvDirtyBitmap *bitmap,
2529                                            uint64_t offset,
2530                                            uint64_t length, bool dont_fragment,
2531                                            bool last, uint32_t context_id,
2532                                            Error **errp)
2533 {
2534     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2535     g_autoptr(NBDExtentArray) ea =
2536         nbd_extent_array_new(nb_extents, client->mode);
2537 
2538     bitmap_to_extents(bitmap, offset, length, ea);
2539 
2540     return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2541 }
2542 
2543 /*
2544  * nbd_co_block_status_payload_read
2545  * Called when a client wants a subset of negotiated contexts via a
2546  * BLOCK_STATUS payload.  Check the payload for valid length and
2547  * contents.  On success, return 0 with request updated to effective
2548  * length.  If request was invalid but all payload consumed, return 0
2549  * with request->len and request->contexts->count set to 0 (which will
2550  * trigger an appropriate NBD_EINVAL response later on).  Return
2551  * negative errno if the payload was not fully consumed.
2552  */
2553 static int
2554 nbd_co_block_status_payload_read(NBDClient *client, NBDRequest *request,
2555                                  Error **errp)
2556 {
2557     uint64_t payload_len = request->len;
2558     g_autofree char *buf = NULL;
2559     size_t count, i, nr_bitmaps;
2560     uint32_t id;
2561 
2562     if (payload_len > NBD_MAX_BUFFER_SIZE) {
2563         error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)",
2564                    request->len, NBD_MAX_BUFFER_SIZE);
2565         return -EINVAL;
2566     }
2567 
2568     assert(client->contexts.exp == client->exp);
2569     nr_bitmaps = client->exp->nr_export_bitmaps;
2570     request->contexts = g_new0(NBDMetaContexts, 1);
2571     request->contexts->exp = client->exp;
2572 
2573     if (payload_len % sizeof(uint32_t) ||
2574         payload_len < sizeof(NBDBlockStatusPayload) ||
2575         payload_len > (sizeof(NBDBlockStatusPayload) +
2576                        sizeof(id) * client->contexts.count)) {
2577         goto skip;
2578     }
2579 
2580     buf = g_malloc(payload_len);
2581     if (nbd_read(client->ioc, buf, payload_len,
2582                  "CMD_BLOCK_STATUS data", errp) < 0) {
2583         return -EIO;
2584     }
2585     trace_nbd_co_receive_request_payload_received(request->cookie,
2586                                                   payload_len);
2587     request->contexts->bitmaps = g_new0(bool, nr_bitmaps);
2588     count = (payload_len - sizeof(NBDBlockStatusPayload)) / sizeof(id);
2589     payload_len = 0;
2590 
2591     for (i = 0; i < count; i++) {
2592         id = ldl_be_p(buf + sizeof(NBDBlockStatusPayload) + sizeof(id) * i);
2593         if (id == NBD_META_ID_BASE_ALLOCATION) {
2594             if (!client->contexts.base_allocation ||
2595                 request->contexts->base_allocation) {
2596                 goto skip;
2597             }
2598             request->contexts->base_allocation = true;
2599         } else if (id == NBD_META_ID_ALLOCATION_DEPTH) {
2600             if (!client->contexts.allocation_depth ||
2601                 request->contexts->allocation_depth) {
2602                 goto skip;
2603             }
2604             request->contexts->allocation_depth = true;
2605         } else {
2606             unsigned idx = id - NBD_META_ID_DIRTY_BITMAP;
2607 
2608             if (idx >= nr_bitmaps || !client->contexts.bitmaps[idx] ||
2609                 request->contexts->bitmaps[idx]) {
2610                 goto skip;
2611             }
2612             request->contexts->bitmaps[idx] = true;
2613         }
2614     }
2615 
2616     request->len = ldq_be_p(buf);
2617     request->contexts->count = count;
2618     return 0;
2619 
2620  skip:
2621     trace_nbd_co_receive_block_status_payload_compliance(request->from,
2622                                                          request->len);
2623     request->len = request->contexts->count = 0;
2624     return nbd_drop(client->ioc, payload_len, errp);
2625 }
2626 
/* nbd_co_receive_request
 * Collect a client request. Return 0 if request looks valid, -EIO to drop
 * connection right away, -EAGAIN to indicate we were interrupted and the
 * channel should be quiesced, and any other negative value to report an error
 * to the client (although the caller may still need to disconnect after
 * reporting the error).
 *
 * @req: per-request state. req->complete is set once no payload is left
 *       unread on the channel; req->data receives the buffer allocated
 *       for READ and WRITE.
 * @request: filled in by nbd_receive_request(). request->len is reset to 0
 *           when a payload is sent with a command that does not accept one,
 *           and request->contexts is set for NBD_CMD_BLOCK_STATUS.
 * @errp: set to a description of the problem on failure.
 *
 * The function works in three stages: a per-command switch that only
 * records which checks apply, payload/buffer handling, and finally the
 * sanity checks on the decoded request.
 */
static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
                                               NBDRequest *request,
                                               Error **errp)
{
    NBDClient *client = req->client;
    bool extended_with_payload;
    /* Per-command properties, filled in by the switch below. */
    bool check_length = false;      /* enforce NBD_MAX_BUFFER_SIZE on len */
    bool check_rofs = false;        /* command modifies the export */
    bool allocate_buffer = false;   /* command needs req->data */
    bool payload_okay = false;      /* command is allowed to carry a payload */
    uint64_t payload_len = 0;       /* bytes still to be taken off the wire */
    int valid_flags = NBD_CMD_FLAG_FUA;
    int ret;

    g_assert(qemu_in_coroutine());
    ret = nbd_receive_request(client, request, errp);
    if (ret < 0) {
        return ret;
    }

    trace_nbd_co_receive_request_decode_type(request->cookie, request->type,
                                             nbd_cmd_lookup(request->type));
    /*
     * In extended mode the PAYLOAD_LEN flag means request->len counts
     * payload bytes that follow the header, whatever the command is.
     */
    extended_with_payload = client->mode >= NBD_MODE_EXTENDED &&
        request->flags & NBD_CMD_FLAG_PAYLOAD_LEN;
    if (extended_with_payload) {
        payload_len = request->len;
        check_length = true;
    }

    switch (request->type) {
    case NBD_CMD_DISC:
        /* Special case: we're going to disconnect without a reply,
         * whether or not flags, from, or len are bogus */
        req->complete = true;
        return -EIO;

    case NBD_CMD_READ:
        if (client->mode >= NBD_MODE_STRUCTURED) {
            valid_flags |= NBD_CMD_FLAG_DF;
        }
        check_length = true;
        allocate_buffer = true;
        break;

    case NBD_CMD_WRITE:
        if (client->mode >= NBD_MODE_EXTENDED) {
            if (!extended_with_payload) {
                /* The client is noncompliant. Trace it, but proceed. */
                trace_nbd_co_receive_ext_payload_compliance(request->from,
                                                            request->len);
            }
            valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN;
        }
        /* WRITE carries request->len bytes of data in every mode. */
        payload_okay = true;
        payload_len = request->len;
        check_length = true;
        allocate_buffer = true;
        check_rofs = true;
        break;

    case NBD_CMD_FLUSH:
        break;

    case NBD_CMD_TRIM:
        check_rofs = true;
        break;

    case NBD_CMD_CACHE:
        check_length = true;
        break;

    case NBD_CMD_WRITE_ZEROES:
        valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
        check_rofs = true;
        break;

    case NBD_CMD_BLOCK_STATUS:
        if (extended_with_payload) {
            ret = nbd_co_block_status_payload_read(client, request, errp);
            if (ret < 0) {
                return ret;
            }
            /* payload now consumed */
            check_length = false;
            payload_len = 0;
            valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN;
        } else {
            /* No per-request selection: use the negotiated contexts. */
            request->contexts = &client->contexts;
        }
        valid_flags |= NBD_CMD_FLAG_REQ_ONE;
        break;

    default:
        /* Unrecognized, will fail later */
        ;
    }

    /* Payload and buffer handling. */
    if (!payload_len) {
        /* Nothing left to read for this request. */
        req->complete = true;
    }
    if (check_length && request->len > NBD_MAX_BUFFER_SIZE) {
        /* READ, WRITE, CACHE */
        error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)",
                   request->len, NBD_MAX_BUFFER_SIZE);
        return -EINVAL;
    }
    if (payload_len && !payload_okay) {
        /*
         * For now, we don't support payloads on other commands; but
         * we can keep the connection alive by ignoring the payload.
         * We will fail the command later with NBD_EINVAL for the use
         * of an unsupported flag (and not for access beyond bounds).
         */
        assert(request->type != NBD_CMD_WRITE);
        request->len = 0;
    }
    if (allocate_buffer) {
        /* READ, WRITE */
        req->data = blk_try_blockalign(client->exp->common.blk,
                                       request->len);
        if (req->data == NULL) {
            error_setg(errp, "No memory");
            return -ENOMEM;
        }
    }
    if (payload_len) {
        if (payload_okay) {
            /* WRITE */
            assert(req->data);
            ret = nbd_read(client->ioc, req->data, payload_len,
                           "CMD_WRITE data", errp);
        } else {
            /* Discard the unsupported payload (request->len is already 0). */
            ret = nbd_drop(client->ioc, payload_len, errp);
        }
        if (ret < 0) {
            /* Any failure while reading the payload is reported as -EIO. */
            return -EIO;
        }
        req->complete = true;
        trace_nbd_co_receive_request_payload_received(request->cookie,
                                                      payload_len);
    }

    /*
     * Sanity checks. These run only after the payload has been taken off
     * the channel, so req->complete is already set when they fail.
     */
    if (client->exp->nbdflags & NBD_FLAG_READ_ONLY && check_rofs) {
        /* WRITE, TRIM, WRITE_ZEROES */
        error_setg(errp, "Export is read-only");
        return -EROFS;
    }
    /*
     * The second comparison is only evaluated when from <= size, so the
     * subtraction cannot wrap around.
     */
    if (request->from > client->exp->size ||
        request->len > client->exp->size - request->from) {
        error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu64
                   ", Size: %" PRIu64, request->from, request->len,
                   client->exp->size);
        return (request->type == NBD_CMD_WRITE ||
                request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
    }
    if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
                                                client->check_align)) {
        /*
         * The block layer gracefully handles unaligned requests, but
         * it's still worth tracing client non-compliance
         */
        trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
                                              request->from,
                                              request->len,
                                              client->check_align);
    }
    /* Reject any flag the switch above did not whitelist for this command. */
    if (request->flags & ~valid_flags) {
        error_setg(errp, "unsupported flags for command %s (got 0x%x)",
                   nbd_cmd_lookup(request->type), request->flags);
        return -EINVAL;
    }

    return 0;
}
2810 
2811 /* Send simple reply without a payload, or a structured error
2812  * @error_msg is ignored if @ret >= 0
2813  * Returns 0 if connection is still live, -errno on failure to talk to client
2814  */
2815 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2816                                                NBDRequest *request,
2817                                                int ret,
2818                                                const char *error_msg,
2819                                                Error **errp)
2820 {
2821     if (client->mode >= NBD_MODE_STRUCTURED && ret < 0) {
2822         return nbd_co_send_chunk_error(client, request, -ret, error_msg, errp);
2823     } else if (client->mode >= NBD_MODE_EXTENDED) {
2824         return nbd_co_send_chunk_done(client, request, errp);
2825     } else {
2826         return nbd_co_send_simple_reply(client, request, ret < 0 ? -ret : 0,
2827                                         NULL, 0, errp);
2828     }
2829 }
2830 
2831 /* Handle NBD_CMD_READ request.
2832  * Return -errno if sending fails. Other errors are reported directly to the
2833  * client as an error reply. */
2834 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2835                                         uint8_t *data, Error **errp)
2836 {
2837     int ret;
2838     NBDExport *exp = client->exp;
2839 
2840     assert(request->type == NBD_CMD_READ);
2841     assert(request->len <= NBD_MAX_BUFFER_SIZE);
2842 
2843     /* XXX: NBD Protocol only documents use of FUA with WRITE */
2844     if (request->flags & NBD_CMD_FLAG_FUA) {
2845         ret = blk_co_flush(exp->common.blk);
2846         if (ret < 0) {
2847             return nbd_send_generic_reply(client, request, ret,
2848                                           "flush failed", errp);
2849         }
2850     }
2851 
2852     if (client->mode >= NBD_MODE_STRUCTURED &&
2853         !(request->flags & NBD_CMD_FLAG_DF) && request->len)
2854     {
2855         return nbd_co_send_sparse_read(client, request, request->from,
2856                                        data, request->len, errp);
2857     }
2858 
2859     ret = blk_co_pread(exp->common.blk, request->from, request->len, data, 0);
2860     if (ret < 0) {
2861         return nbd_send_generic_reply(client, request, ret,
2862                                       "reading from file failed", errp);
2863     }
2864 
2865     if (client->mode >= NBD_MODE_STRUCTURED) {
2866         if (request->len) {
2867             return nbd_co_send_chunk_read(client, request, request->from, data,
2868                                           request->len, true, errp);
2869         } else {
2870             return nbd_co_send_chunk_done(client, request, errp);
2871         }
2872     } else {
2873         return nbd_co_send_simple_reply(client, request, 0,
2874                                         data, request->len, errp);
2875     }
2876 }
2877 
2878 /*
2879  * nbd_do_cmd_cache
2880  *
2881  * Handle NBD_CMD_CACHE request.
2882  * Return -errno if sending fails. Other errors are reported directly to the
2883  * client as an error reply.
2884  */
2885 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2886                                          Error **errp)
2887 {
2888     int ret;
2889     NBDExport *exp = client->exp;
2890 
2891     assert(request->type == NBD_CMD_CACHE);
2892     assert(request->len <= NBD_MAX_BUFFER_SIZE);
2893 
2894     ret = blk_co_preadv(exp->common.blk, request->from, request->len,
2895                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2896 
2897     return nbd_send_generic_reply(client, request, ret,
2898                                   "caching data failed", errp);
2899 }
2900 
2901 /* Handle NBD request.
2902  * Return -errno if sending fails. Other errors are reported directly to the
2903  * client as an error reply. */
2904 static coroutine_fn int nbd_handle_request(NBDClient *client,
2905                                            NBDRequest *request,
2906                                            uint8_t *data, Error **errp)
2907 {
2908     int ret;
2909     int flags;
2910     NBDExport *exp = client->exp;
2911     char *msg;
2912     size_t i;
2913 
2914     switch (request->type) {
2915     case NBD_CMD_CACHE:
2916         return nbd_do_cmd_cache(client, request, errp);
2917 
2918     case NBD_CMD_READ:
2919         return nbd_do_cmd_read(client, request, data, errp);
2920 
2921     case NBD_CMD_WRITE:
2922         flags = 0;
2923         if (request->flags & NBD_CMD_FLAG_FUA) {
2924             flags |= BDRV_REQ_FUA;
2925         }
2926         assert(request->len <= NBD_MAX_BUFFER_SIZE);
2927         ret = blk_co_pwrite(exp->common.blk, request->from, request->len, data,
2928                             flags);
2929         return nbd_send_generic_reply(client, request, ret,
2930                                       "writing to file failed", errp);
2931 
2932     case NBD_CMD_WRITE_ZEROES:
2933         flags = 0;
2934         if (request->flags & NBD_CMD_FLAG_FUA) {
2935             flags |= BDRV_REQ_FUA;
2936         }
2937         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2938             flags |= BDRV_REQ_MAY_UNMAP;
2939         }
2940         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2941             flags |= BDRV_REQ_NO_FALLBACK;
2942         }
2943         ret = blk_co_pwrite_zeroes(exp->common.blk, request->from, request->len,
2944                                    flags);
2945         return nbd_send_generic_reply(client, request, ret,
2946                                       "writing to file failed", errp);
2947 
2948     case NBD_CMD_DISC:
2949         /* unreachable, thanks to special case in nbd_co_receive_request() */
2950         abort();
2951 
2952     case NBD_CMD_FLUSH:
2953         ret = blk_co_flush(exp->common.blk);
2954         return nbd_send_generic_reply(client, request, ret,
2955                                       "flush failed", errp);
2956 
2957     case NBD_CMD_TRIM:
2958         ret = blk_co_pdiscard(exp->common.blk, request->from, request->len);
2959         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2960             ret = blk_co_flush(exp->common.blk);
2961         }
2962         return nbd_send_generic_reply(client, request, ret,
2963                                       "discard failed", errp);
2964 
2965     case NBD_CMD_BLOCK_STATUS:
2966         assert(request->contexts);
2967         assert(client->mode >= NBD_MODE_EXTENDED ||
2968                request->len <= UINT32_MAX);
2969         if (request->contexts->count) {
2970             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2971             int contexts_remaining = request->contexts->count;
2972 
2973             if (!request->len) {
2974                 return nbd_send_generic_reply(client, request, -EINVAL,
2975                                               "need non-zero length", errp);
2976             }
2977             if (request->contexts->base_allocation) {
2978                 ret = nbd_co_send_block_status(client, request,
2979                                                exp->common.blk,
2980                                                request->from,
2981                                                request->len, dont_fragment,
2982                                                !--contexts_remaining,
2983                                                NBD_META_ID_BASE_ALLOCATION,
2984                                                errp);
2985                 if (ret < 0) {
2986                     return ret;
2987                 }
2988             }
2989 
2990             if (request->contexts->allocation_depth) {
2991                 ret = nbd_co_send_block_status(client, request,
2992                                                exp->common.blk,
2993                                                request->from, request->len,
2994                                                dont_fragment,
2995                                                !--contexts_remaining,
2996                                                NBD_META_ID_ALLOCATION_DEPTH,
2997                                                errp);
2998                 if (ret < 0) {
2999                     return ret;
3000                 }
3001             }
3002 
3003             assert(request->contexts->exp == client->exp);
3004             for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
3005                 if (!request->contexts->bitmaps[i]) {
3006                     continue;
3007                 }
3008                 ret = nbd_co_send_bitmap(client, request,
3009                                          client->exp->export_bitmaps[i],
3010                                          request->from, request->len,
3011                                          dont_fragment, !--contexts_remaining,
3012                                          NBD_META_ID_DIRTY_BITMAP + i, errp);
3013                 if (ret < 0) {
3014                     return ret;
3015                 }
3016             }
3017 
3018             assert(!contexts_remaining);
3019 
3020             return 0;
3021         } else if (client->contexts.count) {
3022             return nbd_send_generic_reply(client, request, -EINVAL,
3023                                           "CMD_BLOCK_STATUS payload not valid",
3024                                           errp);
3025         } else {
3026             return nbd_send_generic_reply(client, request, -EINVAL,
3027                                           "CMD_BLOCK_STATUS not negotiated",
3028                                           errp);
3029         }
3030 
3031     default:
3032         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
3033                               request->type);
3034         ret = nbd_send_generic_reply(client, request, -EINVAL, msg,
3035                                      errp);
3036         g_free(msg);
3037         return ret;
3038     }
3039 }
3040 
3041 /* Owns a reference to the NBDClient passed as opaque.  */
3042 static coroutine_fn void nbd_trip(void *opaque)
3043 {
3044     NBDRequestData *req = opaque;
3045     NBDClient *client = req->client;
3046     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
3047     int ret;
3048     Error *local_err = NULL;
3049 
3050     /*
3051      * Note that nbd_client_put() and client_close() must be called from the
3052      * main loop thread. Use aio_co_reschedule_self() to switch AioContext
3053      * before calling these functions.
3054      */
3055 
3056     trace_nbd_trip();
3057 
3058     qemu_mutex_lock(&client->lock);
3059 
3060     if (client->closing) {
3061         goto done;
3062     }
3063 
3064     if (client->quiescing) {
3065         /*
3066          * We're switching between AIO contexts. Don't attempt to receive a new
3067          * request and kick the main context which may be waiting for us.
3068          */
3069         client->recv_coroutine = NULL;
3070         aio_wait_kick();
3071         goto done;
3072     }
3073 
3074     /*
3075      * nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has
3076      * set client->quiescing but by the time we get back nbd_drained_end() may
3077      * have already cleared client->quiescing. In that case we try again
3078      * because nothing else will spawn an nbd_trip() coroutine until we set
3079      * client->recv_coroutine = NULL further down.
3080      */
3081     do {
3082         assert(client->recv_coroutine == qemu_coroutine_self());
3083         qemu_mutex_unlock(&client->lock);
3084         ret = nbd_co_receive_request(req, &request, &local_err);
3085         qemu_mutex_lock(&client->lock);
3086     } while (ret == -EAGAIN && !client->quiescing);
3087 
3088     client->recv_coroutine = NULL;
3089 
3090     if (client->closing) {
3091         /*
3092          * The client may be closed when we are blocked in
3093          * nbd_co_receive_request()
3094          */
3095         goto done;
3096     }
3097 
3098     if (ret == -EAGAIN) {
3099         goto done;
3100     }
3101 
3102     nbd_client_receive_next_request(client);
3103 
3104     if (ret == -EIO) {
3105         goto disconnect;
3106     }
3107 
3108     qemu_mutex_unlock(&client->lock);
3109     qio_channel_set_cork(client->ioc, true);
3110 
3111     if (ret < 0) {
3112         /* It wasn't -EIO, so, according to nbd_co_receive_request()
3113          * semantics, we should return the error to the client. */
3114         Error *export_err = local_err;
3115 
3116         local_err = NULL;
3117         ret = nbd_send_generic_reply(client, &request, -EINVAL,
3118                                      error_get_pretty(export_err), &local_err);
3119         error_free(export_err);
3120     } else {
3121         ret = nbd_handle_request(client, &request, req->data, &local_err);
3122     }
3123     if (request.contexts && request.contexts != &client->contexts) {
3124         assert(request.type == NBD_CMD_BLOCK_STATUS);
3125         g_free(request.contexts->bitmaps);
3126         g_free(request.contexts);
3127     }
3128 
3129     qio_channel_set_cork(client->ioc, false);
3130     qemu_mutex_lock(&client->lock);
3131 
3132     if (ret < 0) {
3133         error_prepend(&local_err, "Failed to send reply: ");
3134         goto disconnect;
3135     }
3136 
3137     /*
3138      * We must disconnect after NBD_CMD_WRITE or BLOCK_STATUS with
3139      * payload if we did not read the payload.
3140      */
3141     if (!req->complete) {
3142         error_setg(&local_err, "Request handling failed in intermediate state");
3143         goto disconnect;
3144     }
3145 
3146 done:
3147     nbd_request_put(req);
3148 
3149     qemu_mutex_unlock(&client->lock);
3150 
3151     if (!nbd_client_put_nonzero(client)) {
3152         aio_co_reschedule_self(qemu_get_aio_context());
3153         nbd_client_put(client);
3154     }
3155     return;
3156 
3157 disconnect:
3158     if (local_err) {
3159         error_reportf_err(local_err, "Disconnect client, due to: ");
3160     }
3161 
3162     nbd_request_put(req);
3163     qemu_mutex_unlock(&client->lock);
3164 
3165     aio_co_reschedule_self(qemu_get_aio_context());
3166     client_close(client, true);
3167     nbd_client_put(client);
3168 }
3169 
3170 /*
3171  * Runs in export AioContext and main loop thread. Caller must hold
3172  * client->lock.
3173  */
3174 static void nbd_client_receive_next_request(NBDClient *client)
3175 {
3176     NBDRequestData *req;
3177 
3178     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
3179         !client->quiescing) {
3180         nbd_client_get(client);
3181         req = nbd_request_get(client);
3182         client->recv_coroutine = qemu_coroutine_create(nbd_trip, req);
3183         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
3184     }
3185 }
3186 
3187 static coroutine_fn void nbd_co_client_start(void *opaque)
3188 {
3189     NBDClient *client = opaque;
3190     Error *local_err = NULL;
3191 
3192     qemu_co_mutex_init(&client->send_lock);
3193 
3194     if (nbd_negotiate(client, &local_err)) {
3195         if (local_err) {
3196             error_report_err(local_err);
3197         }
3198         client_close(client, false);
3199         return;
3200     }
3201 
3202     WITH_QEMU_LOCK_GUARD(&client->lock) {
3203         nbd_client_receive_next_request(client);
3204     }
3205 }
3206 
3207 /*
3208  * Create a new client listener using the given channel @sioc.
3209  * Begin servicing it in a coroutine.  When the connection closes, call
3210  * @close_fn with an indication of whether the client completed negotiation.
3211  */
3212 void nbd_client_new(QIOChannelSocket *sioc,
3213                     QCryptoTLSCreds *tlscreds,
3214                     const char *tlsauthz,
3215                     void (*close_fn)(NBDClient *, bool))
3216 {
3217     NBDClient *client;
3218     Coroutine *co;
3219 
3220     client = g_new0(NBDClient, 1);
3221     qemu_mutex_init(&client->lock);
3222     client->refcount = 1;
3223     client->tlscreds = tlscreds;
3224     if (tlscreds) {
3225         object_ref(OBJECT(client->tlscreds));
3226     }
3227     client->tlsauthz = g_strdup(tlsauthz);
3228     client->sioc = sioc;
3229     qio_channel_set_delay(QIO_CHANNEL(sioc), false);
3230     object_ref(OBJECT(client->sioc));
3231     client->ioc = QIO_CHANNEL(sioc);
3232     object_ref(OBJECT(client->ioc));
3233     client->close_fn = close_fn;
3234 
3235     co = qemu_coroutine_create(nbd_co_client_start, client);
3236     qemu_coroutine_enter(co);
3237 }
3238