xref: /openbmc/qemu/nbd/server.c (revision 2055dbc1)
1 /*
2  *  Copyright (C) 2016-2018 Red Hat, Inc.
3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
4  *
5  *  Network Block Device Server Side
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; under version 2 of the License.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qemu/queue.h"
23 #include "trace.h"
24 #include "nbd-internal.h"
25 #include "qemu/units.h"
26 
27 #define NBD_META_ID_BASE_ALLOCATION 0
28 #define NBD_META_ID_DIRTY_BITMAP 1
29 
30 /*
31  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
32  * constant. If an increase is needed, note that the NBD protocol
33  * recommends no larger than 32 mb, so that the client won't consider
34  * the reply as a denial of service attack.
35  */
36 #define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
37 
38 static int system_errno_to_nbd_errno(int err)
39 {
40     switch (err) {
41     case 0:
42         return NBD_SUCCESS;
43     case EPERM:
44     case EROFS:
45         return NBD_EPERM;
46     case EIO:
47         return NBD_EIO;
48     case ENOMEM:
49         return NBD_ENOMEM;
50 #ifdef EDQUOT
51     case EDQUOT:
52 #endif
53     case EFBIG:
54     case ENOSPC:
55         return NBD_ENOSPC;
56     case EOVERFLOW:
57         return NBD_EOVERFLOW;
58     case ENOTSUP:
59 #if ENOTSUP != EOPNOTSUPP
60     case EOPNOTSUPP:
61 #endif
62         return NBD_ENOTSUP;
63     case ESHUTDOWN:
64         return NBD_ESHUTDOWN;
65     case EINVAL:
66     default:
67         return NBD_EINVAL;
68     }
69 }
70 
71 /* Definitions for opaque data types */
72 
73 typedef struct NBDRequestData NBDRequestData;
74 
75 struct NBDRequestData {
76     QSIMPLEQ_ENTRY(NBDRequestData) entry;
77     NBDClient *client;
78     uint8_t *data;
79     bool complete;
80 };
81 
82 struct NBDExport {
83     int refcount;
84     void (*close)(NBDExport *exp);
85 
86     BlockBackend *blk;
87     char *name;
88     char *description;
89     uint64_t dev_offset;
90     uint64_t size;
91     uint16_t nbdflags;
92     QTAILQ_HEAD(, NBDClient) clients;
93     QTAILQ_ENTRY(NBDExport) next;
94 
95     AioContext *ctx;
96 
97     BlockBackend *eject_notifier_blk;
98     Notifier eject_notifier;
99 
100     BdrvDirtyBitmap *export_bitmap;
101     char *export_bitmap_context;
102 };
103 
104 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
105 
106 /* NBDExportMetaContexts represents a list of contexts to be exported,
107  * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
108  * NBD_OPT_LIST_META_CONTEXT. */
109 typedef struct NBDExportMetaContexts {
110     NBDExport *exp;
111     bool valid; /* means that negotiation of the option finished without
112                    errors */
113     bool base_allocation; /* export base:allocation context (block status) */
114     bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
115 } NBDExportMetaContexts;
116 
117 struct NBDClient {
118     int refcount;
119     void (*close_fn)(NBDClient *client, bool negotiated);
120 
121     NBDExport *exp;
122     QCryptoTLSCreds *tlscreds;
123     char *tlsauthz;
124     QIOChannelSocket *sioc; /* The underlying data channel */
125     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
126 
127     Coroutine *recv_coroutine;
128 
129     CoMutex send_lock;
130     Coroutine *send_coroutine;
131 
132     QTAILQ_ENTRY(NBDClient) next;
133     int nb_requests;
134     bool closing;
135 
136     uint32_t check_align; /* If non-zero, check for aligned client requests */
137 
138     bool structured_reply;
139     NBDExportMetaContexts export_meta;
140 
141     uint32_t opt; /* Current option being negotiated */
142     uint32_t optlen; /* remaining length of data in ioc for the option being
143                         negotiated now */
144 };
145 
146 static void nbd_client_receive_next_request(NBDClient *client);
147 
148 /* Basic flow for negotiation
149 
150    Server         Client
151    Negotiate
152 
153    or
154 
155    Server         Client
156    Negotiate #1
157                   Option
158    Negotiate #2
159 
160    ----
161 
162    followed by
163 
164    Server         Client
165                   Request
166    Response
167                   Request
168    Response
169                   ...
170    ...
171                   Request (type == 2)
172 
173 */
174 
175 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
176                                      uint32_t type, uint32_t length)
177 {
178     stq_be_p(&rep->magic, NBD_REP_MAGIC);
179     stl_be_p(&rep->option, option);
180     stl_be_p(&rep->type, type);
181     stl_be_p(&rep->length, length);
182 }
183 
184 /* Send a reply header, including length, but no payload.
185  * Return -errno on error, 0 on success. */
186 static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
187                                       uint32_t len, Error **errp)
188 {
189     NBDOptionReply rep;
190 
191     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
192                                      type, nbd_rep_lookup(type), len);
193 
194     assert(len < NBD_MAX_BUFFER_SIZE);
195 
196     set_be_option_rep(&rep, client->opt, type, len);
197     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
198 }
199 
200 /* Send a reply header with default 0 length.
201  * Return -errno on error, 0 on success. */
202 static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
203                                   Error **errp)
204 {
205     return nbd_negotiate_send_rep_len(client, type, 0, errp);
206 }
207 
208 /* Send an error reply.
209  * Return -errno on error, 0 on success. */
210 static int GCC_FMT_ATTR(4, 0)
211 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
212                             Error **errp, const char *fmt, va_list va)
213 {
214     g_autofree char *msg = NULL;
215     int ret;
216     size_t len;
217 
218     msg = g_strdup_vprintf(fmt, va);
219     len = strlen(msg);
220     assert(len < NBD_MAX_STRING_SIZE);
221     trace_nbd_negotiate_send_rep_err(msg);
222     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
223     if (ret < 0) {
224         return ret;
225     }
226     if (nbd_write(client->ioc, msg, len, errp) < 0) {
227         error_prepend(errp, "write failed (error message): ");
228         return -EIO;
229     }
230 
231     return 0;
232 }
233 
234 /*
235  * Return a malloc'd copy of @name suitable for use in an error reply.
236  */
237 static char *
238 nbd_sanitize_name(const char *name)
239 {
240     if (strnlen(name, 80) < 80) {
241         return g_strdup(name);
242     }
243     /* XXX Should we also try to sanitize any control characters? */
244     return g_strdup_printf("%.80s...", name);
245 }
246 
247 /* Send an error reply.
248  * Return -errno on error, 0 on success. */
249 static int GCC_FMT_ATTR(4, 5)
250 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
251                            Error **errp, const char *fmt, ...)
252 {
253     va_list va;
254     int ret;
255 
256     va_start(va, fmt);
257     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
258     va_end(va);
259     return ret;
260 }
261 
262 /* Drop remainder of the current option, and send a reply with the
263  * given error type and message. Return -errno on read or write
264  * failure; or 0 if connection is still live. */
265 static int GCC_FMT_ATTR(4, 0)
266 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
267               const char *fmt, va_list va)
268 {
269     int ret = nbd_drop(client->ioc, client->optlen, errp);
270 
271     client->optlen = 0;
272     if (!ret) {
273         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
274     }
275     return ret;
276 }
277 
278 static int GCC_FMT_ATTR(4, 5)
279 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
280              const char *fmt, ...)
281 {
282     int ret;
283     va_list va;
284 
285     va_start(va, fmt);
286     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
287     va_end(va);
288 
289     return ret;
290 }
291 
292 static int GCC_FMT_ATTR(3, 4)
293 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
294 {
295     int ret;
296     va_list va;
297 
298     va_start(va, fmt);
299     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
300     va_end(va);
301 
302     return ret;
303 }
304 
305 /* Read size bytes from the unparsed payload of the current option.
306  * Return -errno on I/O error, 0 if option was completely handled by
307  * sending a reply about inconsistent lengths, or 1 on success. */
308 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
309                         Error **errp)
310 {
311     if (size > client->optlen) {
312         return nbd_opt_invalid(client, errp,
313                                "Inconsistent lengths in option %s",
314                                nbd_opt_lookup(client->opt));
315     }
316     client->optlen -= size;
317     return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
318 }
319 
320 /* Drop size bytes from the unparsed payload of the current option.
321  * Return -errno on I/O error, 0 if option was completely handled by
322  * sending a reply about inconsistent lengths, or 1 on success. */
323 static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
324 {
325     if (size > client->optlen) {
326         return nbd_opt_invalid(client, errp,
327                                "Inconsistent lengths in option %s",
328                                nbd_opt_lookup(client->opt));
329     }
330     client->optlen -= size;
331     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
332 }
333 
334 /* nbd_opt_read_name
335  *
336  * Read a string with the format:
337  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
338  *   len bytes string (not 0-terminated)
339  *
340  * On success, @name will be allocated.
341  * If @length is non-null, it will be set to the actual string length.
342  *
343  * Return -errno on I/O error, 0 if option was completely handled by
344  * sending a reply about inconsistent lengths, or 1 on success.
345  */
346 static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
347                              Error **errp)
348 {
349     int ret;
350     uint32_t len;
351     g_autofree char *local_name = NULL;
352 
353     *name = NULL;
354     ret = nbd_opt_read(client, &len, sizeof(len), errp);
355     if (ret <= 0) {
356         return ret;
357     }
358     len = cpu_to_be32(len);
359 
360     if (len > NBD_MAX_STRING_SIZE) {
361         return nbd_opt_invalid(client, errp,
362                                "Invalid name length: %" PRIu32, len);
363     }
364 
365     local_name = g_malloc(len + 1);
366     ret = nbd_opt_read(client, local_name, len, errp);
367     if (ret <= 0) {
368         return ret;
369     }
370     local_name[len] = '\0';
371 
372     if (length) {
373         *length = len;
374     }
375     *name = g_steal_pointer(&local_name);
376 
377     return 1;
378 }
379 
380 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
381  * Return -errno on error, 0 on success. */
382 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
383                                        Error **errp)
384 {
385     size_t name_len, desc_len;
386     uint32_t len;
387     const char *name = exp->name ? exp->name : "";
388     const char *desc = exp->description ? exp->description : "";
389     QIOChannel *ioc = client->ioc;
390     int ret;
391 
392     trace_nbd_negotiate_send_rep_list(name, desc);
393     name_len = strlen(name);
394     desc_len = strlen(desc);
395     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
396     len = name_len + desc_len + sizeof(len);
397     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
398     if (ret < 0) {
399         return ret;
400     }
401 
402     len = cpu_to_be32(name_len);
403     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
404         error_prepend(errp, "write failed (name length): ");
405         return -EINVAL;
406     }
407 
408     if (nbd_write(ioc, name, name_len, errp) < 0) {
409         error_prepend(errp, "write failed (name buffer): ");
410         return -EINVAL;
411     }
412 
413     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
414         error_prepend(errp, "write failed (description buffer): ");
415         return -EINVAL;
416     }
417 
418     return 0;
419 }
420 
421 /* Process the NBD_OPT_LIST command, with a potential series of replies.
422  * Return -errno on error, 0 on success. */
423 static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
424 {
425     NBDExport *exp;
426     assert(client->opt == NBD_OPT_LIST);
427 
428     /* For each export, send a NBD_REP_SERVER reply. */
429     QTAILQ_FOREACH(exp, &exports, next) {
430         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
431             return -EINVAL;
432         }
433     }
434     /* Finish with a NBD_REP_ACK. */
435     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
436 }
437 
438 static void nbd_check_meta_export(NBDClient *client)
439 {
440     client->export_meta.valid &= client->exp == client->export_meta.exp;
441 }
442 
443 /* Send a reply to NBD_OPT_EXPORT_NAME.
444  * Return -errno on error, 0 on success. */
445 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
446                                             Error **errp)
447 {
448     g_autofree char *name = NULL;
449     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
450     size_t len;
451     int ret;
452     uint16_t myflags;
453 
454     /* Client sends:
455         [20 ..  xx]   export name (length bytes)
456        Server replies:
457         [ 0 ..   7]   size
458         [ 8 ..   9]   export flags
459         [10 .. 133]   reserved     (0) [unless no_zeroes]
460      */
461     trace_nbd_negotiate_handle_export_name();
462     if (client->optlen > NBD_MAX_STRING_SIZE) {
463         error_setg(errp, "Bad length received");
464         return -EINVAL;
465     }
466     name = g_malloc(client->optlen + 1);
467     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
468         return -EIO;
469     }
470     name[client->optlen] = '\0';
471     client->optlen = 0;
472 
473     trace_nbd_negotiate_handle_export_name_request(name);
474 
475     client->exp = nbd_export_find(name);
476     if (!client->exp) {
477         error_setg(errp, "export not found");
478         return -EINVAL;
479     }
480 
481     myflags = client->exp->nbdflags;
482     if (client->structured_reply) {
483         myflags |= NBD_FLAG_SEND_DF;
484     }
485     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
486     stq_be_p(buf, client->exp->size);
487     stw_be_p(buf + 8, myflags);
488     len = no_zeroes ? 10 : sizeof(buf);
489     ret = nbd_write(client->ioc, buf, len, errp);
490     if (ret < 0) {
491         error_prepend(errp, "write failed: ");
492         return ret;
493     }
494 
495     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
496     nbd_export_get(client->exp);
497     nbd_check_meta_export(client);
498 
499     return 0;
500 }
501 
502 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
503  * The buffer does NOT include the info type prefix.
504  * Return -errno on error, 0 if ready to send more. */
505 static int nbd_negotiate_send_info(NBDClient *client,
506                                    uint16_t info, uint32_t length, void *buf,
507                                    Error **errp)
508 {
509     int rc;
510 
511     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
512     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
513                                     sizeof(info) + length, errp);
514     if (rc < 0) {
515         return rc;
516     }
517     info = cpu_to_be16(info);
518     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
519         return -EIO;
520     }
521     if (nbd_write(client->ioc, buf, length, errp) < 0) {
522         return -EIO;
523     }
524     return 0;
525 }
526 
527 /* nbd_reject_length: Handle any unexpected payload.
528  * @fatal requests that we quit talking to the client, even if we are able
529  * to successfully send an error reply.
530  * Return:
531  * -errno  transmission error occurred or @fatal was requested, errp is set
532  * 0       error message successfully sent to client, errp is not set
533  */
534 static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
535 {
536     int ret;
537 
538     assert(client->optlen);
539     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
540                           nbd_opt_lookup(client->opt));
541     if (fatal && !ret) {
542         error_setg(errp, "option '%s' has unexpected length",
543                    nbd_opt_lookup(client->opt));
544         return -EINVAL;
545     }
546     return ret;
547 }
548 
549 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
550  * Return -errno on error, 0 if ready for next option, and 1 to move
551  * into transmission phase.  */
552 static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
553 {
554     int rc;
555     g_autofree char *name = NULL;
556     NBDExport *exp;
557     uint16_t requests;
558     uint16_t request;
559     uint32_t namelen;
560     bool sendname = false;
561     bool blocksize = false;
562     uint32_t sizes[3];
563     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
564     uint32_t check_align = 0;
565     uint16_t myflags;
566 
567     /* Client sends:
568         4 bytes: L, name length (can be 0)
569         L bytes: export name
570         2 bytes: N, number of requests (can be 0)
571         N * 2 bytes: N requests
572     */
573     rc = nbd_opt_read_name(client, &name, &namelen, errp);
574     if (rc <= 0) {
575         return rc;
576     }
577     trace_nbd_negotiate_handle_export_name_request(name);
578 
579     rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
580     if (rc <= 0) {
581         return rc;
582     }
583     requests = be16_to_cpu(requests);
584     trace_nbd_negotiate_handle_info_requests(requests);
585     while (requests--) {
586         rc = nbd_opt_read(client, &request, sizeof(request), errp);
587         if (rc <= 0) {
588             return rc;
589         }
590         request = be16_to_cpu(request);
591         trace_nbd_negotiate_handle_info_request(request,
592                                                 nbd_info_lookup(request));
593         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
594          * everything else is either a request we don't know or
595          * something we send regardless of request */
596         switch (request) {
597         case NBD_INFO_NAME:
598             sendname = true;
599             break;
600         case NBD_INFO_BLOCK_SIZE:
601             blocksize = true;
602             break;
603         }
604     }
605     if (client->optlen) {
606         return nbd_reject_length(client, false, errp);
607     }
608 
609     exp = nbd_export_find(name);
610     if (!exp) {
611         g_autofree char *sane_name = nbd_sanitize_name(name);
612 
613         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
614                                           errp, "export '%s' not present",
615                                           sane_name);
616     }
617 
618     /* Don't bother sending NBD_INFO_NAME unless client requested it */
619     if (sendname) {
620         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
621                                      errp);
622         if (rc < 0) {
623             return rc;
624         }
625     }
626 
627     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
628      * client request */
629     if (exp->description) {
630         size_t len = strlen(exp->description);
631 
632         assert(len <= NBD_MAX_STRING_SIZE);
633         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
634                                      len, exp->description, errp);
635         if (rc < 0) {
636             return rc;
637         }
638     }
639 
640     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
641      * according to whether the client requested it, and according to
642      * whether this is OPT_INFO or OPT_GO. */
643     /* minimum - 1 for back-compat, or actual if client will obey it. */
644     if (client->opt == NBD_OPT_INFO || blocksize) {
645         check_align = sizes[0] = blk_get_request_alignment(exp->blk);
646     } else {
647         sizes[0] = 1;
648     }
649     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
650     /* preferred - Hard-code to 4096 for now.
651      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
652     sizes[1] = MAX(4096, sizes[0]);
653     /* maximum - At most 32M, but smaller as appropriate. */
654     sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
655     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
656     sizes[0] = cpu_to_be32(sizes[0]);
657     sizes[1] = cpu_to_be32(sizes[1]);
658     sizes[2] = cpu_to_be32(sizes[2]);
659     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
660                                  sizeof(sizes), sizes, errp);
661     if (rc < 0) {
662         return rc;
663     }
664 
665     /* Send NBD_INFO_EXPORT always */
666     myflags = exp->nbdflags;
667     if (client->structured_reply) {
668         myflags |= NBD_FLAG_SEND_DF;
669     }
670     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
671     stq_be_p(buf, exp->size);
672     stw_be_p(buf + 8, myflags);
673     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
674                                  sizeof(buf), buf, errp);
675     if (rc < 0) {
676         return rc;
677     }
678 
679     /*
680      * If the client is just asking for NBD_OPT_INFO, but forgot to
681      * request block sizes in a situation that would impact
682      * performance, then return an error. But for NBD_OPT_GO, we
683      * tolerate all clients, regardless of alignments.
684      */
685     if (client->opt == NBD_OPT_INFO && !blocksize &&
686         blk_get_request_alignment(exp->blk) > 1) {
687         return nbd_negotiate_send_rep_err(client,
688                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
689                                           errp,
690                                           "request NBD_INFO_BLOCK_SIZE to "
691                                           "use this export");
692     }
693 
694     /* Final reply */
695     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
696     if (rc < 0) {
697         return rc;
698     }
699 
700     if (client->opt == NBD_OPT_GO) {
701         client->exp = exp;
702         client->check_align = check_align;
703         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
704         nbd_export_get(client->exp);
705         nbd_check_meta_export(client);
706         rc = 1;
707     }
708     return rc;
709 }
710 
711 
712 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
713  * new channel for all further (now-encrypted) communication. */
714 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
715                                                  Error **errp)
716 {
717     QIOChannel *ioc;
718     QIOChannelTLS *tioc;
719     struct NBDTLSHandshakeData data = { 0 };
720 
721     assert(client->opt == NBD_OPT_STARTTLS);
722 
723     trace_nbd_negotiate_handle_starttls();
724     ioc = client->ioc;
725 
726     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
727         return NULL;
728     }
729 
730     tioc = qio_channel_tls_new_server(ioc,
731                                       client->tlscreds,
732                                       client->tlsauthz,
733                                       errp);
734     if (!tioc) {
735         return NULL;
736     }
737 
738     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
739     trace_nbd_negotiate_handle_starttls_handshake();
740     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
741     qio_channel_tls_handshake(tioc,
742                               nbd_tls_handshake,
743                               &data,
744                               NULL,
745                               NULL);
746 
747     if (!data.complete) {
748         g_main_loop_run(data.loop);
749     }
750     g_main_loop_unref(data.loop);
751     if (data.error) {
752         object_unref(OBJECT(tioc));
753         error_propagate(errp, data.error);
754         return NULL;
755     }
756 
757     return QIO_CHANNEL(tioc);
758 }
759 
760 /* nbd_negotiate_send_meta_context
761  *
762  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
763  *
764  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
765  */
766 static int nbd_negotiate_send_meta_context(NBDClient *client,
767                                            const char *context,
768                                            uint32_t context_id,
769                                            Error **errp)
770 {
771     NBDOptionReplyMetaContext opt;
772     struct iovec iov[] = {
773         {.iov_base = &opt, .iov_len = sizeof(opt)},
774         {.iov_base = (void *)context, .iov_len = strlen(context)}
775     };
776 
777     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
778     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
779         context_id = 0;
780     }
781 
782     trace_nbd_negotiate_meta_query_reply(context, context_id);
783     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
784                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
785     stl_be_p(&opt.context_id, context_id);
786 
787     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
788 }
789 
790 /* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
791  * @match is never set to false.
792  *
793  * Return -errno on I/O error, 0 if option was completely handled by
794  * sending a reply about inconsistent lengths, or 1 on success.
795  *
796  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
797  * It only means that there are no errors.
798  */
799 static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
800                             Error **errp)
801 {
802     int ret;
803     char *query;
804     size_t len = strlen(pattern);
805 
806     assert(len);
807 
808     query = g_malloc(len);
809     ret = nbd_opt_read(client, query, len, errp);
810     if (ret <= 0) {
811         g_free(query);
812         return ret;
813     }
814 
815     if (strncmp(query, pattern, len) == 0) {
816         trace_nbd_negotiate_meta_query_parse(pattern);
817         *match = true;
818     } else {
819         trace_nbd_negotiate_meta_query_skip("pattern not matched");
820     }
821     g_free(query);
822 
823     return 1;
824 }
825 
826 /*
827  * Read @len bytes, and set @match to true if they match @pattern, or if @len
828  * is 0 and the client is performing _LIST_. @match is never set to false.
829  *
830  * Return -errno on I/O error, 0 if option was completely handled by
831  * sending a reply about inconsistent lengths, or 1 on success.
832  *
833  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
834  * It only means that there are no errors.
835  */
836 static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
837                                      uint32_t len, bool *match, Error **errp)
838 {
839     if (len == 0) {
840         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
841             *match = true;
842         }
843         trace_nbd_negotiate_meta_query_parse("empty");
844         return 1;
845     }
846 
847     if (len != strlen(pattern)) {
848         trace_nbd_negotiate_meta_query_skip("different lengths");
849         return nbd_opt_skip(client, len, errp);
850     }
851 
852     return nbd_meta_pattern(client, pattern, match, errp);
853 }
854 
855 /* nbd_meta_base_query
856  *
857  * Handle queries to 'base' namespace. For now, only the base:allocation
858  * context is available.  'len' is the amount of text remaining to be read from
859  * the current name, after the 'base:' portion has been stripped.
860  *
861  * Return -errno on I/O error, 0 if option was completely handled by
862  * sending a reply about inconsistent lengths, or 1 on success.
863  */
864 static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
865                                uint32_t len, Error **errp)
866 {
867     return nbd_meta_empty_or_pattern(client, "allocation", len,
868                                      &meta->base_allocation, errp);
869 }
870 
871 /* nbd_meta_bitmap_query
872  *
873  * Handle query to 'qemu:' namespace.
874  * @len is the amount of text remaining to be read from the current name, after
875  * the 'qemu:' portion has been stripped.
876  *
877  * Return -errno on I/O error, 0 if option was completely handled by
878  * sending a reply about inconsistent lengths, or 1 on success. */
879 static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
880                                uint32_t len, Error **errp)
881 {
882     bool dirty_bitmap = false;
883     size_t dirty_bitmap_len = strlen("dirty-bitmap:");
884     int ret;
885 
886     if (!meta->exp->export_bitmap) {
887         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
888         return nbd_opt_skip(client, len, errp);
889     }
890 
891     if (len == 0) {
892         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
893             meta->bitmap = true;
894         }
895         trace_nbd_negotiate_meta_query_parse("empty");
896         return 1;
897     }
898 
899     if (len < dirty_bitmap_len) {
900         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
901         return nbd_opt_skip(client, len, errp);
902     }
903 
904     len -= dirty_bitmap_len;
905     ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
906     if (ret <= 0) {
907         return ret;
908     }
909     if (!dirty_bitmap) {
910         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
911         return nbd_opt_skip(client, len, errp);
912     }
913 
914     trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
915 
916     return nbd_meta_empty_or_pattern(
917             client, meta->exp->export_bitmap_context +
918             strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
919 }
920 
921 /* nbd_negotiate_meta_query
922  *
923  * Parse namespace name and call corresponding function to parse body of the
924  * query.
925  *
926  * The only supported namespaces are 'base' and 'qemu'.
927  *
928  * The function aims not wasting time and memory to read long unknown namespace
929  * names.
930  *
931  * Return -errno on I/O error, 0 if option was completely handled by
932  * sending a reply about inconsistent lengths, or 1 on success. */
933 static int nbd_negotiate_meta_query(NBDClient *client,
934                                     NBDExportMetaContexts *meta, Error **errp)
935 {
936     /*
937      * Both 'qemu' and 'base' namespaces have length = 5 including a
938      * colon. If another length namespace is later introduced, this
939      * should certainly be refactored.
940      */
941     int ret;
942     size_t ns_len = 5;
943     char ns[5];
944     uint32_t len;
945 
946     ret = nbd_opt_read(client, &len, sizeof(len), errp);
947     if (ret <= 0) {
948         return ret;
949     }
950     len = cpu_to_be32(len);
951 
952     if (len > NBD_MAX_STRING_SIZE) {
953         trace_nbd_negotiate_meta_query_skip("length too long");
954         return nbd_opt_skip(client, len, errp);
955     }
956     if (len < ns_len) {
957         trace_nbd_negotiate_meta_query_skip("length too short");
958         return nbd_opt_skip(client, len, errp);
959     }
960 
961     len -= ns_len;
962     ret = nbd_opt_read(client, ns, ns_len, errp);
963     if (ret <= 0) {
964         return ret;
965     }
966 
967     if (!strncmp(ns, "base:", ns_len)) {
968         trace_nbd_negotiate_meta_query_parse("base:");
969         return nbd_meta_base_query(client, meta, len, errp);
970     } else if (!strncmp(ns, "qemu:", ns_len)) {
971         trace_nbd_negotiate_meta_query_parse("qemu:");
972         return nbd_meta_qemu_query(client, meta, len, errp);
973     }
974 
975     trace_nbd_negotiate_meta_query_skip("unknown namespace");
976     return nbd_opt_skip(client, len, errp);
977 }
978 
979 /* nbd_negotiate_meta_queries
980  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
981  *
982  * Return -errno on I/O error, or 0 if option was completely handled. */
983 static int nbd_negotiate_meta_queries(NBDClient *client,
984                                       NBDExportMetaContexts *meta, Error **errp)
985 {
986     int ret;
987     g_autofree char *export_name = NULL;
988     NBDExportMetaContexts local_meta;
989     uint32_t nb_queries;
990     int i;
991 
992     if (!client->structured_reply) {
993         return nbd_opt_invalid(client, errp,
994                                "request option '%s' when structured reply "
995                                "is not negotiated",
996                                nbd_opt_lookup(client->opt));
997     }
998 
999     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1000         /* Only change the caller's meta on SET. */
1001         meta = &local_meta;
1002     }
1003 
1004     memset(meta, 0, sizeof(*meta));
1005 
1006     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1007     if (ret <= 0) {
1008         return ret;
1009     }
1010 
1011     meta->exp = nbd_export_find(export_name);
1012     if (meta->exp == NULL) {
1013         g_autofree char *sane_name = nbd_sanitize_name(export_name);
1014 
1015         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1016                             "export '%s' not present", sane_name);
1017     }
1018 
1019     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
1020     if (ret <= 0) {
1021         return ret;
1022     }
1023     nb_queries = cpu_to_be32(nb_queries);
1024     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1025                                      export_name, nb_queries);
1026 
1027     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1028         /* enable all known contexts */
1029         meta->base_allocation = true;
1030         meta->bitmap = !!meta->exp->export_bitmap;
1031     } else {
1032         for (i = 0; i < nb_queries; ++i) {
1033             ret = nbd_negotiate_meta_query(client, meta, errp);
1034             if (ret <= 0) {
1035                 return ret;
1036             }
1037         }
1038     }
1039 
1040     if (meta->base_allocation) {
1041         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1042                                               NBD_META_ID_BASE_ALLOCATION,
1043                                               errp);
1044         if (ret < 0) {
1045             return ret;
1046         }
1047     }
1048 
1049     if (meta->bitmap) {
1050         ret = nbd_negotiate_send_meta_context(client,
1051                                               meta->exp->export_bitmap_context,
1052                                               NBD_META_ID_DIRTY_BITMAP,
1053                                               errp);
1054         if (ret < 0) {
1055             return ret;
1056         }
1057     }
1058 
1059     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1060     if (ret == 0) {
1061         meta->valid = true;
1062     }
1063 
1064     return ret;
1065 }
1066 
1067 /* nbd_negotiate_options
1068  * Process all NBD_OPT_* client option commands, during fixed newstyle
1069  * negotiation.
1070  * Return:
1071  * -errno  on error, errp is set
1072  * 0       on successful negotiation, errp is not set
1073  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1074  *         errp is not set
1075  */
1076 static int nbd_negotiate_options(NBDClient *client, Error **errp)
1077 {
1078     uint32_t flags;
1079     bool fixedNewstyle = false;
1080     bool no_zeroes = false;
1081 
1082     /* Client sends:
1083         [ 0 ..   3]   client flags
1084 
1085        Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1086         [ 0 ..   7]   NBD_OPTS_MAGIC
1087         [ 8 ..  11]   NBD option
1088         [12 ..  15]   Data length
1089         ...           Rest of request
1090 
1091         [ 0 ..   7]   NBD_OPTS_MAGIC
1092         [ 8 ..  11]   Second NBD option
1093         [12 ..  15]   Data length
1094         ...           Rest of request
1095     */
1096 
1097     if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
1098         return -EIO;
1099     }
1100     trace_nbd_negotiate_options_flags(flags);
1101     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1102         fixedNewstyle = true;
1103         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1104     }
1105     if (flags & NBD_FLAG_C_NO_ZEROES) {
1106         no_zeroes = true;
1107         flags &= ~NBD_FLAG_C_NO_ZEROES;
1108     }
1109     if (flags != 0) {
1110         error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1111         return -EINVAL;
1112     }
1113 
1114     while (1) {
1115         int ret;
1116         uint32_t option, length;
1117         uint64_t magic;
1118 
1119         if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1120             return -EINVAL;
1121         }
1122         trace_nbd_negotiate_options_check_magic(magic);
1123         if (magic != NBD_OPTS_MAGIC) {
1124             error_setg(errp, "Bad magic received");
1125             return -EINVAL;
1126         }
1127 
1128         if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1129             return -EINVAL;
1130         }
1131         client->opt = option;
1132 
1133         if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1134             return -EINVAL;
1135         }
1136         assert(!client->optlen);
1137         client->optlen = length;
1138 
1139         if (length > NBD_MAX_BUFFER_SIZE) {
1140             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
1141                        length, NBD_MAX_BUFFER_SIZE);
1142             return -EINVAL;
1143         }
1144 
1145         trace_nbd_negotiate_options_check_option(option,
1146                                                  nbd_opt_lookup(option));
1147         if (client->tlscreds &&
1148             client->ioc == (QIOChannel *)client->sioc) {
1149             QIOChannel *tioc;
1150             if (!fixedNewstyle) {
1151                 error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1152                 return -EINVAL;
1153             }
1154             switch (option) {
1155             case NBD_OPT_STARTTLS:
1156                 if (length) {
1157                     /* Unconditionally drop the connection if the client
1158                      * can't start a TLS negotiation correctly */
1159                     return nbd_reject_length(client, true, errp);
1160                 }
1161                 tioc = nbd_negotiate_handle_starttls(client, errp);
1162                 if (!tioc) {
1163                     return -EIO;
1164                 }
1165                 ret = 0;
1166                 object_unref(OBJECT(client->ioc));
1167                 client->ioc = QIO_CHANNEL(tioc);
1168                 break;
1169 
1170             case NBD_OPT_EXPORT_NAME:
1171                 /* No way to return an error to client, so drop connection */
1172                 error_setg(errp, "Option 0x%x not permitted before TLS",
1173                            option);
1174                 return -EINVAL;
1175 
1176             default:
1177                 /* Let the client keep trying, unless they asked to
1178                  * quit. Always try to give an error back to the
1179                  * client; but when replying to OPT_ABORT, be aware
1180                  * that the client may hang up before receiving the
1181                  * error, in which case we are fine ignoring the
1182                  * resulting EPIPE. */
1183                 ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1184                                    option == NBD_OPT_ABORT ? NULL : errp,
1185                                    "Option 0x%" PRIx32
1186                                    " not permitted before TLS", option);
1187                 if (option == NBD_OPT_ABORT) {
1188                     return 1;
1189                 }
1190                 break;
1191             }
1192         } else if (fixedNewstyle) {
1193             switch (option) {
1194             case NBD_OPT_LIST:
1195                 if (length) {
1196                     ret = nbd_reject_length(client, false, errp);
1197                 } else {
1198                     ret = nbd_negotiate_handle_list(client, errp);
1199                 }
1200                 break;
1201 
1202             case NBD_OPT_ABORT:
1203                 /* NBD spec says we must try to reply before
1204                  * disconnecting, but that we must also tolerate
1205                  * guests that don't wait for our reply. */
1206                 nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
1207                 return 1;
1208 
1209             case NBD_OPT_EXPORT_NAME:
1210                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1211                                                         errp);
1212 
1213             case NBD_OPT_INFO:
1214             case NBD_OPT_GO:
1215                 ret = nbd_negotiate_handle_info(client, errp);
1216                 if (ret == 1) {
1217                     assert(option == NBD_OPT_GO);
1218                     return 0;
1219                 }
1220                 break;
1221 
1222             case NBD_OPT_STARTTLS:
1223                 if (length) {
1224                     ret = nbd_reject_length(client, false, errp);
1225                 } else if (client->tlscreds) {
1226                     ret = nbd_negotiate_send_rep_err(client,
1227                                                      NBD_REP_ERR_INVALID, errp,
1228                                                      "TLS already enabled");
1229                 } else {
1230                     ret = nbd_negotiate_send_rep_err(client,
1231                                                      NBD_REP_ERR_POLICY, errp,
1232                                                      "TLS not configured");
1233                 }
1234                 break;
1235 
1236             case NBD_OPT_STRUCTURED_REPLY:
1237                 if (length) {
1238                     ret = nbd_reject_length(client, false, errp);
1239                 } else if (client->structured_reply) {
1240                     ret = nbd_negotiate_send_rep_err(
1241                         client, NBD_REP_ERR_INVALID, errp,
1242                         "structured reply already negotiated");
1243                 } else {
1244                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1245                     client->structured_reply = true;
1246                 }
1247                 break;
1248 
1249             case NBD_OPT_LIST_META_CONTEXT:
1250             case NBD_OPT_SET_META_CONTEXT:
1251                 ret = nbd_negotiate_meta_queries(client, &client->export_meta,
1252                                                  errp);
1253                 break;
1254 
1255             default:
1256                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1257                                    "Unsupported option %" PRIu32 " (%s)",
1258                                    option, nbd_opt_lookup(option));
1259                 break;
1260             }
1261         } else {
1262             /*
1263              * If broken new-style we should drop the connection
1264              * for anything except NBD_OPT_EXPORT_NAME
1265              */
1266             switch (option) {
1267             case NBD_OPT_EXPORT_NAME:
1268                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1269                                                         errp);
1270 
1271             default:
1272                 error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1273                            option, nbd_opt_lookup(option));
1274                 return -EINVAL;
1275             }
1276         }
1277         if (ret < 0) {
1278             return ret;
1279         }
1280     }
1281 }
1282 
1283 /* nbd_negotiate
1284  * Return:
1285  * -errno  on error, errp is set
1286  * 0       on successful negotiation, errp is not set
1287  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1288  *         errp is not set
1289  */
1290 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1291 {
1292     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1293     int ret;
1294 
1295     /* Old style negotiation header, no room for options
1296         [ 0 ..   7]   passwd       ("NBDMAGIC")
1297         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1298         [16 ..  23]   size
1299         [24 ..  27]   export flags (zero-extended)
1300         [28 .. 151]   reserved     (0)
1301 
1302        New style negotiation header, client can send options
1303         [ 0 ..   7]   passwd       ("NBDMAGIC")
1304         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1305         [16 ..  17]   server flags (0)
1306         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1307      */
1308 
1309     qio_channel_set_blocking(client->ioc, false, NULL);
1310 
1311     trace_nbd_negotiate_begin();
1312     memcpy(buf, "NBDMAGIC", 8);
1313 
1314     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1315     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1316 
1317     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1318         error_prepend(errp, "write failed: ");
1319         return -EINVAL;
1320     }
1321     ret = nbd_negotiate_options(client, errp);
1322     if (ret != 0) {
1323         if (ret < 0) {
1324             error_prepend(errp, "option negotiation failed: ");
1325         }
1326         return ret;
1327     }
1328 
1329     /* Attach the channel to the same AioContext as the export */
1330     if (client->exp && client->exp->ctx) {
1331         qio_channel_attach_aio_context(client->ioc, client->exp->ctx);
1332     }
1333 
1334     assert(!client->optlen);
1335     trace_nbd_negotiate_success();
1336 
1337     return 0;
1338 }
1339 
1340 static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
1341                                Error **errp)
1342 {
1343     uint8_t buf[NBD_REQUEST_SIZE];
1344     uint32_t magic;
1345     int ret;
1346 
1347     ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
1348     if (ret < 0) {
1349         return ret;
1350     }
1351 
1352     /* Request
1353        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1354        [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1355        [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1356        [ 8 .. 15]   handle
1357        [16 .. 23]   from
1358        [24 .. 27]   len
1359      */
1360 
1361     magic = ldl_be_p(buf);
1362     request->flags  = lduw_be_p(buf + 4);
1363     request->type   = lduw_be_p(buf + 6);
1364     request->handle = ldq_be_p(buf + 8);
1365     request->from   = ldq_be_p(buf + 16);
1366     request->len    = ldl_be_p(buf + 24);
1367 
1368     trace_nbd_receive_request(magic, request->flags, request->type,
1369                               request->from, request->len);
1370 
1371     if (magic != NBD_REQUEST_MAGIC) {
1372         error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
1373         return -EINVAL;
1374     }
1375     return 0;
1376 }
1377 
1378 #define MAX_NBD_REQUESTS 16
1379 
1380 void nbd_client_get(NBDClient *client)
1381 {
1382     client->refcount++;
1383 }
1384 
1385 void nbd_client_put(NBDClient *client)
1386 {
1387     if (--client->refcount == 0) {
1388         /* The last reference should be dropped by client->close,
1389          * which is called by client_close.
1390          */
1391         assert(client->closing);
1392 
1393         qio_channel_detach_aio_context(client->ioc);
1394         object_unref(OBJECT(client->sioc));
1395         object_unref(OBJECT(client->ioc));
1396         if (client->tlscreds) {
1397             object_unref(OBJECT(client->tlscreds));
1398         }
1399         g_free(client->tlsauthz);
1400         if (client->exp) {
1401             QTAILQ_REMOVE(&client->exp->clients, client, next);
1402             nbd_export_put(client->exp);
1403         }
1404         g_free(client);
1405     }
1406 }
1407 
1408 static void client_close(NBDClient *client, bool negotiated)
1409 {
1410     if (client->closing) {
1411         return;
1412     }
1413 
1414     client->closing = true;
1415 
1416     /* Force requests to finish.  They will drop their own references,
1417      * then we'll close the socket and free the NBDClient.
1418      */
1419     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1420                          NULL);
1421 
1422     /* Also tell the client, so that they release their reference.  */
1423     if (client->close_fn) {
1424         client->close_fn(client, negotiated);
1425     }
1426 }
1427 
1428 static NBDRequestData *nbd_request_get(NBDClient *client)
1429 {
1430     NBDRequestData *req;
1431 
1432     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1433     client->nb_requests++;
1434 
1435     req = g_new0(NBDRequestData, 1);
1436     nbd_client_get(client);
1437     req->client = client;
1438     return req;
1439 }
1440 
1441 static void nbd_request_put(NBDRequestData *req)
1442 {
1443     NBDClient *client = req->client;
1444 
1445     if (req->data) {
1446         qemu_vfree(req->data);
1447     }
1448     g_free(req);
1449 
1450     client->nb_requests--;
1451     nbd_client_receive_next_request(client);
1452 
1453     nbd_client_put(client);
1454 }
1455 
1456 static void blk_aio_attached(AioContext *ctx, void *opaque)
1457 {
1458     NBDExport *exp = opaque;
1459     NBDClient *client;
1460 
1461     trace_nbd_blk_aio_attached(exp->name, ctx);
1462 
1463     exp->ctx = ctx;
1464 
1465     QTAILQ_FOREACH(client, &exp->clients, next) {
1466         qio_channel_attach_aio_context(client->ioc, ctx);
1467         if (client->recv_coroutine) {
1468             aio_co_schedule(ctx, client->recv_coroutine);
1469         }
1470         if (client->send_coroutine) {
1471             aio_co_schedule(ctx, client->send_coroutine);
1472         }
1473     }
1474 }
1475 
1476 static void blk_aio_detach(void *opaque)
1477 {
1478     NBDExport *exp = opaque;
1479     NBDClient *client;
1480 
1481     trace_nbd_blk_aio_detach(exp->name, exp->ctx);
1482 
1483     QTAILQ_FOREACH(client, &exp->clients, next) {
1484         qio_channel_detach_aio_context(client->ioc);
1485     }
1486 
1487     exp->ctx = NULL;
1488 }
1489 
1490 static void nbd_eject_notifier(Notifier *n, void *data)
1491 {
1492     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1493     AioContext *aio_context;
1494 
1495     aio_context = exp->ctx;
1496     aio_context_acquire(aio_context);
1497     nbd_export_close(exp);
1498     aio_context_release(aio_context);
1499 }
1500 
1501 NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
1502                           uint64_t size, const char *name, const char *desc,
1503                           const char *bitmap, bool readonly, bool shared,
1504                           void (*close)(NBDExport *), bool writethrough,
1505                           BlockBackend *on_eject_blk, Error **errp)
1506 {
1507     AioContext *ctx;
1508     BlockBackend *blk;
1509     NBDExport *exp = g_new0(NBDExport, 1);
1510     uint64_t perm;
1511     int ret;
1512 
1513     /*
1514      * NBD exports are used for non-shared storage migration.  Make sure
1515      * that BDRV_O_INACTIVE is cleared and the image is ready for write
1516      * access since the export could be available before migration handover.
1517      * ctx was acquired in the caller.
1518      */
1519     assert(name && strlen(name) <= NBD_MAX_STRING_SIZE);
1520     ctx = bdrv_get_aio_context(bs);
1521     bdrv_invalidate_cache(bs, NULL);
1522 
1523     /* Don't allow resize while the NBD server is running, otherwise we don't
1524      * care what happens with the node. */
1525     perm = BLK_PERM_CONSISTENT_READ;
1526     if (!readonly) {
1527         perm |= BLK_PERM_WRITE;
1528     }
1529     blk = blk_new(ctx, perm,
1530                   BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1531                   BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
1532     ret = blk_insert_bs(blk, bs, errp);
1533     if (ret < 0) {
1534         goto fail;
1535     }
1536     blk_set_enable_write_cache(blk, !writethrough);
1537     blk_set_allow_aio_context_change(blk, true);
1538 
1539     exp->refcount = 1;
1540     QTAILQ_INIT(&exp->clients);
1541     exp->blk = blk;
1542     assert(dev_offset <= INT64_MAX);
1543     exp->dev_offset = dev_offset;
1544     exp->name = g_strdup(name);
1545     assert(!desc || strlen(desc) <= NBD_MAX_STRING_SIZE);
1546     exp->description = g_strdup(desc);
1547     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1548                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1549     if (readonly) {
1550         exp->nbdflags |= NBD_FLAG_READ_ONLY;
1551         if (shared) {
1552             exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1553         }
1554     } else {
1555         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1556                           NBD_FLAG_SEND_FAST_ZERO);
1557     }
1558     assert(size <= INT64_MAX - dev_offset);
1559     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1560 
1561     if (bitmap) {
1562         BdrvDirtyBitmap *bm = NULL;
1563 
1564         while (true) {
1565             bm = bdrv_find_dirty_bitmap(bs, bitmap);
1566             if (bm != NULL || bs->backing == NULL) {
1567                 break;
1568             }
1569 
1570             bs = bs->backing->bs;
1571         }
1572 
1573         if (bm == NULL) {
1574             error_setg(errp, "Bitmap '%s' is not found", bitmap);
1575             goto fail;
1576         }
1577 
1578         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1579             goto fail;
1580         }
1581 
1582         if (readonly && bdrv_is_writable(bs) &&
1583             bdrv_dirty_bitmap_enabled(bm)) {
1584             error_setg(errp,
1585                        "Enabled bitmap '%s' incompatible with readonly export",
1586                        bitmap);
1587             goto fail;
1588         }
1589 
1590         bdrv_dirty_bitmap_set_busy(bm, true);
1591         exp->export_bitmap = bm;
1592         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1593         exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
1594                                                      bitmap);
1595         assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
1596     }
1597 
1598     exp->close = close;
1599     exp->ctx = ctx;
1600     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1601 
1602     if (on_eject_blk) {
1603         blk_ref(on_eject_blk);
1604         exp->eject_notifier_blk = on_eject_blk;
1605         exp->eject_notifier.notify = nbd_eject_notifier;
1606         blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
1607     }
1608     QTAILQ_INSERT_TAIL(&exports, exp, next);
1609     nbd_export_get(exp);
1610     return exp;
1611 
1612 fail:
1613     blk_unref(blk);
1614     g_free(exp->name);
1615     g_free(exp->description);
1616     g_free(exp);
1617     return NULL;
1618 }
1619 
1620 NBDExport *nbd_export_find(const char *name)
1621 {
1622     NBDExport *exp;
1623     QTAILQ_FOREACH(exp, &exports, next) {
1624         if (strcmp(name, exp->name) == 0) {
1625             return exp;
1626         }
1627     }
1628 
1629     return NULL;
1630 }
1631 
1632 AioContext *
1633 nbd_export_aio_context(NBDExport *exp)
1634 {
1635     return exp->ctx;
1636 }
1637 
1638 void nbd_export_close(NBDExport *exp)
1639 {
1640     NBDClient *client, *next;
1641 
1642     nbd_export_get(exp);
1643     /*
1644      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1645      * close mode that stops advertising the export to new clients but
1646      * still permits existing clients to run to completion? Because of
1647      * that possibility, nbd_export_close() can be called more than
1648      * once on an export.
1649      */
1650     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1651         client_close(client, true);
1652     }
1653     if (exp->name) {
1654         nbd_export_put(exp);
1655         g_free(exp->name);
1656         exp->name = NULL;
1657         QTAILQ_REMOVE(&exports, exp, next);
1658     }
1659     g_free(exp->description);
1660     exp->description = NULL;
1661     nbd_export_put(exp);
1662 }
1663 
1664 void nbd_export_remove(NBDExport *exp, NbdServerRemoveMode mode, Error **errp)
1665 {
1666     if (mode == NBD_SERVER_REMOVE_MODE_HARD || QTAILQ_EMPTY(&exp->clients)) {
1667         nbd_export_close(exp);
1668         return;
1669     }
1670 
1671     assert(mode == NBD_SERVER_REMOVE_MODE_SAFE);
1672 
1673     error_setg(errp, "export '%s' still in use", exp->name);
1674     error_append_hint(errp, "Use mode='hard' to force client disconnect\n");
1675 }
1676 
1677 void nbd_export_get(NBDExport *exp)
1678 {
1679     assert(exp->refcount > 0);
1680     exp->refcount++;
1681 }
1682 
1683 void nbd_export_put(NBDExport *exp)
1684 {
1685     assert(exp->refcount > 0);
1686     if (exp->refcount == 1) {
1687         nbd_export_close(exp);
1688     }
1689 
1690     /* nbd_export_close() may theoretically reduce refcount to 0. It may happen
1691      * if someone calls nbd_export_put() on named export not through
1692      * nbd_export_set_name() when refcount is 1. So, let's assert that
1693      * it is > 0.
1694      */
1695     assert(exp->refcount > 0);
1696     if (--exp->refcount == 0) {
1697         assert(exp->name == NULL);
1698         assert(exp->description == NULL);
1699 
1700         if (exp->close) {
1701             exp->close(exp);
1702         }
1703 
1704         if (exp->blk) {
1705             if (exp->eject_notifier_blk) {
1706                 notifier_remove(&exp->eject_notifier);
1707                 blk_unref(exp->eject_notifier_blk);
1708             }
1709             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
1710                                             blk_aio_detach, exp);
1711             blk_unref(exp->blk);
1712             exp->blk = NULL;
1713         }
1714 
1715         if (exp->export_bitmap) {
1716             bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
1717             g_free(exp->export_bitmap_context);
1718         }
1719 
1720         g_free(exp);
1721     }
1722 }
1723 
1724 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
1725 {
1726     return exp->blk;
1727 }
1728 
1729 void nbd_export_close_all(void)
1730 {
1731     NBDExport *exp, *next;
1732     AioContext *aio_context;
1733 
1734     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
1735         aio_context = exp->ctx;
1736         aio_context_acquire(aio_context);
1737         nbd_export_close(exp);
1738         aio_context_release(aio_context);
1739     }
1740 }
1741 
1742 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
1743                                         unsigned niov, Error **errp)
1744 {
1745     int ret;
1746 
1747     g_assert(qemu_in_coroutine());
1748     qemu_co_mutex_lock(&client->send_lock);
1749     client->send_coroutine = qemu_coroutine_self();
1750 
1751     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
1752 
1753     client->send_coroutine = NULL;
1754     qemu_co_mutex_unlock(&client->send_lock);
1755 
1756     return ret;
1757 }
1758 
1759 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
1760                                        uint64_t handle)
1761 {
1762     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
1763     stl_be_p(&reply->error, error);
1764     stq_be_p(&reply->handle, handle);
1765 }
1766 
1767 static int nbd_co_send_simple_reply(NBDClient *client,
1768                                     uint64_t handle,
1769                                     uint32_t error,
1770                                     void *data,
1771                                     size_t len,
1772                                     Error **errp)
1773 {
1774     NBDSimpleReply reply;
1775     int nbd_err = system_errno_to_nbd_errno(error);
1776     struct iovec iov[] = {
1777         {.iov_base = &reply, .iov_len = sizeof(reply)},
1778         {.iov_base = data, .iov_len = len}
1779     };
1780 
1781     trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err),
1782                                    len);
1783     set_be_simple_reply(&reply, nbd_err, handle);
1784 
1785     return nbd_co_send_iov(client, iov, len ? 2 : 1, errp);
1786 }
1787 
1788 static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags,
1789                                 uint16_t type, uint64_t handle, uint32_t length)
1790 {
1791     stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
1792     stw_be_p(&chunk->flags, flags);
1793     stw_be_p(&chunk->type, type);
1794     stq_be_p(&chunk->handle, handle);
1795     stl_be_p(&chunk->length, length);
1796 }
1797 
1798 static int coroutine_fn nbd_co_send_structured_done(NBDClient *client,
1799                                                     uint64_t handle,
1800                                                     Error **errp)
1801 {
1802     NBDStructuredReplyChunk chunk;
1803     struct iovec iov[] = {
1804         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1805     };
1806 
1807     trace_nbd_co_send_structured_done(handle);
1808     set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0);
1809 
1810     return nbd_co_send_iov(client, iov, 1, errp);
1811 }
1812 
1813 static int coroutine_fn nbd_co_send_structured_read(NBDClient *client,
1814                                                     uint64_t handle,
1815                                                     uint64_t offset,
1816                                                     void *data,
1817                                                     size_t size,
1818                                                     bool final,
1819                                                     Error **errp)
1820 {
1821     NBDStructuredReadData chunk;
1822     struct iovec iov[] = {
1823         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1824         {.iov_base = data, .iov_len = size}
1825     };
1826 
1827     assert(size);
1828     trace_nbd_co_send_structured_read(handle, offset, data, size);
1829     set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1830                  NBD_REPLY_TYPE_OFFSET_DATA, handle,
1831                  sizeof(chunk) - sizeof(chunk.h) + size);
1832     stq_be_p(&chunk.offset, offset);
1833 
1834     return nbd_co_send_iov(client, iov, 2, errp);
1835 }
1836 
1837 static int coroutine_fn nbd_co_send_structured_error(NBDClient *client,
1838                                                      uint64_t handle,
1839                                                      uint32_t error,
1840                                                      const char *msg,
1841                                                      Error **errp)
1842 {
1843     NBDStructuredError chunk;
1844     int nbd_err = system_errno_to_nbd_errno(error);
1845     struct iovec iov[] = {
1846         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1847         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
1848     };
1849 
1850     assert(nbd_err);
1851     trace_nbd_co_send_structured_error(handle, nbd_err,
1852                                        nbd_err_lookup(nbd_err), msg ? msg : "");
1853     set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle,
1854                  sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
1855     stl_be_p(&chunk.error, nbd_err);
1856     stw_be_p(&chunk.message_length, iov[1].iov_len);
1857 
1858     return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp);
1859 }
1860 
1861 /* Do a sparse read and send the structured reply to the client.
1862  * Returns -errno if sending fails. bdrv_block_status_above() failure is
1863  * reported to the client, at which point this function succeeds.
1864  */
1865 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
1866                                                 uint64_t handle,
1867                                                 uint64_t offset,
1868                                                 uint8_t *data,
1869                                                 size_t size,
1870                                                 Error **errp)
1871 {
1872     int ret = 0;
1873     NBDExport *exp = client->exp;
1874     size_t progress = 0;
1875 
1876     while (progress < size) {
1877         int64_t pnum;
1878         int status = bdrv_block_status_above(blk_bs(exp->blk), NULL,
1879                                              offset + progress,
1880                                              size - progress, &pnum, NULL,
1881                                              NULL);
1882         bool final;
1883 
1884         if (status < 0) {
1885             char *msg = g_strdup_printf("unable to check for holes: %s",
1886                                         strerror(-status));
1887 
1888             ret = nbd_co_send_structured_error(client, handle, -status, msg,
1889                                                errp);
1890             g_free(msg);
1891             return ret;
1892         }
1893         assert(pnum && pnum <= size - progress);
1894         final = progress + pnum == size;
1895         if (status & BDRV_BLOCK_ZERO) {
1896             NBDStructuredReadHole chunk;
1897             struct iovec iov[] = {
1898                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1899             };
1900 
1901             trace_nbd_co_send_structured_read_hole(handle, offset + progress,
1902                                                    pnum);
1903             set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1904                          NBD_REPLY_TYPE_OFFSET_HOLE,
1905                          handle, sizeof(chunk) - sizeof(chunk.h));
1906             stq_be_p(&chunk.offset, offset + progress);
1907             stl_be_p(&chunk.length, pnum);
1908             ret = nbd_co_send_iov(client, iov, 1, errp);
1909         } else {
1910             ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
1911                             data + progress, pnum);
1912             if (ret < 0) {
1913                 error_setg_errno(errp, -ret, "reading from file failed");
1914                 break;
1915             }
1916             ret = nbd_co_send_structured_read(client, handle, offset + progress,
1917                                               data + progress, pnum, final,
1918                                               errp);
1919         }
1920 
1921         if (ret < 0) {
1922             break;
1923         }
1924         progress += pnum;
1925     }
1926     return ret;
1927 }
1928 
1929 typedef struct NBDExtentArray {
1930     NBDExtent *extents;
1931     unsigned int nb_alloc;
1932     unsigned int count;
1933     uint64_t total_length;
1934     bool can_add;
1935     bool converted_to_be;
1936 } NBDExtentArray;
1937 
1938 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
1939 {
1940     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
1941 
1942     ea->nb_alloc = nb_alloc;
1943     ea->extents = g_new(NBDExtent, nb_alloc);
1944     ea->can_add = true;
1945 
1946     return ea;
1947 }
1948 
1949 static void nbd_extent_array_free(NBDExtentArray *ea)
1950 {
1951     g_free(ea->extents);
1952     g_free(ea);
1953 }
1954 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free);
1955 
1956 /* Further modifications of the array after conversion are abandoned */
1957 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
1958 {
1959     int i;
1960 
1961     assert(!ea->converted_to_be);
1962     ea->can_add = false;
1963     ea->converted_to_be = true;
1964 
1965     for (i = 0; i < ea->count; i++) {
1966         ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
1967         ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
1968     }
1969 }
1970 
1971 /*
1972  * Add extent to NBDExtentArray. If extent can't be added (no available space),
1973  * return -1.
1974  * For safety, when returning -1 for the first time, .can_add is set to false,
1975  * further call to nbd_extent_array_add() will crash.
1976  * (to avoid the situation, when after failing to add an extent (returned -1),
1977  * user miss this failure and add another extent, which is successfully added
1978  * (array is full, but new extent may be squashed into the last one), then we
1979  * have invalid array with skipped extent)
1980  */
1981 static int nbd_extent_array_add(NBDExtentArray *ea,
1982                                 uint32_t length, uint32_t flags)
1983 {
1984     assert(ea->can_add);
1985 
1986     if (!length) {
1987         return 0;
1988     }
1989 
1990     /* Extend previous extent if flags are the same */
1991     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
1992         uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
1993 
1994         if (sum <= UINT32_MAX) {
1995             ea->extents[ea->count - 1].length = sum;
1996             ea->total_length += length;
1997             return 0;
1998         }
1999     }
2000 
2001     if (ea->count >= ea->nb_alloc) {
2002         ea->can_add = false;
2003         return -1;
2004     }
2005 
2006     ea->total_length += length;
2007     ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
2008     ea->count++;
2009 
2010     return 0;
2011 }
2012 
2013 static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
2014                                   uint64_t bytes, NBDExtentArray *ea)
2015 {
2016     while (bytes) {
2017         uint32_t flags;
2018         int64_t num;
2019         int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num,
2020                                           NULL, NULL);
2021 
2022         if (ret < 0) {
2023             return ret;
2024         }
2025 
2026         flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
2027                 (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
2028 
2029         if (nbd_extent_array_add(ea, num, flags) < 0) {
2030             return 0;
2031         }
2032 
2033         offset += num;
2034         bytes -= num;
2035     }
2036 
2037     return 0;
2038 }
2039 
2040 /*
2041  * nbd_co_send_extents
2042  *
2043  * @ea is converted to BE by the function
2044  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2045  */
2046 static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
2047                                NBDExtentArray *ea,
2048                                bool last, uint32_t context_id, Error **errp)
2049 {
2050     NBDStructuredMeta chunk;
2051     struct iovec iov[] = {
2052         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2053         {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
2054     };
2055 
2056     nbd_extent_array_convert_to_be(ea);
2057 
2058     trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length,
2059                               last);
2060     set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
2061                  NBD_REPLY_TYPE_BLOCK_STATUS,
2062                  handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
2063     stl_be_p(&chunk.context_id, context_id);
2064 
2065     return nbd_co_send_iov(client, iov, 2, errp);
2066 }
2067 
2068 /* Get block status from the exported device and send it to the client */
2069 static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
2070                                     BlockDriverState *bs, uint64_t offset,
2071                                     uint32_t length, bool dont_fragment,
2072                                     bool last, uint32_t context_id,
2073                                     Error **errp)
2074 {
2075     int ret;
2076     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2077     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2078 
2079     ret = blockstatus_to_extents(bs, offset, length, ea);
2080     if (ret < 0) {
2081         return nbd_co_send_structured_error(
2082                 client, handle, -ret, "can't get block status", errp);
2083     }
2084 
2085     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2086 }
2087 
2088 /* Populate @ea from a dirty bitmap. */
2089 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2090                               uint64_t offset, uint64_t length,
2091                               NBDExtentArray *es)
2092 {
2093     int64_t start, dirty_start, dirty_count;
2094     int64_t end = offset + length;
2095     bool full = false;
2096 
2097     bdrv_dirty_bitmap_lock(bitmap);
2098 
2099     for (start = offset;
2100          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
2101                                            &dirty_start, &dirty_count);
2102          start = dirty_start + dirty_count)
2103     {
2104         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2105             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2106         {
2107             full = true;
2108             break;
2109         }
2110     }
2111 
2112     if (!full) {
2113         /* last non dirty extent */
2114         nbd_extent_array_add(es, end - start, 0);
2115     }
2116 
2117     bdrv_dirty_bitmap_unlock(bitmap);
2118 }
2119 
2120 static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
2121                               BdrvDirtyBitmap *bitmap, uint64_t offset,
2122                               uint32_t length, bool dont_fragment, bool last,
2123                               uint32_t context_id, Error **errp)
2124 {
2125     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2126     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2127 
2128     bitmap_to_extents(bitmap, offset, length, ea);
2129 
2130     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2131 }
2132 
2133 /* nbd_co_receive_request
2134  * Collect a client request. Return 0 if request looks valid, -EIO to drop
2135  * connection right away, and any other negative value to report an error to
2136  * the client (although the caller may still need to disconnect after reporting
2137  * the error).
2138  */
2139 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
2140                                   Error **errp)
2141 {
2142     NBDClient *client = req->client;
2143     int valid_flags;
2144 
2145     g_assert(qemu_in_coroutine());
2146     assert(client->recv_coroutine == qemu_coroutine_self());
2147     if (nbd_receive_request(client->ioc, request, errp) < 0) {
2148         return -EIO;
2149     }
2150 
2151     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
2152                                              nbd_cmd_lookup(request->type));
2153 
2154     if (request->type != NBD_CMD_WRITE) {
2155         /* No payload, we are ready to read the next request.  */
2156         req->complete = true;
2157     }
2158 
2159     if (request->type == NBD_CMD_DISC) {
2160         /* Special case: we're going to disconnect without a reply,
2161          * whether or not flags, from, or len are bogus */
2162         return -EIO;
2163     }
2164 
2165     if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
2166         request->type == NBD_CMD_CACHE)
2167     {
2168         if (request->len > NBD_MAX_BUFFER_SIZE) {
2169             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
2170                        request->len, NBD_MAX_BUFFER_SIZE);
2171             return -EINVAL;
2172         }
2173 
2174         if (request->type != NBD_CMD_CACHE) {
2175             req->data = blk_try_blockalign(client->exp->blk, request->len);
2176             if (req->data == NULL) {
2177                 error_setg(errp, "No memory");
2178                 return -ENOMEM;
2179             }
2180         }
2181     }
2182 
2183     if (request->type == NBD_CMD_WRITE) {
2184         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
2185                      errp) < 0)
2186         {
2187             return -EIO;
2188         }
2189         req->complete = true;
2190 
2191         trace_nbd_co_receive_request_payload_received(request->handle,
2192                                                       request->len);
2193     }
2194 
2195     /* Sanity checks. */
2196     if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
2197         (request->type == NBD_CMD_WRITE ||
2198          request->type == NBD_CMD_WRITE_ZEROES ||
2199          request->type == NBD_CMD_TRIM)) {
2200         error_setg(errp, "Export is read-only");
2201         return -EROFS;
2202     }
2203     if (request->from > client->exp->size ||
2204         request->len > client->exp->size - request->from) {
2205         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
2206                    ", Size: %" PRIu64, request->from, request->len,
2207                    client->exp->size);
2208         return (request->type == NBD_CMD_WRITE ||
2209                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2210     }
2211     if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2212                                                 client->check_align)) {
2213         /*
2214          * The block layer gracefully handles unaligned requests, but
2215          * it's still worth tracing client non-compliance
2216          */
2217         trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2218                                               request->from,
2219                                               request->len,
2220                                               client->check_align);
2221     }
2222     valid_flags = NBD_CMD_FLAG_FUA;
2223     if (request->type == NBD_CMD_READ && client->structured_reply) {
2224         valid_flags |= NBD_CMD_FLAG_DF;
2225     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
2226         valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2227     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
2228         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2229     }
2230     if (request->flags & ~valid_flags) {
2231         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2232                    nbd_cmd_lookup(request->type), request->flags);
2233         return -EINVAL;
2234     }
2235 
2236     return 0;
2237 }
2238 
2239 /* Send simple reply without a payload, or a structured error
2240  * @error_msg is ignored if @ret >= 0
2241  * Returns 0 if connection is still live, -errno on failure to talk to client
2242  */
2243 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2244                                                uint64_t handle,
2245                                                int ret,
2246                                                const char *error_msg,
2247                                                Error **errp)
2248 {
2249     if (client->structured_reply && ret < 0) {
2250         return nbd_co_send_structured_error(client, handle, -ret, error_msg,
2251                                             errp);
2252     } else {
2253         return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0,
2254                                         NULL, 0, errp);
2255     }
2256 }
2257 
2258 /* Handle NBD_CMD_READ request.
2259  * Return -errno if sending fails. Other errors are reported directly to the
2260  * client as an error reply. */
2261 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2262                                         uint8_t *data, Error **errp)
2263 {
2264     int ret;
2265     NBDExport *exp = client->exp;
2266 
2267     assert(request->type == NBD_CMD_READ);
2268 
2269     /* XXX: NBD Protocol only documents use of FUA with WRITE */
2270     if (request->flags & NBD_CMD_FLAG_FUA) {
2271         ret = blk_co_flush(exp->blk);
2272         if (ret < 0) {
2273             return nbd_send_generic_reply(client, request->handle, ret,
2274                                           "flush failed", errp);
2275         }
2276     }
2277 
2278     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
2279         request->len)
2280     {
2281         return nbd_co_send_sparse_read(client, request->handle, request->from,
2282                                        data, request->len, errp);
2283     }
2284 
2285     ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
2286                     request->len);
2287     if (ret < 0) {
2288         return nbd_send_generic_reply(client, request->handle, ret,
2289                                       "reading from file failed", errp);
2290     }
2291 
2292     if (client->structured_reply) {
2293         if (request->len) {
2294             return nbd_co_send_structured_read(client, request->handle,
2295                                                request->from, data,
2296                                                request->len, true, errp);
2297         } else {
2298             return nbd_co_send_structured_done(client, request->handle, errp);
2299         }
2300     } else {
2301         return nbd_co_send_simple_reply(client, request->handle, 0,
2302                                         data, request->len, errp);
2303     }
2304 }
2305 
2306 /*
2307  * nbd_do_cmd_cache
2308  *
2309  * Handle NBD_CMD_CACHE request.
2310  * Return -errno if sending fails. Other errors are reported directly to the
2311  * client as an error reply.
2312  */
2313 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2314                                          Error **errp)
2315 {
2316     int ret;
2317     NBDExport *exp = client->exp;
2318 
2319     assert(request->type == NBD_CMD_CACHE);
2320 
2321     ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len,
2322                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2323 
2324     return nbd_send_generic_reply(client, request->handle, ret,
2325                                   "caching data failed", errp);
2326 }
2327 
2328 /* Handle NBD request.
2329  * Return -errno if sending fails. Other errors are reported directly to the
2330  * client as an error reply. */
2331 static coroutine_fn int nbd_handle_request(NBDClient *client,
2332                                            NBDRequest *request,
2333                                            uint8_t *data, Error **errp)
2334 {
2335     int ret;
2336     int flags;
2337     NBDExport *exp = client->exp;
2338     char *msg;
2339 
2340     switch (request->type) {
2341     case NBD_CMD_CACHE:
2342         return nbd_do_cmd_cache(client, request, errp);
2343 
2344     case NBD_CMD_READ:
2345         return nbd_do_cmd_read(client, request, data, errp);
2346 
2347     case NBD_CMD_WRITE:
2348         flags = 0;
2349         if (request->flags & NBD_CMD_FLAG_FUA) {
2350             flags |= BDRV_REQ_FUA;
2351         }
2352         ret = blk_pwrite(exp->blk, request->from + exp->dev_offset,
2353                          data, request->len, flags);
2354         return nbd_send_generic_reply(client, request->handle, ret,
2355                                       "writing to file failed", errp);
2356 
2357     case NBD_CMD_WRITE_ZEROES:
2358         flags = 0;
2359         if (request->flags & NBD_CMD_FLAG_FUA) {
2360             flags |= BDRV_REQ_FUA;
2361         }
2362         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2363             flags |= BDRV_REQ_MAY_UNMAP;
2364         }
2365         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2366             flags |= BDRV_REQ_NO_FALLBACK;
2367         }
2368         ret = blk_pwrite_zeroes(exp->blk, request->from + exp->dev_offset,
2369                                 request->len, flags);
2370         return nbd_send_generic_reply(client, request->handle, ret,
2371                                       "writing to file failed", errp);
2372 
2373     case NBD_CMD_DISC:
2374         /* unreachable, thanks to special case in nbd_co_receive_request() */
2375         abort();
2376 
2377     case NBD_CMD_FLUSH:
2378         ret = blk_co_flush(exp->blk);
2379         return nbd_send_generic_reply(client, request->handle, ret,
2380                                       "flush failed", errp);
2381 
2382     case NBD_CMD_TRIM:
2383         ret = blk_co_pdiscard(exp->blk, request->from + exp->dev_offset,
2384                               request->len);
2385         if (ret == 0 && request->flags & NBD_CMD_FLAG_FUA) {
2386             ret = blk_co_flush(exp->blk);
2387         }
2388         return nbd_send_generic_reply(client, request->handle, ret,
2389                                       "discard failed", errp);
2390 
2391     case NBD_CMD_BLOCK_STATUS:
2392         if (!request->len) {
2393             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2394                                           "need non-zero length", errp);
2395         }
2396         if (client->export_meta.valid &&
2397             (client->export_meta.base_allocation ||
2398              client->export_meta.bitmap))
2399         {
2400             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2401 
2402             if (client->export_meta.base_allocation) {
2403                 ret = nbd_co_send_block_status(client, request->handle,
2404                                                blk_bs(exp->blk), request->from,
2405                                                request->len, dont_fragment,
2406                                                !client->export_meta.bitmap,
2407                                                NBD_META_ID_BASE_ALLOCATION,
2408                                                errp);
2409                 if (ret < 0) {
2410                     return ret;
2411                 }
2412             }
2413 
2414             if (client->export_meta.bitmap) {
2415                 ret = nbd_co_send_bitmap(client, request->handle,
2416                                          client->exp->export_bitmap,
2417                                          request->from, request->len,
2418                                          dont_fragment,
2419                                          true, NBD_META_ID_DIRTY_BITMAP, errp);
2420                 if (ret < 0) {
2421                     return ret;
2422                 }
2423             }
2424 
2425             return 0;
2426         } else {
2427             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2428                                           "CMD_BLOCK_STATUS not negotiated",
2429                                           errp);
2430         }
2431 
2432     default:
2433         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
2434                               request->type);
2435         ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg,
2436                                      errp);
2437         g_free(msg);
2438         return ret;
2439     }
2440 }
2441 
2442 /* Owns a reference to the NBDClient passed as opaque.  */
2443 static coroutine_fn void nbd_trip(void *opaque)
2444 {
2445     NBDClient *client = opaque;
2446     NBDRequestData *req;
2447     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
2448     int ret;
2449     Error *local_err = NULL;
2450 
2451     trace_nbd_trip();
2452     if (client->closing) {
2453         nbd_client_put(client);
2454         return;
2455     }
2456 
2457     req = nbd_request_get(client);
2458     ret = nbd_co_receive_request(req, &request, &local_err);
2459     client->recv_coroutine = NULL;
2460 
2461     if (client->closing) {
2462         /*
2463          * The client may be closed when we are blocked in
2464          * nbd_co_receive_request()
2465          */
2466         goto done;
2467     }
2468 
2469     nbd_client_receive_next_request(client);
2470     if (ret == -EIO) {
2471         goto disconnect;
2472     }
2473 
2474     if (ret < 0) {
2475         /* It wans't -EIO, so, according to nbd_co_receive_request()
2476          * semantics, we should return the error to the client. */
2477         Error *export_err = local_err;
2478 
2479         local_err = NULL;
2480         ret = nbd_send_generic_reply(client, request.handle, -EINVAL,
2481                                      error_get_pretty(export_err), &local_err);
2482         error_free(export_err);
2483     } else {
2484         ret = nbd_handle_request(client, &request, req->data, &local_err);
2485     }
2486     if (ret < 0) {
2487         error_prepend(&local_err, "Failed to send reply: ");
2488         goto disconnect;
2489     }
2490 
2491     /* We must disconnect after NBD_CMD_WRITE if we did not
2492      * read the payload.
2493      */
2494     if (!req->complete) {
2495         error_setg(&local_err, "Request handling failed in intermediate state");
2496         goto disconnect;
2497     }
2498 
2499 done:
2500     nbd_request_put(req);
2501     nbd_client_put(client);
2502     return;
2503 
2504 disconnect:
2505     if (local_err) {
2506         error_reportf_err(local_err, "Disconnect client, due to: ");
2507     }
2508     nbd_request_put(req);
2509     client_close(client, true);
2510     nbd_client_put(client);
2511 }
2512 
2513 static void nbd_client_receive_next_request(NBDClient *client)
2514 {
2515     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
2516         nbd_client_get(client);
2517         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
2518         aio_co_schedule(client->exp->ctx, client->recv_coroutine);
2519     }
2520 }
2521 
2522 static coroutine_fn void nbd_co_client_start(void *opaque)
2523 {
2524     NBDClient *client = opaque;
2525     Error *local_err = NULL;
2526 
2527     qemu_co_mutex_init(&client->send_lock);
2528 
2529     if (nbd_negotiate(client, &local_err)) {
2530         if (local_err) {
2531             error_report_err(local_err);
2532         }
2533         client_close(client, false);
2534         return;
2535     }
2536 
2537     nbd_client_receive_next_request(client);
2538 }
2539 
2540 /*
2541  * Create a new client listener using the given channel @sioc.
2542  * Begin servicing it in a coroutine.  When the connection closes, call
2543  * @close_fn with an indication of whether the client completed negotiation.
2544  */
2545 void nbd_client_new(QIOChannelSocket *sioc,
2546                     QCryptoTLSCreds *tlscreds,
2547                     const char *tlsauthz,
2548                     void (*close_fn)(NBDClient *, bool))
2549 {
2550     NBDClient *client;
2551     Coroutine *co;
2552 
2553     client = g_new0(NBDClient, 1);
2554     client->refcount = 1;
2555     client->tlscreds = tlscreds;
2556     if (tlscreds) {
2557         object_ref(OBJECT(client->tlscreds));
2558     }
2559     client->tlsauthz = g_strdup(tlsauthz);
2560     client->sioc = sioc;
2561     object_ref(OBJECT(client->sioc));
2562     client->ioc = QIO_CHANNEL(sioc);
2563     object_ref(OBJECT(client->ioc));
2564     client->close_fn = close_fn;
2565 
2566     co = qemu_coroutine_create(nbd_co_client_start, client);
2567     qemu_coroutine_enter(co);
2568 }
2569