xref: /openbmc/qemu/nbd/server.c (revision 1c8222b0)
1 /*
2  *  Copyright (C) 2016-2018 Red Hat, Inc.
3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
4  *
5  *  Network Block Device Server Side
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; under version 2 of the License.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 
22 #include "block/export.h"
23 #include "qapi/error.h"
24 #include "qemu/queue.h"
25 #include "trace.h"
26 #include "nbd-internal.h"
27 #include "qemu/units.h"
28 
29 #define NBD_META_ID_BASE_ALLOCATION 0
30 #define NBD_META_ID_DIRTY_BITMAP 1
31 
32 /*
33  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
34  * constant. If an increase is needed, note that the NBD protocol
35  * recommends no larger than 32 MiB, so that the client won't consider
36  * the reply a denial-of-service attack.
37  */
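/*
 * Each extent descriptor in a block status reply is 8 bytes on the
 * wire (32-bit length + 32-bit flags), so this limit corresponds to
 * at most 131072 extents per reply.
 */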
38 #define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
39 
40 static int system_errno_to_nbd_errno(int err)
41 {
42     switch (err) {
43     case 0:
44         return NBD_SUCCESS;
45     case EPERM:
46     case EROFS:
47         return NBD_EPERM;
48     case EIO:
49         return NBD_EIO;
50     case ENOMEM:
51         return NBD_ENOMEM;
52 #ifdef EDQUOT
53     case EDQUOT:
54 #endif
55     case EFBIG:
56     case ENOSPC:
57         return NBD_ENOSPC;
58     case EOVERFLOW:
59         return NBD_EOVERFLOW;
60     case ENOTSUP:
61 #if ENOTSUP != EOPNOTSUPP
62     case EOPNOTSUPP:
63 #endif
64         return NBD_ENOTSUP;
65     case ESHUTDOWN:
66         return NBD_ESHUTDOWN;
67     case EINVAL:
68     default:
69         return NBD_EINVAL;
70     }
71 }
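/*
 * For example, an ENOSPC error from the block layer is reported to the
 * client as NBD_ENOSPC, while any errno not listed above degrades to
 * the catch-all NBD_EINVAL.
 */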
72 
73 /* Definitions for opaque data types */
74 
75 typedef struct NBDRequestData NBDRequestData;
76 
77 struct NBDRequestData {
78     QSIMPLEQ_ENTRY(NBDRequestData) entry;
79     NBDClient *client;
80     uint8_t *data;
81     bool complete;
82 };
83 
84 struct NBDExport {
85     BlockExport common;
86     int refcount;
87     void (*close)(NBDExport *exp);
88 
89     BlockBackend *blk;
90     char *name;
91     char *description;
92     uint64_t size;
93     uint16_t nbdflags;
94     QTAILQ_HEAD(, NBDClient) clients;
95     QTAILQ_ENTRY(NBDExport) next;
96 
97     AioContext *ctx;
98 
99     BlockBackend *eject_notifier_blk;
100     Notifier eject_notifier;
101 
102     BdrvDirtyBitmap *export_bitmap;
103     char *export_bitmap_context;
104 };
105 
106 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
107 static QTAILQ_HEAD(, NBDExport) closed_exports =
108         QTAILQ_HEAD_INITIALIZER(closed_exports);
109 
110 /* NBDExportMetaContexts represents a list of contexts to be exported,
111  * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
112  * NBD_OPT_LIST_META_CONTEXT. */
113 typedef struct NBDExportMetaContexts {
114     NBDExport *exp;
115     bool valid; /* means that negotiation of the option finished without
116                    errors */
117     bool base_allocation; /* export base:allocation context (block status) */
118     bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
119 } NBDExportMetaContexts;
120 
121 struct NBDClient {
122     int refcount;
123     void (*close_fn)(NBDClient *client, bool negotiated);
124 
125     NBDExport *exp;
126     QCryptoTLSCreds *tlscreds;
127     char *tlsauthz;
128     QIOChannelSocket *sioc; /* The underlying data channel */
129     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
130 
131     Coroutine *recv_coroutine;
132 
133     CoMutex send_lock;
134     Coroutine *send_coroutine;
135 
136     QTAILQ_ENTRY(NBDClient) next;
137     int nb_requests;
138     bool closing;
139 
140     uint32_t check_align; /* If non-zero, check for aligned client requests */
141 
142     bool structured_reply;
143     NBDExportMetaContexts export_meta;
144 
145     uint32_t opt; /* Current option being negotiated */
146     uint32_t optlen; /* remaining length of data in ioc for the option being
147                         negotiated now */
148 };
149 
150 static void nbd_client_receive_next_request(NBDClient *client);
151 
152 /* Basic flow for negotiation
153 
154    Server         Client
155    Negotiate
156 
157    or
158 
159    Server         Client
160    Negotiate #1
161                   Option
162    Negotiate #2
163 
164    ----
165 
166    followed by
167 
168    Server         Client
169                   Request
170    Response
171                   Request
172    Response
173                   ...
174    ...
175                   Request (type == NBD_CMD_DISC, i.e. 2)
176 
177 */
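/* One possible fixed-newstyle session, as implemented below (the exact
   option sequence is up to the client):

   Server: "NBDMAGIC", NBD_OPTS_MAGIC, handshake flags
   Client: client flags (e.g. NBD_FLAG_C_FIXED_NEWSTYLE)
   Client: NBD_OPT_STRUCTURED_REPLY      Server: NBD_REP_ACK
   Client: NBD_OPT_SET_META_CONTEXT      Server: NBD_REP_META_CONTEXT, NBD_REP_ACK
   Client: NBD_OPT_GO                    Server: NBD_REP_INFO..., NBD_REP_ACK
   ...transmission phase (NBD_CMD_READ, ..., NBD_CMD_DISC)...
*/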
178 
179 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
180                                      uint32_t type, uint32_t length)
181 {
182     stq_be_p(&rep->magic, NBD_REP_MAGIC);
183     stl_be_p(&rep->option, option);
184     stl_be_p(&rep->type, type);
185     stl_be_p(&rep->length, length);
186 }
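/*
 * The resulting option reply header is 20 bytes on the wire: an 8-byte
 * NBD_REP_MAGIC, the 4-byte echoed option, a 4-byte reply type, and a
 * 4-byte length of any payload that follows.
 */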
187 
188 /* Send a reply header, including length, but no payload.
189  * Return -errno on error, 0 on success. */
190 static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
191                                       uint32_t len, Error **errp)
192 {
193     NBDOptionReply rep;
194 
195     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
196                                      type, nbd_rep_lookup(type), len);
197 
198     assert(len < NBD_MAX_BUFFER_SIZE);
199 
200     set_be_option_rep(&rep, client->opt, type, len);
201     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
202 }
203 
204 /* Send a reply header with default 0 length.
205  * Return -errno on error, 0 on success. */
206 static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
207                                   Error **errp)
208 {
209     return nbd_negotiate_send_rep_len(client, type, 0, errp);
210 }
211 
212 /* Send an error reply.
213  * Return -errno on error, 0 on success. */
214 static int GCC_FMT_ATTR(4, 0)
215 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
216                             Error **errp, const char *fmt, va_list va)
217 {
218     ERRP_GUARD();
219     g_autofree char *msg = NULL;
220     int ret;
221     size_t len;
222 
223     msg = g_strdup_vprintf(fmt, va);
224     len = strlen(msg);
225     assert(len < NBD_MAX_STRING_SIZE);
226     trace_nbd_negotiate_send_rep_err(msg);
227     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
228     if (ret < 0) {
229         return ret;
230     }
231     if (nbd_write(client->ioc, msg, len, errp) < 0) {
232         error_prepend(errp, "write failed (error message): ");
233         return -EIO;
234     }
235 
236     return 0;
237 }
238 
239 /*
240  * Return a malloc'd copy of @name suitable for use in an error reply.
241  */
242 static char *
243 nbd_sanitize_name(const char *name)
244 {
245     if (strnlen(name, 80) < 80) {
246         return g_strdup(name);
247     }
248     /* XXX Should we also try to sanitize any control characters? */
249     return g_strdup_printf("%.80s...", name);
250 }
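/* For example, a 200-byte export name is reported back to the client as
 * its first 80 bytes followed by "...". */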
251 
252 /* Send an error reply.
253  * Return -errno on error, 0 on success. */
254 static int GCC_FMT_ATTR(4, 5)
255 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
256                            Error **errp, const char *fmt, ...)
257 {
258     va_list va;
259     int ret;
260 
261     va_start(va, fmt);
262     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
263     va_end(va);
264     return ret;
265 }
266 
267 /* Drop remainder of the current option, and send a reply with the
268  * given error type and message. Return -errno on read or write
269  * failure; or 0 if connection is still live. */
270 static int GCC_FMT_ATTR(4, 0)
271 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
272               const char *fmt, va_list va)
273 {
274     int ret = nbd_drop(client->ioc, client->optlen, errp);
275 
276     client->optlen = 0;
277     if (!ret) {
278         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
279     }
280     return ret;
281 }
282 
283 static int GCC_FMT_ATTR(4, 5)
284 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
285              const char *fmt, ...)
286 {
287     int ret;
288     va_list va;
289 
290     va_start(va, fmt);
291     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
292     va_end(va);
293 
294     return ret;
295 }
296 
297 static int GCC_FMT_ATTR(3, 4)
298 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
299 {
300     int ret;
301     va_list va;
302 
303     va_start(va, fmt);
304     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
305     va_end(va);
306 
307     return ret;
308 }
309 
310 /* Read size bytes from the unparsed payload of the current option.
311  * Return -errno on I/O error, 0 if option was completely handled by
312  * sending a reply about inconsistent lengths, or 1 on success. */
313 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
314                         Error **errp)
315 {
316     if (size > client->optlen) {
317         return nbd_opt_invalid(client, errp,
318                                "Inconsistent lengths in option %s",
319                                nbd_opt_lookup(client->opt));
320     }
321     client->optlen -= size;
322     return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
323 }
324 
325 /* Drop size bytes from the unparsed payload of the current option.
326  * Return -errno on I/O error, 0 if option was completely handled by
327  * sending a reply about inconsistent lengths, or 1 on success. */
328 static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
329 {
330     if (size > client->optlen) {
331         return nbd_opt_invalid(client, errp,
332                                "Inconsistent lengths in option %s",
333                                nbd_opt_lookup(client->opt));
334     }
335     client->optlen -= size;
336     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
337 }
338 
339 /* nbd_opt_read_name
340  *
341  * Read a string with the format:
342  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
343  *   len bytes string (not 0-terminated)
344  *
345  * On success, @name will be allocated.
346  * If @length is non-null, it will be set to the actual string length.
347  *
348  * Return -errno on I/O error, 0 if option was completely handled by
349  * sending a reply about inconsistent lengths, or 1 on success.
350  */
351 static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
352                              Error **errp)
353 {
354     int ret;
355     uint32_t len;
356     g_autofree char *local_name = NULL;
357 
358     *name = NULL;
359     ret = nbd_opt_read(client, &len, sizeof(len), errp);
360     if (ret <= 0) {
361         return ret;
362     }
363     len = be32_to_cpu(len);
364 
365     if (len > NBD_MAX_STRING_SIZE) {
366         return nbd_opt_invalid(client, errp,
367                                "Invalid name length: %" PRIu32, len);
368     }
369 
370     local_name = g_malloc(len + 1);
371     ret = nbd_opt_read(client, local_name, len, errp);
372     if (ret <= 0) {
373         return ret;
374     }
375     local_name[len] = '\0';
376 
377     if (length) {
378         *length = len;
379     }
380     *name = g_steal_pointer(&local_name);
381 
382     return 1;
383 }
384 
385 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
386  * Return -errno on error, 0 on success. */
387 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
388                                        Error **errp)
389 {
390     ERRP_GUARD();
391     size_t name_len, desc_len;
392     uint32_t len;
393     const char *name = exp->name ? exp->name : "";
394     const char *desc = exp->description ? exp->description : "";
395     QIOChannel *ioc = client->ioc;
396     int ret;
397 
398     trace_nbd_negotiate_send_rep_list(name, desc);
399     name_len = strlen(name);
400     desc_len = strlen(desc);
401     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
402     len = name_len + desc_len + sizeof(len);
403     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
404     if (ret < 0) {
405         return ret;
406     }
407 
408     len = cpu_to_be32(name_len);
409     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
410         error_prepend(errp, "write failed (name length): ");
411         return -EINVAL;
412     }
413 
414     if (nbd_write(ioc, name, name_len, errp) < 0) {
415         error_prepend(errp, "write failed (name buffer): ");
416         return -EINVAL;
417     }
418 
419     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
420         error_prepend(errp, "write failed (description buffer): ");
421         return -EINVAL;
422     }
423 
424     return 0;
425 }
426 
427 /* Process the NBD_OPT_LIST command, with a potential series of replies.
428  * Return -errno on error, 0 on success. */
429 static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
430 {
431     NBDExport *exp;
432     assert(client->opt == NBD_OPT_LIST);
433 
434     /* For each export, send a NBD_REP_SERVER reply. */
435     QTAILQ_FOREACH(exp, &exports, next) {
436         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
437             return -EINVAL;
438         }
439     }
440     /* Finish with a NBD_REP_ACK. */
441     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
442 }
443 
444 static void nbd_check_meta_export(NBDClient *client)
445 {
446     client->export_meta.valid &= client->exp == client->export_meta.exp;
447 }
448 
449 /* Send a reply to NBD_OPT_EXPORT_NAME.
450  * Return -errno on error, 0 on success. */
451 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
452                                             Error **errp)
453 {
454     ERRP_GUARD();
455     g_autofree char *name = NULL;
456     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
457     size_t len;
458     int ret;
459     uint16_t myflags;
460 
461     /* Client sends:
462         [20 ..  xx]   export name (length bytes)
463        Server replies:
464         [ 0 ..   7]   size
465         [ 8 ..   9]   export flags
466         [10 .. 133]   reserved     (0) [unless no_zeroes]
467      */
468     trace_nbd_negotiate_handle_export_name();
469     if (client->optlen > NBD_MAX_STRING_SIZE) {
470         error_setg(errp, "Bad length received");
471         return -EINVAL;
472     }
473     name = g_malloc(client->optlen + 1);
474     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
475         return -EIO;
476     }
477     name[client->optlen] = '\0';
478     client->optlen = 0;
479 
480     trace_nbd_negotiate_handle_export_name_request(name);
481 
482     client->exp = nbd_export_find(name);
483     if (!client->exp) {
484         error_setg(errp, "export not found");
485         return -EINVAL;
486     }
487 
488     myflags = client->exp->nbdflags;
489     if (client->structured_reply) {
490         myflags |= NBD_FLAG_SEND_DF;
491     }
492     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
493     stq_be_p(buf, client->exp->size);
494     stw_be_p(buf + 8, myflags);
495     len = no_zeroes ? 10 : sizeof(buf);
496     ret = nbd_write(client->ioc, buf, len, errp);
497     if (ret < 0) {
498         error_prepend(errp, "write failed: ");
499         return ret;
500     }
501 
502     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
503     nbd_export_get(client->exp);
504     nbd_check_meta_export(client);
505 
506     return 0;
507 }
508 
509 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
510  * The buffer does NOT include the info type prefix.
511  * Return -errno on error, 0 if ready to send more. */
512 static int nbd_negotiate_send_info(NBDClient *client,
513                                    uint16_t info, uint32_t length, void *buf,
514                                    Error **errp)
515 {
516     int rc;
517 
518     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
519     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
520                                     sizeof(info) + length, errp);
521     if (rc < 0) {
522         return rc;
523     }
524     info = cpu_to_be16(info);
525     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
526         return -EIO;
527     }
528     if (nbd_write(client->ioc, buf, length, errp) < 0) {
529         return -EIO;
530     }
531     return 0;
532 }
533 
534 /* nbd_reject_length: Handle any unexpected payload.
535  * @fatal requests that we quit talking to the client, even if we are able
536  * to successfully send an error reply.
537  * Return:
538  * -errno  transmission error occurred or @fatal was requested, errp is set
539  * 0       error message successfully sent to client, errp is not set
540  */
541 static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
542 {
543     int ret;
544 
545     assert(client->optlen);
546     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
547                           nbd_opt_lookup(client->opt));
548     if (fatal && !ret) {
549         error_setg(errp, "option '%s' has unexpected length",
550                    nbd_opt_lookup(client->opt));
551         return -EINVAL;
552     }
553     return ret;
554 }
555 
556 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
557  * Return -errno on error, 0 if ready for next option, and 1 to move
558  * into transmission phase.  */
559 static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
560 {
561     int rc;
562     g_autofree char *name = NULL;
563     NBDExport *exp;
564     uint16_t requests;
565     uint16_t request;
566     uint32_t namelen;
567     bool sendname = false;
568     bool blocksize = false;
569     uint32_t sizes[3];
570     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
571     uint32_t check_align = 0;
572     uint16_t myflags;
573 
574     /* Client sends:
575         4 bytes: L, name length (can be 0)
576         L bytes: export name
577         2 bytes: N, number of requests (can be 0)
578         N * 2 bytes: N requests
579     */
580     rc = nbd_opt_read_name(client, &name, &namelen, errp);
581     if (rc <= 0) {
582         return rc;
583     }
584     trace_nbd_negotiate_handle_export_name_request(name);
585 
586     rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
587     if (rc <= 0) {
588         return rc;
589     }
590     requests = be16_to_cpu(requests);
591     trace_nbd_negotiate_handle_info_requests(requests);
592     while (requests--) {
593         rc = nbd_opt_read(client, &request, sizeof(request), errp);
594         if (rc <= 0) {
595             return rc;
596         }
597         request = be16_to_cpu(request);
598         trace_nbd_negotiate_handle_info_request(request,
599                                                 nbd_info_lookup(request));
600         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
601          * everything else is either a request we don't know or
602          * something we send regardless of request */
603         switch (request) {
604         case NBD_INFO_NAME:
605             sendname = true;
606             break;
607         case NBD_INFO_BLOCK_SIZE:
608             blocksize = true;
609             break;
610         }
611     }
612     if (client->optlen) {
613         return nbd_reject_length(client, false, errp);
614     }
615 
616     exp = nbd_export_find(name);
617     if (!exp) {
618         g_autofree char *sane_name = nbd_sanitize_name(name);
619 
620         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
621                                           errp, "export '%s' not present",
622                                           sane_name);
623     }
624 
625     /* Don't bother sending NBD_INFO_NAME unless client requested it */
626     if (sendname) {
627         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
628                                      errp);
629         if (rc < 0) {
630             return rc;
631         }
632     }
633 
634     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
635      * client request */
636     if (exp->description) {
637         size_t len = strlen(exp->description);
638 
639         assert(len <= NBD_MAX_STRING_SIZE);
640         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
641                                      len, exp->description, errp);
642         if (rc < 0) {
643             return rc;
644         }
645     }
646 
647     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
648      * according to whether the client requested it, and according to
649      * whether this is OPT_INFO or OPT_GO. */
650     /* minimum - 1 for back-compat, or actual if client will obey it. */
651     if (client->opt == NBD_OPT_INFO || blocksize) {
652         check_align = sizes[0] = blk_get_request_alignment(exp->blk);
653     } else {
654         sizes[0] = 1;
655     }
656     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
657     /* preferred - Hard-code to 4096 for now.
658      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
659     sizes[1] = MAX(4096, sizes[0]);
660     /* maximum - At most 32M, but smaller as appropriate. */
661     sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
662     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
663     sizes[0] = cpu_to_be32(sizes[0]);
664     sizes[1] = cpu_to_be32(sizes[1]);
665     sizes[2] = cpu_to_be32(sizes[2]);
666     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
667                                  sizeof(sizes), sizes, errp);
668     if (rc < 0) {
669         return rc;
670     }
671 
672     /* Send NBD_INFO_EXPORT always */
673     myflags = exp->nbdflags;
674     if (client->structured_reply) {
675         myflags |= NBD_FLAG_SEND_DF;
676     }
677     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
678     stq_be_p(buf, exp->size);
679     stw_be_p(buf + 8, myflags);
680     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
681                                  sizeof(buf), buf, errp);
682     if (rc < 0) {
683         return rc;
684     }
685 
686     /*
687      * If the client is just asking for NBD_OPT_INFO, but forgot to
688      * request block sizes in a situation that would impact
689      * performance, then return an error. But for NBD_OPT_GO, we
690      * tolerate all clients, regardless of alignments.
691      */
692     if (client->opt == NBD_OPT_INFO && !blocksize &&
693         blk_get_request_alignment(exp->blk) > 1) {
694         return nbd_negotiate_send_rep_err(client,
695                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
696                                           errp,
697                                           "request NBD_INFO_BLOCK_SIZE to "
698                                           "use this export");
699     }
700 
701     /* Final reply */
702     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
703     if (rc < 0) {
704         return rc;
705     }
706 
707     if (client->opt == NBD_OPT_GO) {
708         client->exp = exp;
709         client->check_align = check_align;
710         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
711         nbd_export_get(client->exp);
712         nbd_check_meta_export(client);
713         rc = 1;
714     }
715     return rc;
716 }
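/*
 * Illustration of the NBD_INFO_BLOCK_SIZE triple sent above: for a
 * backend with a 512-byte request alignment and no tighter transfer
 * limit, a client that requested block sizes sees minimum 512,
 * preferred 4096 and maximum NBD_MAX_BUFFER_SIZE; an NBD_OPT_GO client
 * that did not request them gets a back-compat minimum of 1 instead.
 */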
717 
718 
719 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
720  * new channel for all further (now-encrypted) communication. */
721 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
722                                                  Error **errp)
723 {
724     QIOChannel *ioc;
725     QIOChannelTLS *tioc;
726     struct NBDTLSHandshakeData data = { 0 };
727 
728     assert(client->opt == NBD_OPT_STARTTLS);
729 
730     trace_nbd_negotiate_handle_starttls();
731     ioc = client->ioc;
732 
733     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
734         return NULL;
735     }
736 
737     tioc = qio_channel_tls_new_server(ioc,
738                                       client->tlscreds,
739                                       client->tlsauthz,
740                                       errp);
741     if (!tioc) {
742         return NULL;
743     }
744 
745     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
746     trace_nbd_negotiate_handle_starttls_handshake();
747     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
748     qio_channel_tls_handshake(tioc,
749                               nbd_tls_handshake,
750                               &data,
751                               NULL,
752                               NULL);
753 
754     if (!data.complete) {
755         g_main_loop_run(data.loop);
756     }
757     g_main_loop_unref(data.loop);
758     if (data.error) {
759         object_unref(OBJECT(tioc));
760         error_propagate(errp, data.error);
761         return NULL;
762     }
763 
764     return QIO_CHANNEL(tioc);
765 }
766 
767 /* nbd_negotiate_send_meta_context
768  *
769  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
770  *
771  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
772  */
773 static int nbd_negotiate_send_meta_context(NBDClient *client,
774                                            const char *context,
775                                            uint32_t context_id,
776                                            Error **errp)
777 {
778     NBDOptionReplyMetaContext opt;
779     struct iovec iov[] = {
780         {.iov_base = &opt, .iov_len = sizeof(opt)},
781         {.iov_base = (void *)context, .iov_len = strlen(context)}
782     };
783 
784     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
785     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
786         context_id = 0;
787     }
788 
789     trace_nbd_negotiate_meta_query_reply(context, context_id);
790     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
791                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
792     stl_be_p(&opt.context_id, context_id);
793 
794     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
795 }
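/*
 * The chunk sent above is the 20-byte option reply header followed by a
 * 4-byte context id and the context name itself (not NUL-terminated),
 * e.g. "base:allocation" with id NBD_META_ID_BASE_ALLOCATION.
 */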
796 
797 /* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
798  * @match is never set to false.
799  *
800  * Return -errno on I/O error, 0 if option was completely handled by
801  * sending a reply about inconsistent lengths, or 1 on success.
802  *
803  * Note: a return code of 1 does not mean that the bytes read matched
804  * @pattern; it only means that no I/O error occurred.
805  */
806 static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
807                             Error **errp)
808 {
809     int ret;
810     char *query;
811     size_t len = strlen(pattern);
812 
813     assert(len);
814 
815     query = g_malloc(len);
816     ret = nbd_opt_read(client, query, len, errp);
817     if (ret <= 0) {
818         g_free(query);
819         return ret;
820     }
821 
822     if (strncmp(query, pattern, len) == 0) {
823         trace_nbd_negotiate_meta_query_parse(pattern);
824         *match = true;
825     } else {
826         trace_nbd_negotiate_meta_query_skip("pattern not matched");
827     }
828     g_free(query);
829 
830     return 1;
831 }
832 
833 /*
834  * Read @len bytes, and set @match to true if they match @pattern, or if @len
835  * is 0 and the client is performing _LIST_. @match is never set to false.
836  *
837  * Return -errno on I/O error, 0 if option was completely handled by
838  * sending a reply about inconsistent lengths, or 1 on success.
839  *
840  * Note: a return code of 1 does not mean that the bytes read matched
841  * @pattern; it only means that no I/O error occurred.
842  */
843 static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
844                                      uint32_t len, bool *match, Error **errp)
845 {
846     if (len == 0) {
847         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
848             *match = true;
849         }
850         trace_nbd_negotiate_meta_query_parse("empty");
851         return 1;
852     }
853 
854     if (len != strlen(pattern)) {
855         trace_nbd_negotiate_meta_query_skip("different lengths");
856         return nbd_opt_skip(client, len, errp);
857     }
858 
859     return nbd_meta_pattern(client, pattern, match, errp);
860 }
861 
862 /* nbd_meta_base_query
863  *
864  * Handle queries to 'base' namespace. For now, only the base:allocation
865  * context is available.  'len' is the amount of text remaining to be read from
866  * the current name, after the 'base:' portion has been stripped.
867  *
868  * Return -errno on I/O error, 0 if option was completely handled by
869  * sending a reply about inconsistent lengths, or 1 on success.
870  */
871 static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
872                                uint32_t len, Error **errp)
873 {
874     return nbd_meta_empty_or_pattern(client, "allocation", len,
875                                      &meta->base_allocation, errp);
876 }
877 
878 /* nbd_meta_qemu_query
879  *
880  * Handle a query to the 'qemu' namespace.
881  * @len is the amount of text remaining to be read from the current name, after
882  * the 'qemu:' portion has been stripped.
883  *
884  * Return -errno on I/O error, 0 if option was completely handled by
885  * sending a reply about inconsistent lengths, or 1 on success. */
886 static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
887                                uint32_t len, Error **errp)
888 {
889     bool dirty_bitmap = false;
890     size_t dirty_bitmap_len = strlen("dirty-bitmap:");
891     int ret;
892 
893     if (!meta->exp->export_bitmap) {
894         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
895         return nbd_opt_skip(client, len, errp);
896     }
897 
898     if (len == 0) {
899         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
900             meta->bitmap = true;
901         }
902         trace_nbd_negotiate_meta_query_parse("empty");
903         return 1;
904     }
905 
906     if (len < dirty_bitmap_len) {
907         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
908         return nbd_opt_skip(client, len, errp);
909     }
910 
911     len -= dirty_bitmap_len;
912     ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
913     if (ret <= 0) {
914         return ret;
915     }
916     if (!dirty_bitmap) {
917         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
918         return nbd_opt_skip(client, len, errp);
919     }
920 
921     trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
922 
923     return nbd_meta_empty_or_pattern(
924             client, meta->exp->export_bitmap_context +
925             strlen("qemu:dirty-bitmap:"), len, &meta->bitmap, errp);
926 }
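/*
 * For example, with a bitmap exported as "bitmap0", the queries "qemu:"
 * and "qemu:dirty-bitmap:" select the context only for
 * NBD_OPT_LIST_META_CONTEXT, "qemu:dirty-bitmap:bitmap0" selects it for
 * both LIST and SET, and "qemu:dirty-bitmap:other" is skipped.
 */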
927 
928 /* nbd_negotiate_meta_query
929  *
930  * Parse namespace name and call corresponding function to parse body of the
931  * query.
932  *
933  * The only supported namespaces are 'base' and 'qemu'.
934  *
935  * The function aims to avoid wasting time and memory on reading long,
936  * unknown namespace names.
937  *
938  * Return -errno on I/O error, 0 if option was completely handled by
939  * sending a reply about inconsistent lengths, or 1 on success. */
940 static int nbd_negotiate_meta_query(NBDClient *client,
941                                     NBDExportMetaContexts *meta, Error **errp)
942 {
943     /*
944      * Both the 'qemu' and 'base' namespaces have length 5 including the
945      * colon. If a namespace of a different length is introduced later,
946      * this should certainly be refactored.
947      */
948     int ret;
949     size_t ns_len = 5;
950     char ns[5];
951     uint32_t len;
952 
953     ret = nbd_opt_read(client, &len, sizeof(len), errp);
954     if (ret <= 0) {
955         return ret;
956     }
957     len = be32_to_cpu(len);
958 
959     if (len > NBD_MAX_STRING_SIZE) {
960         trace_nbd_negotiate_meta_query_skip("length too long");
961         return nbd_opt_skip(client, len, errp);
962     }
963     if (len < ns_len) {
964         trace_nbd_negotiate_meta_query_skip("length too short");
965         return nbd_opt_skip(client, len, errp);
966     }
967 
968     len -= ns_len;
969     ret = nbd_opt_read(client, ns, ns_len, errp);
970     if (ret <= 0) {
971         return ret;
972     }
973 
974     if (!strncmp(ns, "base:", ns_len)) {
975         trace_nbd_negotiate_meta_query_parse("base:");
976         return nbd_meta_base_query(client, meta, len, errp);
977     } else if (!strncmp(ns, "qemu:", ns_len)) {
978         trace_nbd_negotiate_meta_query_parse("qemu:");
979         return nbd_meta_qemu_query(client, meta, len, errp);
980     }
981 
982     trace_nbd_negotiate_meta_query_skip("unknown namespace");
983     return nbd_opt_skip(client, len, errp);
984 }
985 
986 /* nbd_negotiate_meta_queries
987  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
988  *
989  * Return -errno on I/O error, or 0 if option was completely handled. */
990 static int nbd_negotiate_meta_queries(NBDClient *client,
991                                       NBDExportMetaContexts *meta, Error **errp)
992 {
993     int ret;
994     g_autofree char *export_name = NULL;
995     NBDExportMetaContexts local_meta;
996     uint32_t nb_queries;
997     int i;
998 
999     if (!client->structured_reply) {
1000         return nbd_opt_invalid(client, errp,
1001                                "request option '%s' when structured reply "
1002                                "is not negotiated",
1003                                nbd_opt_lookup(client->opt));
1004     }
1005 
1006     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1007         /* Only change the caller's meta on SET. */
1008         meta = &local_meta;
1009     }
1010 
1011     memset(meta, 0, sizeof(*meta));
1012 
1013     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1014     if (ret <= 0) {
1015         return ret;
1016     }
1017 
1018     meta->exp = nbd_export_find(export_name);
1019     if (meta->exp == NULL) {
1020         g_autofree char *sane_name = nbd_sanitize_name(export_name);
1021 
1022         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1023                             "export '%s' not present", sane_name);
1024     }
1025 
1026     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
1027     if (ret <= 0) {
1028         return ret;
1029     }
1030     nb_queries = be32_to_cpu(nb_queries);
1031     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1032                                      export_name, nb_queries);
1033 
1034     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1035         /* enable all known contexts */
1036         meta->base_allocation = true;
1037         meta->bitmap = !!meta->exp->export_bitmap;
1038     } else {
1039         for (i = 0; i < nb_queries; ++i) {
1040             ret = nbd_negotiate_meta_query(client, meta, errp);
1041             if (ret <= 0) {
1042                 return ret;
1043             }
1044         }
1045     }
1046 
1047     if (meta->base_allocation) {
1048         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1049                                               NBD_META_ID_BASE_ALLOCATION,
1050                                               errp);
1051         if (ret < 0) {
1052             return ret;
1053         }
1054     }
1055 
1056     if (meta->bitmap) {
1057         ret = nbd_negotiate_send_meta_context(client,
1058                                               meta->exp->export_bitmap_context,
1059                                               NBD_META_ID_DIRTY_BITMAP,
1060                                               errp);
1061         if (ret < 0) {
1062             return ret;
1063         }
1064     }
1065 
1066     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1067     if (ret == 0) {
1068         meta->valid = true;
1069     }
1070 
1071     return ret;
1072 }
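/*
 * Example exchange handled above, assuming an export named "disk": a
 * client that negotiated structured replies sends
 * NBD_OPT_SET_META_CONTEXT with the single query "base:allocation"; the
 * server answers with one NBD_REP_META_CONTEXT carrying context id
 * NBD_META_ID_BASE_ALLOCATION, then NBD_REP_ACK, and marks
 * client->export_meta as valid.
 */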
1073 
1074 /* nbd_negotiate_options
1075  * Process all NBD_OPT_* client option commands, during fixed newstyle
1076  * negotiation.
1077  * Return:
1078  * -errno  on error, errp is set
1079  * 0       on successful negotiation, errp is not set
1080  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1081  *         errp is not set
1082  */
1083 static int nbd_negotiate_options(NBDClient *client, Error **errp)
1084 {
1085     uint32_t flags;
1086     bool fixedNewstyle = false;
1087     bool no_zeroes = false;
1088 
1089     /* Client sends:
1090         [ 0 ..   3]   client flags
1091 
1092        Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1093         [ 0 ..   7]   NBD_OPTS_MAGIC
1094         [ 8 ..  11]   NBD option
1095         [12 ..  15]   Data length
1096         ...           Rest of request
1097 
1098         [ 0 ..   7]   NBD_OPTS_MAGIC
1099         [ 8 ..  11]   Second NBD option
1100         [12 ..  15]   Data length
1101         ...           Rest of request
1102     */
1103 
1104     if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
1105         return -EIO;
1106     }
1107     trace_nbd_negotiate_options_flags(flags);
1108     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1109         fixedNewstyle = true;
1110         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1111     }
1112     if (flags & NBD_FLAG_C_NO_ZEROES) {
1113         no_zeroes = true;
1114         flags &= ~NBD_FLAG_C_NO_ZEROES;
1115     }
1116     if (flags != 0) {
1117         error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1118         return -EINVAL;
1119     }
1120 
1121     while (1) {
1122         int ret;
1123         uint32_t option, length;
1124         uint64_t magic;
1125 
1126         if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1127             return -EINVAL;
1128         }
1129         trace_nbd_negotiate_options_check_magic(magic);
1130         if (magic != NBD_OPTS_MAGIC) {
1131             error_setg(errp, "Bad magic received");
1132             return -EINVAL;
1133         }
1134 
1135         if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1136             return -EINVAL;
1137         }
1138         client->opt = option;
1139 
1140         if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1141             return -EINVAL;
1142         }
1143         assert(!client->optlen);
1144         client->optlen = length;
1145 
1146         if (length > NBD_MAX_BUFFER_SIZE) {
1147             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
1148                        length, NBD_MAX_BUFFER_SIZE);
1149             return -EINVAL;
1150         }
1151 
1152         trace_nbd_negotiate_options_check_option(option,
1153                                                  nbd_opt_lookup(option));
1154         if (client->tlscreds &&
1155             client->ioc == (QIOChannel *)client->sioc) {
1156             QIOChannel *tioc;
1157             if (!fixedNewstyle) {
1158                 error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1159                 return -EINVAL;
1160             }
1161             switch (option) {
1162             case NBD_OPT_STARTTLS:
1163                 if (length) {
1164                     /* Unconditionally drop the connection if the client
1165                      * can't start a TLS negotiation correctly */
1166                     return nbd_reject_length(client, true, errp);
1167                 }
1168                 tioc = nbd_negotiate_handle_starttls(client, errp);
1169                 if (!tioc) {
1170                     return -EIO;
1171                 }
1172                 ret = 0;
1173                 object_unref(OBJECT(client->ioc));
1174                 client->ioc = QIO_CHANNEL(tioc);
1175                 break;
1176 
1177             case NBD_OPT_EXPORT_NAME:
1178                 /* No way to return an error to client, so drop connection */
1179                 error_setg(errp, "Option 0x%x not permitted before TLS",
1180                            option);
1181                 return -EINVAL;
1182 
1183             default:
1184                 /* Let the client keep trying, unless they asked to
1185                  * quit. Always try to give an error back to the
1186                  * client; but when replying to OPT_ABORT, be aware
1187                  * that the client may hang up before receiving the
1188                  * error, in which case we are fine ignoring the
1189                  * resulting EPIPE. */
1190                 ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1191                                    option == NBD_OPT_ABORT ? NULL : errp,
1192                                    "Option 0x%" PRIx32
1193                                    " not permitted before TLS", option);
1194                 if (option == NBD_OPT_ABORT) {
1195                     return 1;
1196                 }
1197                 break;
1198             }
1199         } else if (fixedNewstyle) {
1200             switch (option) {
1201             case NBD_OPT_LIST:
1202                 if (length) {
1203                     ret = nbd_reject_length(client, false, errp);
1204                 } else {
1205                     ret = nbd_negotiate_handle_list(client, errp);
1206                 }
1207                 break;
1208 
1209             case NBD_OPT_ABORT:
1210                 /* NBD spec says we must try to reply before
1211                  * disconnecting, but that we must also tolerate
1212                  * guests that don't wait for our reply. */
1213                  * clients that don't wait for our reply. */
1214                 return 1;
1215 
1216             case NBD_OPT_EXPORT_NAME:
1217                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1218                                                         errp);
1219 
1220             case NBD_OPT_INFO:
1221             case NBD_OPT_GO:
1222                 ret = nbd_negotiate_handle_info(client, errp);
1223                 if (ret == 1) {
1224                     assert(option == NBD_OPT_GO);
1225                     return 0;
1226                 }
1227                 break;
1228 
1229             case NBD_OPT_STARTTLS:
1230                 if (length) {
1231                     ret = nbd_reject_length(client, false, errp);
1232                 } else if (client->tlscreds) {
1233                     ret = nbd_negotiate_send_rep_err(client,
1234                                                      NBD_REP_ERR_INVALID, errp,
1235                                                      "TLS already enabled");
1236                 } else {
1237                     ret = nbd_negotiate_send_rep_err(client,
1238                                                      NBD_REP_ERR_POLICY, errp,
1239                                                      "TLS not configured");
1240                 }
1241                 break;
1242 
1243             case NBD_OPT_STRUCTURED_REPLY:
1244                 if (length) {
1245                     ret = nbd_reject_length(client, false, errp);
1246                 } else if (client->structured_reply) {
1247                     ret = nbd_negotiate_send_rep_err(
1248                         client, NBD_REP_ERR_INVALID, errp,
1249                         "structured reply already negotiated");
1250                 } else {
1251                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1252                     client->structured_reply = true;
1253                 }
1254                 break;
1255 
1256             case NBD_OPT_LIST_META_CONTEXT:
1257             case NBD_OPT_SET_META_CONTEXT:
1258                 ret = nbd_negotiate_meta_queries(client, &client->export_meta,
1259                                                  errp);
1260                 break;
1261 
1262             default:
1263                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1264                                    "Unsupported option %" PRIu32 " (%s)",
1265                                    option, nbd_opt_lookup(option));
1266                 break;
1267             }
1268         } else {
1269             /*
1270              * For broken (non-fixed) newstyle clients, drop the connection
1271              * for anything except NBD_OPT_EXPORT_NAME.
1272              */
1273             switch (option) {
1274             case NBD_OPT_EXPORT_NAME:
1275                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1276                                                         errp);
1277 
1278             default:
1279                 error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1280                            option, nbd_opt_lookup(option));
1281                 return -EINVAL;
1282             }
1283         }
1284         if (ret < 0) {
1285             return ret;
1286         }
1287     }
1288 }
1289 
1290 /* nbd_negotiate
1291  * Return:
1292  * -errno  on error, errp is set
1293  * 0       on successful negotiation, errp is not set
1294  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1295  *         errp is not set
1296  */
1297 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1298 {
1299     ERRP_GUARD();
1300     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1301     int ret;
1302 
1303     /* Old style negotiation header, no room for options
1304         [ 0 ..   7]   passwd       ("NBDMAGIC")
1305         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1306         [16 ..  23]   size
1307         [24 ..  27]   export flags (zero-extended)
1308         [28 .. 151]   reserved     (0)
1309 
1310        New style negotiation header, client can send options
1311         [ 0 ..   7]   passwd       ("NBDMAGIC")
1312         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1313         [16 ..  17]   server flags (0)
1314         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1315      */
1316 
1317     qio_channel_set_blocking(client->ioc, false, NULL);
1318 
1319     trace_nbd_negotiate_begin();
1320     memcpy(buf, "NBDMAGIC", 8);
1321 
1322     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1323     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1324 
1325     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1326         error_prepend(errp, "write failed: ");
1327         return -EINVAL;
1328     }
1329     ret = nbd_negotiate_options(client, errp);
1330     if (ret != 0) {
1331         if (ret < 0) {
1332             error_prepend(errp, "option negotiation failed: ");
1333         }
1334         return ret;
1335     }
1336 
1337     /* Attach the channel to the same AioContext as the export */
1338     if (client->exp && client->exp->ctx) {
1339         qio_channel_attach_aio_context(client->ioc, client->exp->ctx);
1340     }
1341 
1342     assert(!client->optlen);
1343     trace_nbd_negotiate_success();
1344 
1345     return 0;
1346 }
1347 
1348 static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
1349                                Error **errp)
1350 {
1351     uint8_t buf[NBD_REQUEST_SIZE];
1352     uint32_t magic;
1353     int ret;
1354 
1355     ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
1356     if (ret < 0) {
1357         return ret;
1358     }
1359 
1360     /* Request
1361        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1362        [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1363        [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1364        [ 8 .. 15]   handle
1365        [16 .. 23]   from
1366        [24 .. 27]   len
1367      */
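    /* That layout is NBD_REQUEST_SIZE == 28 bytes in total. */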
1368 
1369     magic = ldl_be_p(buf);
1370     request->flags  = lduw_be_p(buf + 4);
1371     request->type   = lduw_be_p(buf + 6);
1372     request->handle = ldq_be_p(buf + 8);
1373     request->from   = ldq_be_p(buf + 16);
1374     request->len    = ldl_be_p(buf + 24);
1375 
1376     trace_nbd_receive_request(magic, request->flags, request->type,
1377                               request->from, request->len);
1378 
1379     if (magic != NBD_REQUEST_MAGIC) {
1380         error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
1381         return -EINVAL;
1382     }
1383     return 0;
1384 }
1385 
1386 #define MAX_NBD_REQUESTS 16
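/* Per-client cap on concurrently handled requests; see the assertion in
 * nbd_request_get() below. */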
1387 
1388 void nbd_client_get(NBDClient *client)
1389 {
1390     client->refcount++;
1391 }
1392 
1393 void nbd_client_put(NBDClient *client)
1394 {
1395     if (--client->refcount == 0) {
1396         /* The last reference should be dropped by client->close_fn,
1397          * which is called by client_close().
1398          */
1399         assert(client->closing);
1400 
1401         qio_channel_detach_aio_context(client->ioc);
1402         object_unref(OBJECT(client->sioc));
1403         object_unref(OBJECT(client->ioc));
1404         if (client->tlscreds) {
1405             object_unref(OBJECT(client->tlscreds));
1406         }
1407         g_free(client->tlsauthz);
1408         if (client->exp) {
1409             QTAILQ_REMOVE(&client->exp->clients, client, next);
1410             nbd_export_put(client->exp);
1411         }
1412         g_free(client);
1413     }
1414 }
1415 
1416 static void client_close(NBDClient *client, bool negotiated)
1417 {
1418     if (client->closing) {
1419         return;
1420     }
1421 
1422     client->closing = true;
1423 
1424     /* Force requests to finish.  They will drop their own references,
1425      * then we'll close the socket and free the NBDClient.
1426      */
1427     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1428                          NULL);
1429 
1430     /* Also tell the client, so that they release their reference.  */
1431     if (client->close_fn) {
1432         client->close_fn(client, negotiated);
1433     }
1434 }
1435 
1436 static NBDRequestData *nbd_request_get(NBDClient *client)
1437 {
1438     NBDRequestData *req;
1439 
1440     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1441     client->nb_requests++;
1442 
1443     req = g_new0(NBDRequestData, 1);
1444     nbd_client_get(client);
1445     req->client = client;
1446     return req;
1447 }
1448 
1449 static void nbd_request_put(NBDRequestData *req)
1450 {
1451     NBDClient *client = req->client;
1452 
1453     if (req->data) {
1454         qemu_vfree(req->data);
1455     }
1456     g_free(req);
1457 
1458     client->nb_requests--;
1459     nbd_client_receive_next_request(client);
1460 
1461     nbd_client_put(client);
1462 }
1463 
1464 static void blk_aio_attached(AioContext *ctx, void *opaque)
1465 {
1466     NBDExport *exp = opaque;
1467     NBDClient *client;
1468 
1469     trace_nbd_blk_aio_attached(exp->name, ctx);
1470 
1471     exp->ctx = ctx;
1472 
1473     QTAILQ_FOREACH(client, &exp->clients, next) {
1474         qio_channel_attach_aio_context(client->ioc, ctx);
1475         if (client->recv_coroutine) {
1476             aio_co_schedule(ctx, client->recv_coroutine);
1477         }
1478         if (client->send_coroutine) {
1479             aio_co_schedule(ctx, client->send_coroutine);
1480         }
1481     }
1482 }
1483 
1484 static void blk_aio_detach(void *opaque)
1485 {
1486     NBDExport *exp = opaque;
1487     NBDClient *client;
1488 
1489     trace_nbd_blk_aio_detach(exp->name, exp->ctx);
1490 
1491     QTAILQ_FOREACH(client, &exp->clients, next) {
1492         qio_channel_detach_aio_context(client->ioc);
1493     }
1494 
1495     exp->ctx = NULL;
1496 }
1497 
1498 static void nbd_eject_notifier(Notifier *n, void *data)
1499 {
1500     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1501     AioContext *aio_context;
1502 
1503     aio_context = exp->ctx;
1504     aio_context_acquire(aio_context);
1505     nbd_export_close(exp);
1506     aio_context_release(aio_context);
1507 }
1508 
1509 void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
1510 {
1511     NBDExport *nbd_exp = container_of(exp, NBDExport, common);
1512     assert(exp->drv == &blk_exp_nbd);
1513     assert(nbd_exp->eject_notifier_blk == NULL);
1514 
1515     blk_ref(blk);
1516     nbd_exp->eject_notifier_blk = blk;
1517     nbd_exp->eject_notifier.notify = nbd_eject_notifier;
1518     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
1519 }
1520 
1521 NBDExport *nbd_export_new(BlockDriverState *bs,
1522                           const char *name, const char *desc,
1523                           const char *bitmap, bool readonly, bool shared,
1524                           void (*close)(NBDExport *), bool writethrough,
1525                           Error **errp)
1526 {
1527     AioContext *ctx;
1528     BlockBackend *blk;
1529     NBDExport *exp;
1530     int64_t size;
1531     uint64_t perm;
1532     int ret;
1533 
1534     size = bdrv_getlength(bs);
1535     if (size < 0) {
1536         error_setg_errno(errp, -size,
1537                          "Failed to determine the NBD export's length");
1538         return NULL;
1539     }
1540 
1541     exp = g_new0(NBDExport, 1);
1542     exp->common = (BlockExport) {
1543         .drv = &blk_exp_nbd,
1544     };
1545 
1546     /*
1547      * NBD exports are used for non-shared storage migration.  Make sure
1548      * that BDRV_O_INACTIVE is cleared and the image is ready for write
1549      * access since the export could be available before migration handover.
1550      * ctx was acquired in the caller.
1551      */
1552     assert(name && strlen(name) <= NBD_MAX_STRING_SIZE);
1553     ctx = bdrv_get_aio_context(bs);
1554     bdrv_invalidate_cache(bs, NULL);
1555 
1556     /* Don't allow resize while the NBD server is running; other than that,
1557      * we don't care what happens with the node. */
1558     perm = BLK_PERM_CONSISTENT_READ;
1559     if (!readonly) {
1560         perm |= BLK_PERM_WRITE;
1561     }
1562     blk = blk_new(ctx, perm,
1563                   BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1564                   BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
1565     ret = blk_insert_bs(blk, bs, errp);
1566     if (ret < 0) {
1567         goto fail;
1568     }
1569     blk_set_enable_write_cache(blk, !writethrough);
1570     blk_set_allow_aio_context_change(blk, true);
1571 
1572     exp->refcount = 1;
1573     QTAILQ_INIT(&exp->clients);
1574     exp->blk = blk;
1575     exp->name = g_strdup(name);
1576     assert(!desc || strlen(desc) <= NBD_MAX_STRING_SIZE);
1577     exp->description = g_strdup(desc);
1578     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1579                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1580     if (readonly) {
1581         exp->nbdflags |= NBD_FLAG_READ_ONLY;
1582         if (shared) {
1583             exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1584         }
1585     } else {
1586         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1587                           NBD_FLAG_SEND_FAST_ZERO);
1588     }
1589     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1590 
1591     if (bitmap) {
1592         BdrvDirtyBitmap *bm = NULL;
1593 
1594         while (bs) {
1595             bm = bdrv_find_dirty_bitmap(bs, bitmap);
1596             if (bm != NULL) {
1597                 break;
1598             }
1599 
1600             bs = bdrv_filter_or_cow_bs(bs);
1601         }
1602 
1603         if (bm == NULL) {
1604             error_setg(errp, "Bitmap '%s' not found", bitmap);
1605             goto fail;
1606         }
1607 
1608         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1609             goto fail;
1610         }
1611 
1612         if (readonly && bdrv_is_writable(bs) &&
1613             bdrv_dirty_bitmap_enabled(bm)) {
1614             error_setg(errp,
1615                        "Enabled bitmap '%s' incompatible with readonly export",
1616                        bitmap);
1617             goto fail;
1618         }
1619 
1620         bdrv_dirty_bitmap_set_busy(bm, true);
1621         exp->export_bitmap = bm;
1622         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1623         exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
1624                                                      bitmap);
1625         assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
1626     }
1627 
1628     exp->close = close;
1629     exp->ctx = ctx;
1630     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1631 
1632     QTAILQ_INSERT_TAIL(&exports, exp, next);
1633     nbd_export_get(exp);
1634     return exp;
1635 
1636 fail:
1637     blk_unref(blk);
1638     g_free(exp->name);
1639     g_free(exp->description);
1640     g_free(exp);
1641     return NULL;
1642 }
1643 
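/*
 * Look up an advertised export by name.  Returns the matching export
 * without taking an extra reference, or NULL if no such export exists.
 */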
1644 NBDExport *nbd_export_find(const char *name)
1645 {
1646     NBDExport *exp;
1647     QTAILQ_FOREACH(exp, &exports, next) {
1648         if (strcmp(name, exp->name) == 0) {
1649             return exp;
1650         }
1651     }
1652 
1653     return NULL;
1654 }
1655 
1656 AioContext *
1657 nbd_export_aio_context(NBDExport *exp)
1658 {
1659     return exp->ctx;
1660 }
1661 
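/*
 * Stop advertising an export and force-disconnect all of its clients.
 * Safe to call more than once; only the first call moves the export from
 * the active list to the closed list (see the TODO below on gentler close
 * semantics).
 */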
1662 void nbd_export_close(NBDExport *exp)
1663 {
1664     NBDClient *client, *next;
1665 
1666     nbd_export_get(exp);
1667     /*
1668      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1669      * close mode that stops advertising the export to new clients but
1670      * still permits existing clients to run to completion? Because of
1671      * that possibility, nbd_export_close() can be called more than
1672      * once on an export.
1673      */
1674     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1675         client_close(client, true);
1676     }
1677     if (exp->name) {
1678         nbd_export_put(exp);
1679         g_free(exp->name);
1680         exp->name = NULL;
1681         QTAILQ_REMOVE(&exports, exp, next);
1682         QTAILQ_INSERT_TAIL(&closed_exports, exp, next);
1683     }
1684     g_free(exp->description);
1685     exp->description = NULL;
1686     nbd_export_put(exp);
1687 }
1688 
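/*
 * Implement the QMP removal modes: 'hard' (or an export with no connected
 * clients) closes immediately, while 'safe' refuses with an error as long
 * as any client is still connected.
 */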
1689 void nbd_export_remove(NBDExport *exp, NbdServerRemoveMode mode, Error **errp)
1690 {
1691     ERRP_GUARD();
1692     if (mode == NBD_SERVER_REMOVE_MODE_HARD || QTAILQ_EMPTY(&exp->clients)) {
1693         nbd_export_close(exp);
1694         return;
1695     }
1696 
1697     assert(mode == NBD_SERVER_REMOVE_MODE_SAFE);
1698 
1699     error_setg(errp, "export '%s' still in use", exp->name);
1700     error_append_hint(errp, "Use mode='hard' to force client disconnect\n");
1701 }
1702 
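/* Take an extra reference to an export whose refcount is already > 0 */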
1703 void nbd_export_get(NBDExport *exp)
1704 {
1705     assert(exp->refcount > 0);
1706     exp->refcount++;
1707 }
1708 
1709 void nbd_export_put(NBDExport *exp)
1710 {
1711     assert(exp->refcount > 0);
1712     if (exp->refcount == 1) {
1713         nbd_export_close(exp);
1714     }
1715 
1716     /* nbd_export_close() may theoretically reduce the refcount to 0. That can
1717      * happen if someone calls nbd_export_put() on a named export without going
1718      * through nbd_export_set_name() while the refcount is 1. So assert that it
1719      * is still > 0.
1720      */
1721     assert(exp->refcount > 0);
1722     if (--exp->refcount == 0) {
1723         assert(exp->name == NULL);
1724         assert(exp->description == NULL);
1725 
1726         if (exp->close) {
1727             exp->close(exp);
1728         }
1729 
1730         if (exp->blk) {
1731             if (exp->eject_notifier_blk) {
1732                 notifier_remove(&exp->eject_notifier);
1733                 blk_unref(exp->eject_notifier_blk);
1734             }
1735             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
1736                                             blk_aio_detach, exp);
1737             blk_unref(exp->blk);
1738             exp->blk = NULL;
1739         }
1740 
1741         if (exp->export_bitmap) {
1742             bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
1743             g_free(exp->export_bitmap_context);
1744         }
1745 
1746         QTAILQ_REMOVE(&closed_exports, exp, next);
1747         g_free(exp);
1748         aio_wait_kick();
1749     }
1750 }
1751 
1752 const BlockExportDriver blk_exp_nbd = {
1753     .type               = BLOCK_EXPORT_TYPE_NBD,
1754     .create             = nbd_export_create,
1755 };
1756 
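/*
 * Shut down every export: close each one while holding its AioContext,
 * then wait until both the active and the closed export lists drain.
 */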
1757 void nbd_export_close_all(void)
1758 {
1759     NBDExport *exp, *next;
1760     AioContext *aio_context;
1761 
1762     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
1763         aio_context = exp->ctx;
1764         aio_context_acquire(aio_context);
1765         nbd_export_close(exp);
1766         aio_context_release(aio_context);
1767     }
1768 
1769     AIO_WAIT_WHILE(NULL, !(QTAILQ_EMPTY(&exports) &&
1770                            QTAILQ_EMPTY(&closed_exports)));
1771 }
1772 
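/*
 * Write an I/O vector to the client channel.  send_lock serializes
 * concurrent senders so that replies are never interleaved on the wire.
 * Returns 0 on success, -EIO if the channel write fails.
 */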
1773 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
1774                                         unsigned niov, Error **errp)
1775 {
1776     int ret;
1777 
1778     g_assert(qemu_in_coroutine());
1779     qemu_co_mutex_lock(&client->send_lock);
1780     client->send_coroutine = qemu_coroutine_self();
1781 
1782     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
1783 
1784     client->send_coroutine = NULL;
1785     qemu_co_mutex_unlock(&client->send_lock);
1786 
1787     return ret;
1788 }
1789 
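/*
 * Fill in a simple reply header.  As the stores below show, the on-wire
 * layout is big-endian: a 32-bit magic, a 32-bit error code (0 on success)
 * and the 64-bit handle echoed back from the request.
 */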
1790 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
1791                                        uint64_t handle)
1792 {
1793     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
1794     stl_be_p(&reply->error, error);
1795     stq_be_p(&reply->handle, handle);
1796 }
1797 
1798 static int nbd_co_send_simple_reply(NBDClient *client,
1799                                     uint64_t handle,
1800                                     uint32_t error,
1801                                     void *data,
1802                                     size_t len,
1803                                     Error **errp)
1804 {
1805     NBDSimpleReply reply;
1806     int nbd_err = system_errno_to_nbd_errno(error);
1807     struct iovec iov[] = {
1808         {.iov_base = &reply, .iov_len = sizeof(reply)},
1809         {.iov_base = data, .iov_len = len}
1810     };
1811 
1812     trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err),
1813                                    len);
1814     set_be_simple_reply(&reply, nbd_err, handle);
1815 
1816     return nbd_co_send_iov(client, iov, len ? 2 : 1, errp);
1817 }
1818 
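/*
 * Fill in a structured reply chunk header: big-endian 32-bit magic,
 * 16-bit flags, 16-bit chunk type, 64-bit handle, and the 32-bit length
 * of the payload that follows the header.
 */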
1819 static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags,
1820                                 uint16_t type, uint64_t handle, uint32_t length)
1821 {
1822     stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
1823     stw_be_p(&chunk->flags, flags);
1824     stw_be_p(&chunk->type, type);
1825     stq_be_p(&chunk->handle, handle);
1826     stl_be_p(&chunk->length, length);
1827 }
1828 
1829 static int coroutine_fn nbd_co_send_structured_done(NBDClient *client,
1830                                                     uint64_t handle,
1831                                                     Error **errp)
1832 {
1833     NBDStructuredReplyChunk chunk;
1834     struct iovec iov[] = {
1835         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1836     };
1837 
1838     trace_nbd_co_send_structured_done(handle);
1839     set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0);
1840 
1841     return nbd_co_send_iov(client, iov, 1, errp);
1842 }
1843 
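/*
 * Send one NBD_REPLY_TYPE_OFFSET_DATA chunk covering @size bytes at
 * @offset.  @final controls whether NBD_REPLY_FLAG_DONE is set, i.e.
 * whether this chunk terminates the reply for this handle.
 */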
1844 static int coroutine_fn nbd_co_send_structured_read(NBDClient *client,
1845                                                     uint64_t handle,
1846                                                     uint64_t offset,
1847                                                     void *data,
1848                                                     size_t size,
1849                                                     bool final,
1850                                                     Error **errp)
1851 {
1852     NBDStructuredReadData chunk;
1853     struct iovec iov[] = {
1854         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1855         {.iov_base = data, .iov_len = size}
1856     };
1857 
1858     assert(size);
1859     trace_nbd_co_send_structured_read(handle, offset, data, size);
1860     set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1861                  NBD_REPLY_TYPE_OFFSET_DATA, handle,
1862                  sizeof(chunk) - sizeof(chunk.h) + size);
1863     stq_be_p(&chunk.offset, offset);
1864 
1865     return nbd_co_send_iov(client, iov, 2, errp);
1866 }
1867 
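/*
 * Send an NBD_REPLY_TYPE_ERROR chunk carrying @error (translated to an
 * NBD error code) plus the optional human-readable @msg.  The chunk always
 * sets NBD_REPLY_FLAG_DONE, so it terminates the reply.
 */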
1868 static int coroutine_fn nbd_co_send_structured_error(NBDClient *client,
1869                                                      uint64_t handle,
1870                                                      uint32_t error,
1871                                                      const char *msg,
1872                                                      Error **errp)
1873 {
1874     NBDStructuredError chunk;
1875     int nbd_err = system_errno_to_nbd_errno(error);
1876     struct iovec iov[] = {
1877         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1878         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
1879     };
1880 
1881     assert(nbd_err);
1882     trace_nbd_co_send_structured_error(handle, nbd_err,
1883                                        nbd_err_lookup(nbd_err), msg ? msg : "");
1884     set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle,
1885                  sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
1886     stl_be_p(&chunk.error, nbd_err);
1887     stw_be_p(&chunk.message_length, iov[1].iov_len);
1888 
1889     return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp);
1890 }
1891 
1892 /* Do a sparse read and send the structured reply to the client.
1893  * Returns -errno if sending fails. bdrv_block_status_above() failure is
1894  * reported to the client, at which point this function succeeds.
1895  */
1896 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
1897                                                 uint64_t handle,
1898                                                 uint64_t offset,
1899                                                 uint8_t *data,
1900                                                 size_t size,
1901                                                 Error **errp)
1902 {
1903     int ret = 0;
1904     NBDExport *exp = client->exp;
1905     size_t progress = 0;
1906 
1907     while (progress < size) {
1908         int64_t pnum;
1909         int status = bdrv_block_status_above(blk_bs(exp->blk), NULL,
1910                                              offset + progress,
1911                                              size - progress, &pnum, NULL,
1912                                              NULL);
1913         bool final;
1914 
1915         if (status < 0) {
1916             char *msg = g_strdup_printf("unable to check for holes: %s",
1917                                         strerror(-status));
1918 
1919             ret = nbd_co_send_structured_error(client, handle, -status, msg,
1920                                                errp);
1921             g_free(msg);
1922             return ret;
1923         }
1924         assert(pnum && pnum <= size - progress);
1925         final = progress + pnum == size;
1926         if (status & BDRV_BLOCK_ZERO) {
1927             NBDStructuredReadHole chunk;
1928             struct iovec iov[] = {
1929                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1930             };
1931 
1932             trace_nbd_co_send_structured_read_hole(handle, offset + progress,
1933                                                    pnum);
1934             set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1935                          NBD_REPLY_TYPE_OFFSET_HOLE,
1936                          handle, sizeof(chunk) - sizeof(chunk.h));
1937             stq_be_p(&chunk.offset, offset + progress);
1938             stl_be_p(&chunk.length, pnum);
1939             ret = nbd_co_send_iov(client, iov, 1, errp);
1940         } else {
1941             ret = blk_pread(exp->blk, offset + progress, data + progress, pnum);
1942             if (ret < 0) {
1943                 error_setg_errno(errp, -ret, "reading from file failed");
1944                 break;
1945             }
1946             ret = nbd_co_send_structured_read(client, handle, offset + progress,
1947                                               data + progress, pnum, final,
1948                                               errp);
1949         }
1950 
1951         if (ret < 0) {
1952             break;
1953         }
1954         progress += pnum;
1955     }
1956     return ret;
1957 }
1958 
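/*
 * NBDExtentArray: a fixed-capacity accumulator of block-status extents.
 * Extents are collected in host byte order (adjacent extents with equal
 * flags are merged) and converted to big-endian exactly once, right before
 * they are sent.
 */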
1959 typedef struct NBDExtentArray {
1960     NBDExtent *extents;
1961     unsigned int nb_alloc;
1962     unsigned int count;
1963     uint64_t total_length;
1964     bool can_add;
1965     bool converted_to_be;
1966 } NBDExtentArray;
1967 
1968 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
1969 {
1970     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
1971 
1972     ea->nb_alloc = nb_alloc;
1973     ea->extents = g_new(NBDExtent, nb_alloc);
1974     ea->can_add = true;
1975 
1976     return ea;
1977 }
1978 
1979 static void nbd_extent_array_free(NBDExtentArray *ea)
1980 {
1981     g_free(ea->extents);
1982     g_free(ea);
1983 }
1984 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free);
1985 
1986 /* Further modification of the array after conversion is not allowed */
1987 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
1988 {
1989     int i;
1990 
1991     assert(!ea->converted_to_be);
1992     ea->can_add = false;
1993     ea->converted_to_be = true;
1994 
1995     for (i = 0; i < ea->count; i++) {
1996         ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
1997         ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
1998     }
1999 }
2000 
2001 /*
2002  * Add an extent to an NBDExtentArray.  If the extent can't be added (no
2003  * space is available), return -1.
2004  * For safety, the first time -1 is returned, .can_add is set to false, so
2005  * any further call to nbd_extent_array_add() will abort.
2006  * (This avoids the situation where a caller misses the failure (-1) and
2007  * adds another extent that happens to succeed because it can be squashed
2008  * into the last entry even though the array is full, which would leave an
2009  * invalid array with a skipped extent.)
2010  */
2011 static int nbd_extent_array_add(NBDExtentArray *ea,
2012                                 uint32_t length, uint32_t flags)
2013 {
2014     assert(ea->can_add);
2015 
2016     if (!length) {
2017         return 0;
2018     }
2019 
2020     /* Extend previous extent if flags are the same */
2021     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
2022         uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
2023 
2024         if (sum <= UINT32_MAX) {
2025             ea->extents[ea->count - 1].length = sum;
2026             ea->total_length += length;
2027             return 0;
2028         }
2029     }
2030 
2031     if (ea->count >= ea->nb_alloc) {
2032         ea->can_add = false;
2033         return -1;
2034     }
2035 
2036     ea->total_length += length;
2037     ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
2038     ea->count++;
2039 
2040     return 0;
2041 }
2042 
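/*
 * Fill @ea with base:allocation extents for [offset, offset + bytes).
 * Returns a negative errno if querying block status fails; returns 0 both
 * on complete success and when the extent array fills up early (the reply
 * is simply truncated at that point).
 */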
2043 static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
2044                                   uint64_t bytes, NBDExtentArray *ea)
2045 {
2046     while (bytes) {
2047         uint32_t flags;
2048         int64_t num;
2049         int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num,
2050                                           NULL, NULL);
2051 
2052         if (ret < 0) {
2053             return ret;
2054         }
2055 
2056         flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
2057                 (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
2058 
2059         if (nbd_extent_array_add(ea, num, flags) < 0) {
2060             return 0;
2061         }
2062 
2063         offset += num;
2064         bytes -= num;
2065     }
2066 
2067     return 0;
2068 }
2069 
2070 /*
2071  * nbd_co_send_extents
2072  *
2073  * @ea is converted to BE by the function
2074  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2075  */
2076 static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
2077                                NBDExtentArray *ea,
2078                                bool last, uint32_t context_id, Error **errp)
2079 {
2080     NBDStructuredMeta chunk;
2081     struct iovec iov[] = {
2082         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2083         {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
2084     };
2085 
2086     nbd_extent_array_convert_to_be(ea);
2087 
2088     trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length,
2089                               last);
2090     set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
2091                  NBD_REPLY_TYPE_BLOCK_STATUS,
2092                  handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
2093     stl_be_p(&chunk.context_id, context_id);
2094 
2095     return nbd_co_send_iov(client, iov, 2, errp);
2096 }
2097 
2098 /* Get block status from the exported device and send it to the client */
2099 static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
2100                                     BlockDriverState *bs, uint64_t offset,
2101                                     uint32_t length, bool dont_fragment,
2102                                     bool last, uint32_t context_id,
2103                                     Error **errp)
2104 {
2105     int ret;
2106     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2107     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2108 
2109     ret = blockstatus_to_extents(bs, offset, length, ea);
2110     if (ret < 0) {
2111         return nbd_co_send_structured_error(
2112                 client, handle, -ret, "can't get block status", errp);
2113     }
2114 
2115     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2116 }
2117 
2118 /* Populate @es from a dirty bitmap. */
2119 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2120                               uint64_t offset, uint64_t length,
2121                               NBDExtentArray *es)
2122 {
2123     int64_t start, dirty_start, dirty_count;
2124     int64_t end = offset + length;
2125     bool full = false;
2126 
2127     bdrv_dirty_bitmap_lock(bitmap);
2128 
2129     for (start = offset;
2130          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
2131                                            &dirty_start, &dirty_count);
2132          start = dirty_start + dirty_count)
2133     {
2134         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2135             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2136         {
2137             full = true;
2138             break;
2139         }
2140     }
2141 
2142     if (!full) {
2143         /* last non-dirty extent */
2144         nbd_extent_array_add(es, end - start, 0);
2145     }
2146 
2147     bdrv_dirty_bitmap_unlock(bitmap);
2148 }
2149 
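/* Get dirty-bitmap status for the requested range and send it to the client */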
2150 static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
2151                               BdrvDirtyBitmap *bitmap, uint64_t offset,
2152                               uint32_t length, bool dont_fragment, bool last,
2153                               uint32_t context_id, Error **errp)
2154 {
2155     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2156     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2157 
2158     bitmap_to_extents(bitmap, offset, length, ea);
2159 
2160     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2161 }
2162 
2163 /* nbd_co_receive_request
2164  * Collect a client request. Return 0 if request looks valid, -EIO to drop
2165  * connection right away, and any other negative value to report an error to
2166  * the client (although the caller may still need to disconnect after reporting
2167  * the error).
2168  */
2169 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
2170                                   Error **errp)
2171 {
2172     NBDClient *client = req->client;
2173     int valid_flags;
2174 
2175     g_assert(qemu_in_coroutine());
2176     assert(client->recv_coroutine == qemu_coroutine_self());
2177     if (nbd_receive_request(client->ioc, request, errp) < 0) {
2178         return -EIO;
2179     }
2180 
2181     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
2182                                              nbd_cmd_lookup(request->type));
2183 
2184     if (request->type != NBD_CMD_WRITE) {
2185         /* No payload, we are ready to read the next request.  */
2186         req->complete = true;
2187     }
2188 
2189     if (request->type == NBD_CMD_DISC) {
2190         /* Special case: we're going to disconnect without a reply,
2191          * whether or not flags, from, or len are bogus */
2192         return -EIO;
2193     }
2194 
2195     if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
2196         request->type == NBD_CMD_CACHE)
2197     {
2198         if (request->len > NBD_MAX_BUFFER_SIZE) {
2199             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
2200                        request->len, NBD_MAX_BUFFER_SIZE);
2201             return -EINVAL;
2202         }
2203 
2204         if (request->type != NBD_CMD_CACHE) {
2205             req->data = blk_try_blockalign(client->exp->blk, request->len);
2206             if (req->data == NULL) {
2207                 error_setg(errp, "No memory");
2208                 return -ENOMEM;
2209             }
2210         }
2211     }
2212 
2213     if (request->type == NBD_CMD_WRITE) {
2214         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
2215                      errp) < 0)
2216         {
2217             return -EIO;
2218         }
2219         req->complete = true;
2220 
2221         trace_nbd_co_receive_request_payload_received(request->handle,
2222                                                       request->len);
2223     }
2224 
2225     /* Sanity checks. */
2226     if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
2227         (request->type == NBD_CMD_WRITE ||
2228          request->type == NBD_CMD_WRITE_ZEROES ||
2229          request->type == NBD_CMD_TRIM)) {
2230         error_setg(errp, "Export is read-only");
2231         return -EROFS;
2232     }
2233     if (request->from > client->exp->size ||
2234         request->len > client->exp->size - request->from) {
2235         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
2236                    ", Size: %" PRIu64, request->from, request->len,
2237                    client->exp->size);
2238         return (request->type == NBD_CMD_WRITE ||
2239                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2240     }
2241     if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2242                                                 client->check_align)) {
2243         /*
2244          * The block layer gracefully handles unaligned requests, but
2245          * it's still worth tracing client non-compliance
2246          */
2247         trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2248                                               request->from,
2249                                               request->len,
2250                                               client->check_align);
2251     }
2252     valid_flags = NBD_CMD_FLAG_FUA;
2253     if (request->type == NBD_CMD_READ && client->structured_reply) {
2254         valid_flags |= NBD_CMD_FLAG_DF;
2255     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
2256         valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2257     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
2258         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2259     }
2260     if (request->flags & ~valid_flags) {
2261         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2262                    nbd_cmd_lookup(request->type), request->flags);
2263         return -EINVAL;
2264     }
2265 
2266     return 0;
2267 }
2268 
2269 /* Send simple reply without a payload, or a structured error
2270  * @error_msg is ignored if @ret >= 0
2271  * Returns 0 if connection is still live, -errno on failure to talk to client
2272  */
2273 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2274                                                uint64_t handle,
2275                                                int ret,
2276                                                const char *error_msg,
2277                                                Error **errp)
2278 {
2279     if (client->structured_reply && ret < 0) {
2280         return nbd_co_send_structured_error(client, handle, -ret, error_msg,
2281                                             errp);
2282     } else {
2283         return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0,
2284                                         NULL, 0, errp);
2285     }
2286 }
2287 
2288 /* Handle NBD_CMD_READ request.
2289  * Return -errno if sending fails. Other errors are reported directly to the
2290  * client as an error reply. */
2291 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2292                                         uint8_t *data, Error **errp)
2293 {
2294     int ret;
2295     NBDExport *exp = client->exp;
2296 
2297     assert(request->type == NBD_CMD_READ);
2298 
2299     /* XXX: NBD Protocol only documents use of FUA with WRITE */
2300     if (request->flags & NBD_CMD_FLAG_FUA) {
2301         ret = blk_co_flush(exp->blk);
2302         if (ret < 0) {
2303             return nbd_send_generic_reply(client, request->handle, ret,
2304                                           "flush failed", errp);
2305         }
2306     }
2307 
2308     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
2309         request->len)
2310     {
2311         return nbd_co_send_sparse_read(client, request->handle, request->from,
2312                                        data, request->len, errp);
2313     }
2314 
2315     ret = blk_pread(exp->blk, request->from, data, request->len);
2316     if (ret < 0) {
2317         return nbd_send_generic_reply(client, request->handle, ret,
2318                                       "reading from file failed", errp);
2319     }
2320 
2321     if (client->structured_reply) {
2322         if (request->len) {
2323             return nbd_co_send_structured_read(client, request->handle,
2324                                                request->from, data,
2325                                                request->len, true, errp);
2326         } else {
2327             return nbd_co_send_structured_done(client, request->handle, errp);
2328         }
2329     } else {
2330         return nbd_co_send_simple_reply(client, request->handle, 0,
2331                                         data, request->len, errp);
2332     }
2333 }
2334 
2335 /*
2336  * nbd_do_cmd_cache
2337  *
2338  * Handle NBD_CMD_CACHE request.
2339  * Return -errno if sending fails. Other errors are reported directly to the
2340  * client as an error reply.
2341  */
2342 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2343                                          Error **errp)
2344 {
2345     int ret;
2346     NBDExport *exp = client->exp;
2347 
2348     assert(request->type == NBD_CMD_CACHE);
2349 
2350     ret = blk_co_preadv(exp->blk, request->from, request->len,
2351                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2352 
2353     return nbd_send_generic_reply(client, request->handle, ret,
2354                                   "caching data failed", errp);
2355 }
2356 
2357 /* Handle NBD request.
2358  * Return -errno if sending fails. Other errors are reported directly to the
2359  * client as an error reply. */
2360 static coroutine_fn int nbd_handle_request(NBDClient *client,
2361                                            NBDRequest *request,
2362                                            uint8_t *data, Error **errp)
2363 {
2364     int ret;
2365     int flags;
2366     NBDExport *exp = client->exp;
2367     char *msg;
2368 
2369     switch (request->type) {
2370     case NBD_CMD_CACHE:
2371         return nbd_do_cmd_cache(client, request, errp);
2372 
2373     case NBD_CMD_READ:
2374         return nbd_do_cmd_read(client, request, data, errp);
2375 
2376     case NBD_CMD_WRITE:
2377         flags = 0;
2378         if (request->flags & NBD_CMD_FLAG_FUA) {
2379             flags |= BDRV_REQ_FUA;
2380         }
2381         ret = blk_pwrite(exp->blk, request->from, data, request->len, flags);
2382         return nbd_send_generic_reply(client, request->handle, ret,
2383                                       "writing to file failed", errp);
2384 
2385     case NBD_CMD_WRITE_ZEROES:
2386         flags = 0;
2387         if (request->flags & NBD_CMD_FLAG_FUA) {
2388             flags |= BDRV_REQ_FUA;
2389         }
2390         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2391             flags |= BDRV_REQ_MAY_UNMAP;
2392         }
2393         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2394             flags |= BDRV_REQ_NO_FALLBACK;
2395         }
2396         ret = 0;
2397         /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
2398         while (ret >= 0 && request->len) {
2399             int align = client->check_align ?: 1;
2400             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2401                                                         align));
2402             ret = blk_pwrite_zeroes(exp->blk, request->from, len, flags);
2403             request->len -= len;
2404             request->from += len;
2405         }
2406         return nbd_send_generic_reply(client, request->handle, ret,
2407                                       "writing to file failed", errp);
2408 
2409     case NBD_CMD_DISC:
2410         /* unreachable, thanks to special case in nbd_co_receive_request() */
2411         abort();
2412 
2413     case NBD_CMD_FLUSH:
2414         ret = blk_co_flush(exp->blk);
2415         return nbd_send_generic_reply(client, request->handle, ret,
2416                                       "flush failed", errp);
2417 
2418     case NBD_CMD_TRIM:
2419         ret = 0;
2420         /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
2421         while (ret >= 0 && request->len) {
2422             int align = client->check_align ?: 1;
2423             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2424                                                         align));
2425             ret = blk_co_pdiscard(exp->blk, request->from, len);
2426             request->len -= len;
2427             request->from += len;
2428         }
2429         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2430             ret = blk_co_flush(exp->blk);
2431         }
2432         return nbd_send_generic_reply(client, request->handle, ret,
2433                                       "discard failed", errp);
2434 
2435     case NBD_CMD_BLOCK_STATUS:
2436         if (!request->len) {
2437             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2438                                           "need non-zero length", errp);
2439         }
2440         if (client->export_meta.valid &&
2441             (client->export_meta.base_allocation ||
2442              client->export_meta.bitmap))
2443         {
2444             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2445 
2446             if (client->export_meta.base_allocation) {
2447                 ret = nbd_co_send_block_status(client, request->handle,
2448                                                blk_bs(exp->blk), request->from,
2449                                                request->len, dont_fragment,
2450                                                !client->export_meta.bitmap,
2451                                                NBD_META_ID_BASE_ALLOCATION,
2452                                                errp);
2453                 if (ret < 0) {
2454                     return ret;
2455                 }
2456             }
2457 
2458             if (client->export_meta.bitmap) {
2459                 ret = nbd_co_send_bitmap(client, request->handle,
2460                                          client->exp->export_bitmap,
2461                                          request->from, request->len,
2462                                          dont_fragment,
2463                                          true, NBD_META_ID_DIRTY_BITMAP, errp);
2464                 if (ret < 0) {
2465                     return ret;
2466                 }
2467             }
2468 
2469             return 0;
2470         } else {
2471             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2472                                           "CMD_BLOCK_STATUS not negotiated",
2473                                           errp);
2474         }
2475 
2476     default:
2477         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
2478                               request->type);
2479         ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg,
2480                                      errp);
2481         g_free(msg);
2482         return ret;
2483     }
2484 }
2485 
2486 /* Owns a reference to the NBDClient passed as opaque.  */
2487 static coroutine_fn void nbd_trip(void *opaque)
2488 {
2489     NBDClient *client = opaque;
2490     NBDRequestData *req;
2491     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
2492     int ret;
2493     Error *local_err = NULL;
2494 
2495     trace_nbd_trip();
2496     if (client->closing) {
2497         nbd_client_put(client);
2498         return;
2499     }
2500 
2501     req = nbd_request_get(client);
2502     ret = nbd_co_receive_request(req, &request, &local_err);
2503     client->recv_coroutine = NULL;
2504 
2505     if (client->closing) {
2506         /*
2507          * The client may be closed when we are blocked in
2508          * nbd_co_receive_request()
2509          */
2510         goto done;
2511     }
2512 
2513     nbd_client_receive_next_request(client);
2514     if (ret == -EIO) {
2515         goto disconnect;
2516     }
2517 
2518     if (ret < 0) {
2519         /* It wasn't -EIO, so, according to nbd_co_receive_request()
2520          * semantics, we should return the error to the client. */
2521         Error *export_err = local_err;
2522 
2523         local_err = NULL;
2524         ret = nbd_send_generic_reply(client, request.handle, -EINVAL,
2525                                      error_get_pretty(export_err), &local_err);
2526         error_free(export_err);
2527     } else {
2528         ret = nbd_handle_request(client, &request, req->data, &local_err);
2529     }
2530     if (ret < 0) {
2531         error_prepend(&local_err, "Failed to send reply: ");
2532         goto disconnect;
2533     }
2534 
2535     /* We must disconnect after NBD_CMD_WRITE if we did not
2536      * read the payload.
2537      */
2538     if (!req->complete) {
2539         error_setg(&local_err, "Request handling failed in intermediate state");
2540         goto disconnect;
2541     }
2542 
2543 done:
2544     nbd_request_put(req);
2545     nbd_client_put(client);
2546     return;
2547 
2548 disconnect:
2549     if (local_err) {
2550         error_reportf_err(local_err, "Disconnecting client due to: ");
2551     }
2552     nbd_request_put(req);
2553     client_close(client, true);
2554     nbd_client_put(client);
2555 }
2556 
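/*
 * Start a coroutine to read the next request, unless a receive coroutine
 * is already running or the client has MAX_NBD_REQUESTS requests in flight.
 */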
2557 static void nbd_client_receive_next_request(NBDClient *client)
2558 {
2559     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
2560         nbd_client_get(client);
2561         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
2562         aio_co_schedule(client->exp->ctx, client->recv_coroutine);
2563     }
2564 }
2565 
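/*
 * Per-client start-up coroutine: run option negotiation and, on success,
 * kick off the request loop.  If negotiation fails, the client is closed
 * without ever having serviced a request.
 */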
2566 static coroutine_fn void nbd_co_client_start(void *opaque)
2567 {
2568     NBDClient *client = opaque;
2569     Error *local_err = NULL;
2570 
2571     qemu_co_mutex_init(&client->send_lock);
2572 
2573     if (nbd_negotiate(client, &local_err)) {
2574         if (local_err) {
2575             error_report_err(local_err);
2576         }
2577         client_close(client, false);
2578         return;
2579     }
2580 
2581     nbd_client_receive_next_request(client);
2582 }
2583 
2584 /*
2585  * Create a new client listener using the given channel @sioc.
2586  * Begin servicing it in a coroutine.  When the connection closes, call
2587  * @close_fn with an indication of whether the client completed negotiation.
2588  */
2589 void nbd_client_new(QIOChannelSocket *sioc,
2590                     QCryptoTLSCreds *tlscreds,
2591                     const char *tlsauthz,
2592                     void (*close_fn)(NBDClient *, bool))
2593 {
2594     NBDClient *client;
2595     Coroutine *co;
2596 
2597     client = g_new0(NBDClient, 1);
2598     client->refcount = 1;
2599     client->tlscreds = tlscreds;
2600     if (tlscreds) {
2601         object_ref(OBJECT(client->tlscreds));
2602     }
2603     client->tlsauthz = g_strdup(tlsauthz);
2604     client->sioc = sioc;
2605     object_ref(OBJECT(client->sioc));
2606     client->ioc = QIO_CHANNEL(sioc);
2607     object_ref(OBJECT(client->ioc));
2608     client->close_fn = close_fn;
2609 
2610     co = qemu_coroutine_create(nbd_co_client_start, client);
2611     qemu_coroutine_enter(co);
2612 }
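/*
 * For orientation, a caller typically hands a freshly accepted socket to
 * nbd_client_new() from a listener callback.  The sketch below is
 * hypothetical (the function names and listener wiring are illustrative,
 * not taken from this file); the close callback usually just drops the
 * reference created by nbd_client_new():
 *
 *     static void my_client_closed(NBDClient *client, bool negotiated)
 *     {
 *         nbd_client_put(client);    // release the caller-side reference
 *     }
 *
 *     static void my_accept(QIONetListener *listener, QIOChannelSocket *cioc,
 *                           gpointer opaque)
 *     {
 *         nbd_client_new(cioc, NULL, NULL, my_client_closed);  // no TLS/authz
 *     }
 */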
2613