xref: /openbmc/qemu/nbd/server.c (revision 8fa3b702)
1 /*
2  *  Copyright (C) 2016-2018 Red Hat, Inc.
3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
4  *
5  *  Network Block Device Server Side
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; under version 2 of the License.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 #include "qapi/error.h"
22 #include "qemu/queue.h"
23 #include "trace.h"
24 #include "nbd-internal.h"
25 #include "qemu/units.h"
26 
27 #define NBD_META_ID_BASE_ALLOCATION 0
28 #define NBD_META_ID_DIRTY_BITMAP 1
29 
30 /*
31  * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
32  * constant. If an increase is needed, note that the NBD protocol
33  * recommends no larger than 32 mb, so that the client won't consider
34  * the reply as a denial of service attack.
35  */
36 #define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
37 
38 static int system_errno_to_nbd_errno(int err)
39 {
40     switch (err) {
41     case 0:
42         return NBD_SUCCESS;
43     case EPERM:
44     case EROFS:
45         return NBD_EPERM;
46     case EIO:
47         return NBD_EIO;
48     case ENOMEM:
49         return NBD_ENOMEM;
50 #ifdef EDQUOT
51     case EDQUOT:
52 #endif
53     case EFBIG:
54     case ENOSPC:
55         return NBD_ENOSPC;
56     case EOVERFLOW:
57         return NBD_EOVERFLOW;
58     case ENOTSUP:
59 #if ENOTSUP != EOPNOTSUPP
60     case EOPNOTSUPP:
61 #endif
62         return NBD_ENOTSUP;
63     case ESHUTDOWN:
64         return NBD_ESHUTDOWN;
65     case EINVAL:
66     default:
67         return NBD_EINVAL;
68     }
69 }
70 
71 /* Definitions for opaque data types */
72 
73 typedef struct NBDRequestData NBDRequestData;
74 
75 struct NBDRequestData {
76     QSIMPLEQ_ENTRY(NBDRequestData) entry;
77     NBDClient *client;
78     uint8_t *data;
79     bool complete;
80 };
81 
82 struct NBDExport {
83     int refcount;
84     void (*close)(NBDExport *exp);
85 
86     BlockBackend *blk;
87     char *name;
88     char *description;
89     uint64_t dev_offset;
90     uint64_t size;
91     uint16_t nbdflags;
92     QTAILQ_HEAD(, NBDClient) clients;
93     QTAILQ_ENTRY(NBDExport) next;
94 
95     AioContext *ctx;
96 
97     BlockBackend *eject_notifier_blk;
98     Notifier eject_notifier;
99 
100     BdrvDirtyBitmap *export_bitmap;
101     char *export_bitmap_context;
102 };
103 
104 static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
105 static QTAILQ_HEAD(, NBDExport) closed_exports =
106         QTAILQ_HEAD_INITIALIZER(closed_exports);
107 
108 /* NBDExportMetaContexts represents a list of contexts to be exported,
109  * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
110  * NBD_OPT_LIST_META_CONTEXT. */
111 typedef struct NBDExportMetaContexts {
112     NBDExport *exp;
113     bool valid; /* means that negotiation of the option finished without
114                    errors */
115     bool base_allocation; /* export base:allocation context (block status) */
116     bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
117 } NBDExportMetaContexts;
118 
119 struct NBDClient {
120     int refcount;
121     void (*close_fn)(NBDClient *client, bool negotiated);
122 
123     NBDExport *exp;
124     QCryptoTLSCreds *tlscreds;
125     char *tlsauthz;
126     QIOChannelSocket *sioc; /* The underlying data channel */
127     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
128 
129     Coroutine *recv_coroutine;
130 
131     CoMutex send_lock;
132     Coroutine *send_coroutine;
133 
134     QTAILQ_ENTRY(NBDClient) next;
135     int nb_requests;
136     bool closing;
137 
138     uint32_t check_align; /* If non-zero, check for aligned client requests */
139 
140     bool structured_reply;
141     NBDExportMetaContexts export_meta;
142 
143     uint32_t opt; /* Current option being negotiated */
144     uint32_t optlen; /* remaining length of data in ioc for the option being
145                         negotiated now */
146 };
147 
148 static void nbd_client_receive_next_request(NBDClient *client);
149 
150 /* Basic flow for negotiation
151 
152    Server         Client
153    Negotiate
154 
155    or
156 
157    Server         Client
158    Negotiate #1
159                   Option
160    Negotiate #2
161 
162    ----
163 
164    followed by
165 
166    Server         Client
167                   Request
168    Response
169                   Request
170    Response
171                   ...
172    ...
173                   Request (type == 2)
174 
175 */
176 
177 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
178                                      uint32_t type, uint32_t length)
179 {
180     stq_be_p(&rep->magic, NBD_REP_MAGIC);
181     stl_be_p(&rep->option, option);
182     stl_be_p(&rep->type, type);
183     stl_be_p(&rep->length, length);
184 }
185 
186 /* Send a reply header, including length, but no payload.
187  * Return -errno on error, 0 on success. */
188 static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
189                                       uint32_t len, Error **errp)
190 {
191     NBDOptionReply rep;
192 
193     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
194                                      type, nbd_rep_lookup(type), len);
195 
196     assert(len < NBD_MAX_BUFFER_SIZE);
197 
198     set_be_option_rep(&rep, client->opt, type, len);
199     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
200 }
201 
202 /* Send a reply header with default 0 length.
203  * Return -errno on error, 0 on success. */
204 static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
205                                   Error **errp)
206 {
207     return nbd_negotiate_send_rep_len(client, type, 0, errp);
208 }
209 
210 /* Send an error reply.
211  * Return -errno on error, 0 on success. */
212 static int GCC_FMT_ATTR(4, 0)
213 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
214                             Error **errp, const char *fmt, va_list va)
215 {
216     ERRP_GUARD();
217     g_autofree char *msg = NULL;
218     int ret;
219     size_t len;
220 
221     msg = g_strdup_vprintf(fmt, va);
222     len = strlen(msg);
223     assert(len < NBD_MAX_STRING_SIZE);
224     trace_nbd_negotiate_send_rep_err(msg);
225     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
226     if (ret < 0) {
227         return ret;
228     }
229     if (nbd_write(client->ioc, msg, len, errp) < 0) {
230         error_prepend(errp, "write failed (error message): ");
231         return -EIO;
232     }
233 
234     return 0;
235 }
236 
237 /*
238  * Return a malloc'd copy of @name suitable for use in an error reply.
239  */
240 static char *
241 nbd_sanitize_name(const char *name)
242 {
243     if (strnlen(name, 80) < 80) {
244         return g_strdup(name);
245     }
246     /* XXX Should we also try to sanitize any control characters? */
247     return g_strdup_printf("%.80s...", name);
248 }
249 
250 /* Send an error reply.
251  * Return -errno on error, 0 on success. */
252 static int GCC_FMT_ATTR(4, 5)
253 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
254                            Error **errp, const char *fmt, ...)
255 {
256     va_list va;
257     int ret;
258 
259     va_start(va, fmt);
260     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
261     va_end(va);
262     return ret;
263 }
264 
265 /* Drop remainder of the current option, and send a reply with the
266  * given error type and message. Return -errno on read or write
267  * failure; or 0 if connection is still live. */
268 static int GCC_FMT_ATTR(4, 0)
269 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
270               const char *fmt, va_list va)
271 {
272     int ret = nbd_drop(client->ioc, client->optlen, errp);
273 
274     client->optlen = 0;
275     if (!ret) {
276         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
277     }
278     return ret;
279 }
280 
281 static int GCC_FMT_ATTR(4, 5)
282 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
283              const char *fmt, ...)
284 {
285     int ret;
286     va_list va;
287 
288     va_start(va, fmt);
289     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
290     va_end(va);
291 
292     return ret;
293 }
294 
295 static int GCC_FMT_ATTR(3, 4)
296 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
297 {
298     int ret;
299     va_list va;
300 
301     va_start(va, fmt);
302     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
303     va_end(va);
304 
305     return ret;
306 }
307 
308 /* Read size bytes from the unparsed payload of the current option.
309  * Return -errno on I/O error, 0 if option was completely handled by
310  * sending a reply about inconsistent lengths, or 1 on success. */
311 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
312                         Error **errp)
313 {
314     if (size > client->optlen) {
315         return nbd_opt_invalid(client, errp,
316                                "Inconsistent lengths in option %s",
317                                nbd_opt_lookup(client->opt));
318     }
319     client->optlen -= size;
320     return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
321 }
322 
323 /* Drop size bytes from the unparsed payload of the current option.
324  * Return -errno on I/O error, 0 if option was completely handled by
325  * sending a reply about inconsistent lengths, or 1 on success. */
326 static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
327 {
328     if (size > client->optlen) {
329         return nbd_opt_invalid(client, errp,
330                                "Inconsistent lengths in option %s",
331                                nbd_opt_lookup(client->opt));
332     }
333     client->optlen -= size;
334     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
335 }
336 
337 /* nbd_opt_read_name
338  *
339  * Read a string with the format:
340  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
341  *   len bytes string (not 0-terminated)
342  *
343  * On success, @name will be allocated.
344  * If @length is non-null, it will be set to the actual string length.
345  *
346  * Return -errno on I/O error, 0 if option was completely handled by
347  * sending a reply about inconsistent lengths, or 1 on success.
348  */
349 static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
350                              Error **errp)
351 {
352     int ret;
353     uint32_t len;
354     g_autofree char *local_name = NULL;
355 
356     *name = NULL;
357     ret = nbd_opt_read(client, &len, sizeof(len), errp);
358     if (ret <= 0) {
359         return ret;
360     }
361     len = cpu_to_be32(len);
362 
363     if (len > NBD_MAX_STRING_SIZE) {
364         return nbd_opt_invalid(client, errp,
365                                "Invalid name length: %" PRIu32, len);
366     }
367 
368     local_name = g_malloc(len + 1);
369     ret = nbd_opt_read(client, local_name, len, errp);
370     if (ret <= 0) {
371         return ret;
372     }
373     local_name[len] = '\0';
374 
375     if (length) {
376         *length = len;
377     }
378     *name = g_steal_pointer(&local_name);
379 
380     return 1;
381 }
382 
383 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
384  * Return -errno on error, 0 on success. */
385 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
386                                        Error **errp)
387 {
388     ERRP_GUARD();
389     size_t name_len, desc_len;
390     uint32_t len;
391     const char *name = exp->name ? exp->name : "";
392     const char *desc = exp->description ? exp->description : "";
393     QIOChannel *ioc = client->ioc;
394     int ret;
395 
396     trace_nbd_negotiate_send_rep_list(name, desc);
397     name_len = strlen(name);
398     desc_len = strlen(desc);
399     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
400     len = name_len + desc_len + sizeof(len);
401     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
402     if (ret < 0) {
403         return ret;
404     }
405 
406     len = cpu_to_be32(name_len);
407     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
408         error_prepend(errp, "write failed (name length): ");
409         return -EINVAL;
410     }
411 
412     if (nbd_write(ioc, name, name_len, errp) < 0) {
413         error_prepend(errp, "write failed (name buffer): ");
414         return -EINVAL;
415     }
416 
417     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
418         error_prepend(errp, "write failed (description buffer): ");
419         return -EINVAL;
420     }
421 
422     return 0;
423 }
424 
425 /* Process the NBD_OPT_LIST command, with a potential series of replies.
426  * Return -errno on error, 0 on success. */
427 static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
428 {
429     NBDExport *exp;
430     assert(client->opt == NBD_OPT_LIST);
431 
432     /* For each export, send a NBD_REP_SERVER reply. */
433     QTAILQ_FOREACH(exp, &exports, next) {
434         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
435             return -EINVAL;
436         }
437     }
438     /* Finish with a NBD_REP_ACK. */
439     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
440 }
441 
442 static void nbd_check_meta_export(NBDClient *client)
443 {
444     client->export_meta.valid &= client->exp == client->export_meta.exp;
445 }
446 
447 /* Send a reply to NBD_OPT_EXPORT_NAME.
448  * Return -errno on error, 0 on success. */
449 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
450                                             Error **errp)
451 {
452     ERRP_GUARD();
453     g_autofree char *name = NULL;
454     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
455     size_t len;
456     int ret;
457     uint16_t myflags;
458 
459     /* Client sends:
460         [20 ..  xx]   export name (length bytes)
461        Server replies:
462         [ 0 ..   7]   size
463         [ 8 ..   9]   export flags
464         [10 .. 133]   reserved     (0) [unless no_zeroes]
465      */
466     trace_nbd_negotiate_handle_export_name();
467     if (client->optlen > NBD_MAX_STRING_SIZE) {
468         error_setg(errp, "Bad length received");
469         return -EINVAL;
470     }
471     name = g_malloc(client->optlen + 1);
472     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
473         return -EIO;
474     }
475     name[client->optlen] = '\0';
476     client->optlen = 0;
477 
478     trace_nbd_negotiate_handle_export_name_request(name);
479 
480     client->exp = nbd_export_find(name);
481     if (!client->exp) {
482         error_setg(errp, "export not found");
483         return -EINVAL;
484     }
485 
486     myflags = client->exp->nbdflags;
487     if (client->structured_reply) {
488         myflags |= NBD_FLAG_SEND_DF;
489     }
490     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
491     stq_be_p(buf, client->exp->size);
492     stw_be_p(buf + 8, myflags);
493     len = no_zeroes ? 10 : sizeof(buf);
494     ret = nbd_write(client->ioc, buf, len, errp);
495     if (ret < 0) {
496         error_prepend(errp, "write failed: ");
497         return ret;
498     }
499 
500     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
501     nbd_export_get(client->exp);
502     nbd_check_meta_export(client);
503 
504     return 0;
505 }
506 
507 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
508  * The buffer does NOT include the info type prefix.
509  * Return -errno on error, 0 if ready to send more. */
510 static int nbd_negotiate_send_info(NBDClient *client,
511                                    uint16_t info, uint32_t length, void *buf,
512                                    Error **errp)
513 {
514     int rc;
515 
516     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
517     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
518                                     sizeof(info) + length, errp);
519     if (rc < 0) {
520         return rc;
521     }
522     info = cpu_to_be16(info);
523     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
524         return -EIO;
525     }
526     if (nbd_write(client->ioc, buf, length, errp) < 0) {
527         return -EIO;
528     }
529     return 0;
530 }
531 
532 /* nbd_reject_length: Handle any unexpected payload.
533  * @fatal requests that we quit talking to the client, even if we are able
534  * to successfully send an error reply.
535  * Return:
536  * -errno  transmission error occurred or @fatal was requested, errp is set
537  * 0       error message successfully sent to client, errp is not set
538  */
539 static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
540 {
541     int ret;
542 
543     assert(client->optlen);
544     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
545                           nbd_opt_lookup(client->opt));
546     if (fatal && !ret) {
547         error_setg(errp, "option '%s' has unexpected length",
548                    nbd_opt_lookup(client->opt));
549         return -EINVAL;
550     }
551     return ret;
552 }
553 
554 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
555  * Return -errno on error, 0 if ready for next option, and 1 to move
556  * into transmission phase.  */
557 static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
558 {
559     int rc;
560     g_autofree char *name = NULL;
561     NBDExport *exp;
562     uint16_t requests;
563     uint16_t request;
564     uint32_t namelen;
565     bool sendname = false;
566     bool blocksize = false;
567     uint32_t sizes[3];
568     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
569     uint32_t check_align = 0;
570     uint16_t myflags;
571 
572     /* Client sends:
573         4 bytes: L, name length (can be 0)
574         L bytes: export name
575         2 bytes: N, number of requests (can be 0)
576         N * 2 bytes: N requests
577     */
578     rc = nbd_opt_read_name(client, &name, &namelen, errp);
579     if (rc <= 0) {
580         return rc;
581     }
582     trace_nbd_negotiate_handle_export_name_request(name);
583 
584     rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
585     if (rc <= 0) {
586         return rc;
587     }
588     requests = be16_to_cpu(requests);
589     trace_nbd_negotiate_handle_info_requests(requests);
590     while (requests--) {
591         rc = nbd_opt_read(client, &request, sizeof(request), errp);
592         if (rc <= 0) {
593             return rc;
594         }
595         request = be16_to_cpu(request);
596         trace_nbd_negotiate_handle_info_request(request,
597                                                 nbd_info_lookup(request));
598         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
599          * everything else is either a request we don't know or
600          * something we send regardless of request */
601         switch (request) {
602         case NBD_INFO_NAME:
603             sendname = true;
604             break;
605         case NBD_INFO_BLOCK_SIZE:
606             blocksize = true;
607             break;
608         }
609     }
610     if (client->optlen) {
611         return nbd_reject_length(client, false, errp);
612     }
613 
614     exp = nbd_export_find(name);
615     if (!exp) {
616         g_autofree char *sane_name = nbd_sanitize_name(name);
617 
618         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
619                                           errp, "export '%s' not present",
620                                           sane_name);
621     }
622 
623     /* Don't bother sending NBD_INFO_NAME unless client requested it */
624     if (sendname) {
625         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
626                                      errp);
627         if (rc < 0) {
628             return rc;
629         }
630     }
631 
632     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
633      * client request */
634     if (exp->description) {
635         size_t len = strlen(exp->description);
636 
637         assert(len <= NBD_MAX_STRING_SIZE);
638         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
639                                      len, exp->description, errp);
640         if (rc < 0) {
641             return rc;
642         }
643     }
644 
645     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
646      * according to whether the client requested it, and according to
647      * whether this is OPT_INFO or OPT_GO. */
648     /* minimum - 1 for back-compat, or actual if client will obey it. */
649     if (client->opt == NBD_OPT_INFO || blocksize) {
650         check_align = sizes[0] = blk_get_request_alignment(exp->blk);
651     } else {
652         sizes[0] = 1;
653     }
654     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
655     /* preferred - Hard-code to 4096 for now.
656      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
657     sizes[1] = MAX(4096, sizes[0]);
658     /* maximum - At most 32M, but smaller as appropriate. */
659     sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
660     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
661     sizes[0] = cpu_to_be32(sizes[0]);
662     sizes[1] = cpu_to_be32(sizes[1]);
663     sizes[2] = cpu_to_be32(sizes[2]);
664     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
665                                  sizeof(sizes), sizes, errp);
666     if (rc < 0) {
667         return rc;
668     }
669 
670     /* Send NBD_INFO_EXPORT always */
671     myflags = exp->nbdflags;
672     if (client->structured_reply) {
673         myflags |= NBD_FLAG_SEND_DF;
674     }
675     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
676     stq_be_p(buf, exp->size);
677     stw_be_p(buf + 8, myflags);
678     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
679                                  sizeof(buf), buf, errp);
680     if (rc < 0) {
681         return rc;
682     }
683 
684     /*
685      * If the client is just asking for NBD_OPT_INFO, but forgot to
686      * request block sizes in a situation that would impact
687      * performance, then return an error. But for NBD_OPT_GO, we
688      * tolerate all clients, regardless of alignments.
689      */
690     if (client->opt == NBD_OPT_INFO && !blocksize &&
691         blk_get_request_alignment(exp->blk) > 1) {
692         return nbd_negotiate_send_rep_err(client,
693                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
694                                           errp,
695                                           "request NBD_INFO_BLOCK_SIZE to "
696                                           "use this export");
697     }
698 
699     /* Final reply */
700     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
701     if (rc < 0) {
702         return rc;
703     }
704 
705     if (client->opt == NBD_OPT_GO) {
706         client->exp = exp;
707         client->check_align = check_align;
708         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
709         nbd_export_get(client->exp);
710         nbd_check_meta_export(client);
711         rc = 1;
712     }
713     return rc;
714 }
715 
716 
717 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
718  * new channel for all further (now-encrypted) communication. */
719 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
720                                                  Error **errp)
721 {
722     QIOChannel *ioc;
723     QIOChannelTLS *tioc;
724     struct NBDTLSHandshakeData data = { 0 };
725 
726     assert(client->opt == NBD_OPT_STARTTLS);
727 
728     trace_nbd_negotiate_handle_starttls();
729     ioc = client->ioc;
730 
731     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
732         return NULL;
733     }
734 
735     tioc = qio_channel_tls_new_server(ioc,
736                                       client->tlscreds,
737                                       client->tlsauthz,
738                                       errp);
739     if (!tioc) {
740         return NULL;
741     }
742 
743     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
744     trace_nbd_negotiate_handle_starttls_handshake();
745     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
746     qio_channel_tls_handshake(tioc,
747                               nbd_tls_handshake,
748                               &data,
749                               NULL,
750                               NULL);
751 
752     if (!data.complete) {
753         g_main_loop_run(data.loop);
754     }
755     g_main_loop_unref(data.loop);
756     if (data.error) {
757         object_unref(OBJECT(tioc));
758         error_propagate(errp, data.error);
759         return NULL;
760     }
761 
762     return QIO_CHANNEL(tioc);
763 }
764 
765 /* nbd_negotiate_send_meta_context
766  *
767  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
768  *
769  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
770  */
771 static int nbd_negotiate_send_meta_context(NBDClient *client,
772                                            const char *context,
773                                            uint32_t context_id,
774                                            Error **errp)
775 {
776     NBDOptionReplyMetaContext opt;
777     struct iovec iov[] = {
778         {.iov_base = &opt, .iov_len = sizeof(opt)},
779         {.iov_base = (void *)context, .iov_len = strlen(context)}
780     };
781 
782     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
783     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
784         context_id = 0;
785     }
786 
787     trace_nbd_negotiate_meta_query_reply(context, context_id);
788     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
789                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
790     stl_be_p(&opt.context_id, context_id);
791 
792     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
793 }
794 
795 /* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
796  * @match is never set to false.
797  *
798  * Return -errno on I/O error, 0 if option was completely handled by
799  * sending a reply about inconsistent lengths, or 1 on success.
800  *
801  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
802  * It only means that there are no errors.
803  */
804 static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
805                             Error **errp)
806 {
807     int ret;
808     char *query;
809     size_t len = strlen(pattern);
810 
811     assert(len);
812 
813     query = g_malloc(len);
814     ret = nbd_opt_read(client, query, len, errp);
815     if (ret <= 0) {
816         g_free(query);
817         return ret;
818     }
819 
820     if (strncmp(query, pattern, len) == 0) {
821         trace_nbd_negotiate_meta_query_parse(pattern);
822         *match = true;
823     } else {
824         trace_nbd_negotiate_meta_query_skip("pattern not matched");
825     }
826     g_free(query);
827 
828     return 1;
829 }
830 
831 /*
832  * Read @len bytes, and set @match to true if they match @pattern, or if @len
833  * is 0 and the client is performing _LIST_. @match is never set to false.
834  *
835  * Return -errno on I/O error, 0 if option was completely handled by
836  * sending a reply about inconsistent lengths, or 1 on success.
837  *
838  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
839  * It only means that there are no errors.
840  */
841 static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
842                                      uint32_t len, bool *match, Error **errp)
843 {
844     if (len == 0) {
845         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
846             *match = true;
847         }
848         trace_nbd_negotiate_meta_query_parse("empty");
849         return 1;
850     }
851 
852     if (len != strlen(pattern)) {
853         trace_nbd_negotiate_meta_query_skip("different lengths");
854         return nbd_opt_skip(client, len, errp);
855     }
856 
857     return nbd_meta_pattern(client, pattern, match, errp);
858 }
859 
860 /* nbd_meta_base_query
861  *
862  * Handle queries to 'base' namespace. For now, only the base:allocation
863  * context is available.  'len' is the amount of text remaining to be read from
864  * the current name, after the 'base:' portion has been stripped.
865  *
866  * Return -errno on I/O error, 0 if option was completely handled by
867  * sending a reply about inconsistent lengths, or 1 on success.
868  */
869 static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
870                                uint32_t len, Error **errp)
871 {
872     return nbd_meta_empty_or_pattern(client, "allocation", len,
873                                      &meta->base_allocation, errp);
874 }
875 
876 /* nbd_meta_bitmap_query
877  *
878  * Handle query to 'qemu:' namespace.
879  * @len is the amount of text remaining to be read from the current name, after
880  * the 'qemu:' portion has been stripped.
881  *
882  * Return -errno on I/O error, 0 if option was completely handled by
883  * sending a reply about inconsistent lengths, or 1 on success. */
884 static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
885                                uint32_t len, Error **errp)
886 {
887     bool dirty_bitmap = false;
888     size_t dirty_bitmap_len = strlen("dirty-bitmap:");
889     int ret;
890 
891     if (!meta->exp->export_bitmap) {
892         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
893         return nbd_opt_skip(client, len, errp);
894     }
895 
896     if (len == 0) {
897         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
898             meta->bitmap = true;
899         }
900         trace_nbd_negotiate_meta_query_parse("empty");
901         return 1;
902     }
903 
904     if (len < dirty_bitmap_len) {
905         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
906         return nbd_opt_skip(client, len, errp);
907     }
908 
909     len -= dirty_bitmap_len;
910     ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
911     if (ret <= 0) {
912         return ret;
913     }
914     if (!dirty_bitmap) {
915         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
916         return nbd_opt_skip(client, len, errp);
917     }
918 
919     trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
920 
921     return nbd_meta_empty_or_pattern(
922             client, meta->exp->export_bitmap_context +
923             strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
924 }
925 
926 /* nbd_negotiate_meta_query
927  *
928  * Parse namespace name and call corresponding function to parse body of the
929  * query.
930  *
931  * The only supported namespaces are 'base' and 'qemu'.
932  *
933  * The function aims not wasting time and memory to read long unknown namespace
934  * names.
935  *
936  * Return -errno on I/O error, 0 if option was completely handled by
937  * sending a reply about inconsistent lengths, or 1 on success. */
938 static int nbd_negotiate_meta_query(NBDClient *client,
939                                     NBDExportMetaContexts *meta, Error **errp)
940 {
941     /*
942      * Both 'qemu' and 'base' namespaces have length = 5 including a
943      * colon. If another length namespace is later introduced, this
944      * should certainly be refactored.
945      */
946     int ret;
947     size_t ns_len = 5;
948     char ns[5];
949     uint32_t len;
950 
951     ret = nbd_opt_read(client, &len, sizeof(len), errp);
952     if (ret <= 0) {
953         return ret;
954     }
955     len = cpu_to_be32(len);
956 
957     if (len > NBD_MAX_STRING_SIZE) {
958         trace_nbd_negotiate_meta_query_skip("length too long");
959         return nbd_opt_skip(client, len, errp);
960     }
961     if (len < ns_len) {
962         trace_nbd_negotiate_meta_query_skip("length too short");
963         return nbd_opt_skip(client, len, errp);
964     }
965 
966     len -= ns_len;
967     ret = nbd_opt_read(client, ns, ns_len, errp);
968     if (ret <= 0) {
969         return ret;
970     }
971 
972     if (!strncmp(ns, "base:", ns_len)) {
973         trace_nbd_negotiate_meta_query_parse("base:");
974         return nbd_meta_base_query(client, meta, len, errp);
975     } else if (!strncmp(ns, "qemu:", ns_len)) {
976         trace_nbd_negotiate_meta_query_parse("qemu:");
977         return nbd_meta_qemu_query(client, meta, len, errp);
978     }
979 
980     trace_nbd_negotiate_meta_query_skip("unknown namespace");
981     return nbd_opt_skip(client, len, errp);
982 }
983 
984 /* nbd_negotiate_meta_queries
985  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
986  *
987  * Return -errno on I/O error, or 0 if option was completely handled. */
988 static int nbd_negotiate_meta_queries(NBDClient *client,
989                                       NBDExportMetaContexts *meta, Error **errp)
990 {
991     int ret;
992     g_autofree char *export_name = NULL;
993     NBDExportMetaContexts local_meta;
994     uint32_t nb_queries;
995     int i;
996 
997     if (!client->structured_reply) {
998         return nbd_opt_invalid(client, errp,
999                                "request option '%s' when structured reply "
1000                                "is not negotiated",
1001                                nbd_opt_lookup(client->opt));
1002     }
1003 
1004     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1005         /* Only change the caller's meta on SET. */
1006         meta = &local_meta;
1007     }
1008 
1009     memset(meta, 0, sizeof(*meta));
1010 
1011     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1012     if (ret <= 0) {
1013         return ret;
1014     }
1015 
1016     meta->exp = nbd_export_find(export_name);
1017     if (meta->exp == NULL) {
1018         g_autofree char *sane_name = nbd_sanitize_name(export_name);
1019 
1020         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1021                             "export '%s' not present", sane_name);
1022     }
1023 
1024     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
1025     if (ret <= 0) {
1026         return ret;
1027     }
1028     nb_queries = cpu_to_be32(nb_queries);
1029     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1030                                      export_name, nb_queries);
1031 
1032     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1033         /* enable all known contexts */
1034         meta->base_allocation = true;
1035         meta->bitmap = !!meta->exp->export_bitmap;
1036     } else {
1037         for (i = 0; i < nb_queries; ++i) {
1038             ret = nbd_negotiate_meta_query(client, meta, errp);
1039             if (ret <= 0) {
1040                 return ret;
1041             }
1042         }
1043     }
1044 
1045     if (meta->base_allocation) {
1046         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1047                                               NBD_META_ID_BASE_ALLOCATION,
1048                                               errp);
1049         if (ret < 0) {
1050             return ret;
1051         }
1052     }
1053 
1054     if (meta->bitmap) {
1055         ret = nbd_negotiate_send_meta_context(client,
1056                                               meta->exp->export_bitmap_context,
1057                                               NBD_META_ID_DIRTY_BITMAP,
1058                                               errp);
1059         if (ret < 0) {
1060             return ret;
1061         }
1062     }
1063 
1064     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1065     if (ret == 0) {
1066         meta->valid = true;
1067     }
1068 
1069     return ret;
1070 }
1071 
1072 /* nbd_negotiate_options
1073  * Process all NBD_OPT_* client option commands, during fixed newstyle
1074  * negotiation.
1075  * Return:
1076  * -errno  on error, errp is set
1077  * 0       on successful negotiation, errp is not set
1078  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1079  *         errp is not set
1080  */
1081 static int nbd_negotiate_options(NBDClient *client, Error **errp)
1082 {
1083     uint32_t flags;
1084     bool fixedNewstyle = false;
1085     bool no_zeroes = false;
1086 
1087     /* Client sends:
1088         [ 0 ..   3]   client flags
1089 
1090        Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1091         [ 0 ..   7]   NBD_OPTS_MAGIC
1092         [ 8 ..  11]   NBD option
1093         [12 ..  15]   Data length
1094         ...           Rest of request
1095 
1096         [ 0 ..   7]   NBD_OPTS_MAGIC
1097         [ 8 ..  11]   Second NBD option
1098         [12 ..  15]   Data length
1099         ...           Rest of request
1100     */
1101 
1102     if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
1103         return -EIO;
1104     }
1105     trace_nbd_negotiate_options_flags(flags);
1106     if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1107         fixedNewstyle = true;
1108         flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1109     }
1110     if (flags & NBD_FLAG_C_NO_ZEROES) {
1111         no_zeroes = true;
1112         flags &= ~NBD_FLAG_C_NO_ZEROES;
1113     }
1114     if (flags != 0) {
1115         error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1116         return -EINVAL;
1117     }
1118 
1119     while (1) {
1120         int ret;
1121         uint32_t option, length;
1122         uint64_t magic;
1123 
1124         if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1125             return -EINVAL;
1126         }
1127         trace_nbd_negotiate_options_check_magic(magic);
1128         if (magic != NBD_OPTS_MAGIC) {
1129             error_setg(errp, "Bad magic received");
1130             return -EINVAL;
1131         }
1132 
1133         if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1134             return -EINVAL;
1135         }
1136         client->opt = option;
1137 
1138         if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1139             return -EINVAL;
1140         }
1141         assert(!client->optlen);
1142         client->optlen = length;
1143 
1144         if (length > NBD_MAX_BUFFER_SIZE) {
1145             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
1146                        length, NBD_MAX_BUFFER_SIZE);
1147             return -EINVAL;
1148         }
1149 
1150         trace_nbd_negotiate_options_check_option(option,
1151                                                  nbd_opt_lookup(option));
1152         if (client->tlscreds &&
1153             client->ioc == (QIOChannel *)client->sioc) {
1154             QIOChannel *tioc;
1155             if (!fixedNewstyle) {
1156                 error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1157                 return -EINVAL;
1158             }
1159             switch (option) {
1160             case NBD_OPT_STARTTLS:
1161                 if (length) {
1162                     /* Unconditionally drop the connection if the client
1163                      * can't start a TLS negotiation correctly */
1164                     return nbd_reject_length(client, true, errp);
1165                 }
1166                 tioc = nbd_negotiate_handle_starttls(client, errp);
1167                 if (!tioc) {
1168                     return -EIO;
1169                 }
1170                 ret = 0;
1171                 object_unref(OBJECT(client->ioc));
1172                 client->ioc = QIO_CHANNEL(tioc);
1173                 break;
1174 
1175             case NBD_OPT_EXPORT_NAME:
1176                 /* No way to return an error to client, so drop connection */
1177                 error_setg(errp, "Option 0x%x not permitted before TLS",
1178                            option);
1179                 return -EINVAL;
1180 
1181             default:
1182                 /* Let the client keep trying, unless they asked to
1183                  * quit. Always try to give an error back to the
1184                  * client; but when replying to OPT_ABORT, be aware
1185                  * that the client may hang up before receiving the
1186                  * error, in which case we are fine ignoring the
1187                  * resulting EPIPE. */
1188                 ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1189                                    option == NBD_OPT_ABORT ? NULL : errp,
1190                                    "Option 0x%" PRIx32
1191                                    " not permitted before TLS", option);
1192                 if (option == NBD_OPT_ABORT) {
1193                     return 1;
1194                 }
1195                 break;
1196             }
1197         } else if (fixedNewstyle) {
1198             switch (option) {
1199             case NBD_OPT_LIST:
1200                 if (length) {
1201                     ret = nbd_reject_length(client, false, errp);
1202                 } else {
1203                     ret = nbd_negotiate_handle_list(client, errp);
1204                 }
1205                 break;
1206 
1207             case NBD_OPT_ABORT:
1208                 /* NBD spec says we must try to reply before
1209                  * disconnecting, but that we must also tolerate
1210                  * guests that don't wait for our reply. */
1211                 nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
1212                 return 1;
1213 
1214             case NBD_OPT_EXPORT_NAME:
1215                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1216                                                         errp);
1217 
1218             case NBD_OPT_INFO:
1219             case NBD_OPT_GO:
1220                 ret = nbd_negotiate_handle_info(client, errp);
1221                 if (ret == 1) {
1222                     assert(option == NBD_OPT_GO);
1223                     return 0;
1224                 }
1225                 break;
1226 
1227             case NBD_OPT_STARTTLS:
1228                 if (length) {
1229                     ret = nbd_reject_length(client, false, errp);
1230                 } else if (client->tlscreds) {
1231                     ret = nbd_negotiate_send_rep_err(client,
1232                                                      NBD_REP_ERR_INVALID, errp,
1233                                                      "TLS already enabled");
1234                 } else {
1235                     ret = nbd_negotiate_send_rep_err(client,
1236                                                      NBD_REP_ERR_POLICY, errp,
1237                                                      "TLS not configured");
1238                 }
1239                 break;
1240 
1241             case NBD_OPT_STRUCTURED_REPLY:
1242                 if (length) {
1243                     ret = nbd_reject_length(client, false, errp);
1244                 } else if (client->structured_reply) {
1245                     ret = nbd_negotiate_send_rep_err(
1246                         client, NBD_REP_ERR_INVALID, errp,
1247                         "structured reply already negotiated");
1248                 } else {
1249                     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1250                     client->structured_reply = true;
1251                 }
1252                 break;
1253 
1254             case NBD_OPT_LIST_META_CONTEXT:
1255             case NBD_OPT_SET_META_CONTEXT:
1256                 ret = nbd_negotiate_meta_queries(client, &client->export_meta,
1257                                                  errp);
1258                 break;
1259 
1260             default:
1261                 ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1262                                    "Unsupported option %" PRIu32 " (%s)",
1263                                    option, nbd_opt_lookup(option));
1264                 break;
1265             }
1266         } else {
1267             /*
1268              * If broken new-style we should drop the connection
1269              * for anything except NBD_OPT_EXPORT_NAME
1270              */
1271             switch (option) {
1272             case NBD_OPT_EXPORT_NAME:
1273                 return nbd_negotiate_handle_export_name(client, no_zeroes,
1274                                                         errp);
1275 
1276             default:
1277                 error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1278                            option, nbd_opt_lookup(option));
1279                 return -EINVAL;
1280             }
1281         }
1282         if (ret < 0) {
1283             return ret;
1284         }
1285     }
1286 }
1287 
1288 /* nbd_negotiate
1289  * Return:
1290  * -errno  on error, errp is set
1291  * 0       on successful negotiation, errp is not set
1292  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1293  *         errp is not set
1294  */
1295 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1296 {
1297     ERRP_GUARD();
1298     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1299     int ret;
1300 
1301     /* Old style negotiation header, no room for options
1302         [ 0 ..   7]   passwd       ("NBDMAGIC")
1303         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1304         [16 ..  23]   size
1305         [24 ..  27]   export flags (zero-extended)
1306         [28 .. 151]   reserved     (0)
1307 
1308        New style negotiation header, client can send options
1309         [ 0 ..   7]   passwd       ("NBDMAGIC")
1310         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1311         [16 ..  17]   server flags (0)
1312         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1313      */
1314 
1315     qio_channel_set_blocking(client->ioc, false, NULL);
1316 
1317     trace_nbd_negotiate_begin();
1318     memcpy(buf, "NBDMAGIC", 8);
1319 
1320     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1321     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1322 
1323     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1324         error_prepend(errp, "write failed: ");
1325         return -EINVAL;
1326     }
1327     ret = nbd_negotiate_options(client, errp);
1328     if (ret != 0) {
1329         if (ret < 0) {
1330             error_prepend(errp, "option negotiation failed: ");
1331         }
1332         return ret;
1333     }
1334 
1335     /* Attach the channel to the same AioContext as the export */
1336     if (client->exp && client->exp->ctx) {
1337         qio_channel_attach_aio_context(client->ioc, client->exp->ctx);
1338     }
1339 
1340     assert(!client->optlen);
1341     trace_nbd_negotiate_success();
1342 
1343     return 0;
1344 }
1345 
1346 static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
1347                                Error **errp)
1348 {
1349     uint8_t buf[NBD_REQUEST_SIZE];
1350     uint32_t magic;
1351     int ret;
1352 
1353     ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
1354     if (ret < 0) {
1355         return ret;
1356     }
1357 
1358     /* Request
1359        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1360        [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1361        [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1362        [ 8 .. 15]   handle
1363        [16 .. 23]   from
1364        [24 .. 27]   len
1365      */
1366 
1367     magic = ldl_be_p(buf);
1368     request->flags  = lduw_be_p(buf + 4);
1369     request->type   = lduw_be_p(buf + 6);
1370     request->handle = ldq_be_p(buf + 8);
1371     request->from   = ldq_be_p(buf + 16);
1372     request->len    = ldl_be_p(buf + 24);
1373 
1374     trace_nbd_receive_request(magic, request->flags, request->type,
1375                               request->from, request->len);
1376 
1377     if (magic != NBD_REQUEST_MAGIC) {
1378         error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
1379         return -EINVAL;
1380     }
1381     return 0;
1382 }
1383 
1384 #define MAX_NBD_REQUESTS 16
1385 
1386 void nbd_client_get(NBDClient *client)
1387 {
1388     client->refcount++;
1389 }
1390 
1391 void nbd_client_put(NBDClient *client)
1392 {
1393     if (--client->refcount == 0) {
1394         /* The last reference should be dropped by client->close,
1395          * which is called by client_close.
1396          */
1397         assert(client->closing);
1398 
1399         qio_channel_detach_aio_context(client->ioc);
1400         object_unref(OBJECT(client->sioc));
1401         object_unref(OBJECT(client->ioc));
1402         if (client->tlscreds) {
1403             object_unref(OBJECT(client->tlscreds));
1404         }
1405         g_free(client->tlsauthz);
1406         if (client->exp) {
1407             QTAILQ_REMOVE(&client->exp->clients, client, next);
1408             nbd_export_put(client->exp);
1409         }
1410         g_free(client);
1411     }
1412 }
1413 
1414 static void client_close(NBDClient *client, bool negotiated)
1415 {
1416     if (client->closing) {
1417         return;
1418     }
1419 
1420     client->closing = true;
1421 
1422     /* Force requests to finish.  They will drop their own references,
1423      * then we'll close the socket and free the NBDClient.
1424      */
1425     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1426                          NULL);
1427 
1428     /* Also tell the client, so that they release their reference.  */
1429     if (client->close_fn) {
1430         client->close_fn(client, negotiated);
1431     }
1432 }
1433 
1434 static NBDRequestData *nbd_request_get(NBDClient *client)
1435 {
1436     NBDRequestData *req;
1437 
1438     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1439     client->nb_requests++;
1440 
1441     req = g_new0(NBDRequestData, 1);
1442     nbd_client_get(client);
1443     req->client = client;
1444     return req;
1445 }
1446 
1447 static void nbd_request_put(NBDRequestData *req)
1448 {
1449     NBDClient *client = req->client;
1450 
1451     if (req->data) {
1452         qemu_vfree(req->data);
1453     }
1454     g_free(req);
1455 
1456     client->nb_requests--;
1457     nbd_client_receive_next_request(client);
1458 
1459     nbd_client_put(client);
1460 }
1461 
1462 static void blk_aio_attached(AioContext *ctx, void *opaque)
1463 {
1464     NBDExport *exp = opaque;
1465     NBDClient *client;
1466 
1467     trace_nbd_blk_aio_attached(exp->name, ctx);
1468 
1469     exp->ctx = ctx;
1470 
1471     QTAILQ_FOREACH(client, &exp->clients, next) {
1472         qio_channel_attach_aio_context(client->ioc, ctx);
1473         if (client->recv_coroutine) {
1474             aio_co_schedule(ctx, client->recv_coroutine);
1475         }
1476         if (client->send_coroutine) {
1477             aio_co_schedule(ctx, client->send_coroutine);
1478         }
1479     }
1480 }
1481 
1482 static void blk_aio_detach(void *opaque)
1483 {
1484     NBDExport *exp = opaque;
1485     NBDClient *client;
1486 
1487     trace_nbd_blk_aio_detach(exp->name, exp->ctx);
1488 
1489     QTAILQ_FOREACH(client, &exp->clients, next) {
1490         qio_channel_detach_aio_context(client->ioc);
1491     }
1492 
1493     exp->ctx = NULL;
1494 }
1495 
1496 static void nbd_eject_notifier(Notifier *n, void *data)
1497 {
1498     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1499     AioContext *aio_context;
1500 
1501     aio_context = exp->ctx;
1502     aio_context_acquire(aio_context);
1503     nbd_export_close(exp);
1504     aio_context_release(aio_context);
1505 }
1506 
1507 NBDExport *nbd_export_new(BlockDriverState *bs, uint64_t dev_offset,
1508                           uint64_t size, const char *name, const char *desc,
1509                           const char *bitmap, bool readonly, bool shared,
1510                           void (*close)(NBDExport *), bool writethrough,
1511                           BlockBackend *on_eject_blk, Error **errp)
1512 {
1513     AioContext *ctx;
1514     BlockBackend *blk;
1515     NBDExport *exp = g_new0(NBDExport, 1);
1516     uint64_t perm;
1517     int ret;
1518 
1519     /*
1520      * NBD exports are used for non-shared storage migration.  Make sure
1521      * that BDRV_O_INACTIVE is cleared and the image is ready for write
1522      * access since the export could be available before migration handover.
1523      * ctx was acquired in the caller.
1524      */
1525     assert(name && strlen(name) <= NBD_MAX_STRING_SIZE);
1526     ctx = bdrv_get_aio_context(bs);
1527     bdrv_invalidate_cache(bs, NULL);
1528 
1529     /* Don't allow resize while the NBD server is running, otherwise we don't
1530      * care what happens with the node. */
1531     perm = BLK_PERM_CONSISTENT_READ;
1532     if (!readonly) {
1533         perm |= BLK_PERM_WRITE;
1534     }
1535     blk = blk_new(ctx, perm,
1536                   BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1537                   BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
1538     ret = blk_insert_bs(blk, bs, errp);
1539     if (ret < 0) {
1540         goto fail;
1541     }
1542     blk_set_enable_write_cache(blk, !writethrough);
1543     blk_set_allow_aio_context_change(blk, true);
1544 
1545     exp->refcount = 1;
1546     QTAILQ_INIT(&exp->clients);
1547     exp->blk = blk;
1548     assert(dev_offset <= INT64_MAX);
1549     exp->dev_offset = dev_offset;
1550     exp->name = g_strdup(name);
1551     assert(!desc || strlen(desc) <= NBD_MAX_STRING_SIZE);
1552     exp->description = g_strdup(desc);
1553     exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1554                      NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1555     if (readonly) {
1556         exp->nbdflags |= NBD_FLAG_READ_ONLY;
1557         if (shared) {
1558             exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1559         }
1560     } else {
1561         exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1562                           NBD_FLAG_SEND_FAST_ZERO);
1563     }
1564     assert(size <= INT64_MAX - dev_offset);
1565     exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1566 
1567     if (bitmap) {
1568         BdrvDirtyBitmap *bm = NULL;
1569 
1570         while (bs) {
1571             bm = bdrv_find_dirty_bitmap(bs, bitmap);
1572             if (bm != NULL) {
1573                 break;
1574             }
1575 
1576             bs = bdrv_filter_or_cow_bs(bs);
1577         }
1578 
1579         if (bm == NULL) {
1580             error_setg(errp, "Bitmap '%s' is not found", bitmap);
1581             goto fail;
1582         }
1583 
1584         if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1585             goto fail;
1586         }
1587 
1588         if (readonly && bdrv_is_writable(bs) &&
1589             bdrv_dirty_bitmap_enabled(bm)) {
1590             error_setg(errp,
1591                        "Enabled bitmap '%s' incompatible with readonly export",
1592                        bitmap);
1593             goto fail;
1594         }
1595 
1596         bdrv_dirty_bitmap_set_busy(bm, true);
1597         exp->export_bitmap = bm;
1598         assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1599         exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
1600                                                      bitmap);
1601         assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
1602     }
1603 
1604     exp->close = close;
1605     exp->ctx = ctx;
1606     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1607 
1608     if (on_eject_blk) {
1609         blk_ref(on_eject_blk);
1610         exp->eject_notifier_blk = on_eject_blk;
1611         exp->eject_notifier.notify = nbd_eject_notifier;
1612         blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
1613     }
1614     QTAILQ_INSERT_TAIL(&exports, exp, next);
1615     nbd_export_get(exp);
1616     return exp;
1617 
1618 fail:
1619     blk_unref(blk);
1620     g_free(exp->name);
1621     g_free(exp->description);
1622     g_free(exp);
1623     return NULL;
1624 }
1625 
1626 NBDExport *nbd_export_find(const char *name)
1627 {
1628     NBDExport *exp;
1629     QTAILQ_FOREACH(exp, &exports, next) {
1630         if (strcmp(name, exp->name) == 0) {
1631             return exp;
1632         }
1633     }
1634 
1635     return NULL;
1636 }
1637 
1638 AioContext *
1639 nbd_export_aio_context(NBDExport *exp)
1640 {
1641     return exp->ctx;
1642 }
1643 
1644 void nbd_export_close(NBDExport *exp)
1645 {
1646     NBDClient *client, *next;
1647 
1648     nbd_export_get(exp);
1649     /*
1650      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1651      * close mode that stops advertising the export to new clients but
1652      * still permits existing clients to run to completion? Because of
1653      * that possibility, nbd_export_close() can be called more than
1654      * once on an export.
1655      */
1656     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1657         client_close(client, true);
1658     }
1659     if (exp->name) {
1660         nbd_export_put(exp);
1661         g_free(exp->name);
1662         exp->name = NULL;
1663         QTAILQ_REMOVE(&exports, exp, next);
1664         QTAILQ_INSERT_TAIL(&closed_exports, exp, next);
1665     }
1666     g_free(exp->description);
1667     exp->description = NULL;
1668     nbd_export_put(exp);
1669 }
1670 
1671 void nbd_export_remove(NBDExport *exp, NbdServerRemoveMode mode, Error **errp)
1672 {
1673     ERRP_GUARD();
1674     if (mode == NBD_SERVER_REMOVE_MODE_HARD || QTAILQ_EMPTY(&exp->clients)) {
1675         nbd_export_close(exp);
1676         return;
1677     }
1678 
1679     assert(mode == NBD_SERVER_REMOVE_MODE_SAFE);
1680 
1681     error_setg(errp, "export '%s' still in use", exp->name);
1682     error_append_hint(errp, "Use mode='hard' to force client disconnect\n");
1683 }
1684 
1685 void nbd_export_get(NBDExport *exp)
1686 {
1687     assert(exp->refcount > 0);
1688     exp->refcount++;
1689 }
1690 
1691 void nbd_export_put(NBDExport *exp)
1692 {
1693     assert(exp->refcount > 0);
1694     if (exp->refcount == 1) {
1695         nbd_export_close(exp);
1696     }
1697 
1698     /* nbd_export_close() may theoretically reduce refcount to 0. It may happen
1699      * if someone calls nbd_export_put() on named export not through
1700      * nbd_export_set_name() when refcount is 1. So, let's assert that
1701      * it is > 0.
1702      */
1703     assert(exp->refcount > 0);
1704     if (--exp->refcount == 0) {
1705         assert(exp->name == NULL);
1706         assert(exp->description == NULL);
1707 
1708         if (exp->close) {
1709             exp->close(exp);
1710         }
1711 
1712         if (exp->blk) {
1713             if (exp->eject_notifier_blk) {
1714                 notifier_remove(&exp->eject_notifier);
1715                 blk_unref(exp->eject_notifier_blk);
1716             }
1717             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
1718                                             blk_aio_detach, exp);
1719             blk_unref(exp->blk);
1720             exp->blk = NULL;
1721         }
1722 
1723         if (exp->export_bitmap) {
1724             bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
1725             g_free(exp->export_bitmap_context);
1726         }
1727 
1728         QTAILQ_REMOVE(&closed_exports, exp, next);
1729         g_free(exp);
1730         aio_wait_kick();
1731     }
1732 }
1733 
1734 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
1735 {
1736     return exp->blk;
1737 }
1738 
1739 void nbd_export_close_all(void)
1740 {
1741     NBDExport *exp, *next;
1742     AioContext *aio_context;
1743 
1744     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
1745         aio_context = exp->ctx;
1746         aio_context_acquire(aio_context);
1747         nbd_export_close(exp);
1748         aio_context_release(aio_context);
1749     }
1750 
1751     AIO_WAIT_WHILE(NULL, !(QTAILQ_EMPTY(&exports) &&
1752                            QTAILQ_EMPTY(&closed_exports)));
1753 }
1754 
1755 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
1756                                         unsigned niov, Error **errp)
1757 {
1758     int ret;
1759 
1760     g_assert(qemu_in_coroutine());
1761     qemu_co_mutex_lock(&client->send_lock);
1762     client->send_coroutine = qemu_coroutine_self();
1763 
1764     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
1765 
1766     client->send_coroutine = NULL;
1767     qemu_co_mutex_unlock(&client->send_lock);
1768 
1769     return ret;
1770 }
1771 
1772 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
1773                                        uint64_t handle)
1774 {
1775     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
1776     stl_be_p(&reply->error, error);
1777     stq_be_p(&reply->handle, handle);
1778 }
1779 
1780 static int nbd_co_send_simple_reply(NBDClient *client,
1781                                     uint64_t handle,
1782                                     uint32_t error,
1783                                     void *data,
1784                                     size_t len,
1785                                     Error **errp)
1786 {
1787     NBDSimpleReply reply;
1788     int nbd_err = system_errno_to_nbd_errno(error);
1789     struct iovec iov[] = {
1790         {.iov_base = &reply, .iov_len = sizeof(reply)},
1791         {.iov_base = data, .iov_len = len}
1792     };
1793 
1794     trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err),
1795                                    len);
1796     set_be_simple_reply(&reply, nbd_err, handle);
1797 
1798     return nbd_co_send_iov(client, iov, len ? 2 : 1, errp);
1799 }
1800 
1801 static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags,
1802                                 uint16_t type, uint64_t handle, uint32_t length)
1803 {
1804     stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
1805     stw_be_p(&chunk->flags, flags);
1806     stw_be_p(&chunk->type, type);
1807     stq_be_p(&chunk->handle, handle);
1808     stl_be_p(&chunk->length, length);
1809 }
1810 
1811 static int coroutine_fn nbd_co_send_structured_done(NBDClient *client,
1812                                                     uint64_t handle,
1813                                                     Error **errp)
1814 {
1815     NBDStructuredReplyChunk chunk;
1816     struct iovec iov[] = {
1817         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1818     };
1819 
1820     trace_nbd_co_send_structured_done(handle);
1821     set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0);
1822 
1823     return nbd_co_send_iov(client, iov, 1, errp);
1824 }
1825 
1826 static int coroutine_fn nbd_co_send_structured_read(NBDClient *client,
1827                                                     uint64_t handle,
1828                                                     uint64_t offset,
1829                                                     void *data,
1830                                                     size_t size,
1831                                                     bool final,
1832                                                     Error **errp)
1833 {
1834     NBDStructuredReadData chunk;
1835     struct iovec iov[] = {
1836         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1837         {.iov_base = data, .iov_len = size}
1838     };
1839 
1840     assert(size);
1841     trace_nbd_co_send_structured_read(handle, offset, data, size);
1842     set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1843                  NBD_REPLY_TYPE_OFFSET_DATA, handle,
1844                  sizeof(chunk) - sizeof(chunk.h) + size);
1845     stq_be_p(&chunk.offset, offset);
1846 
1847     return nbd_co_send_iov(client, iov, 2, errp);
1848 }
1849 
1850 static int coroutine_fn nbd_co_send_structured_error(NBDClient *client,
1851                                                      uint64_t handle,
1852                                                      uint32_t error,
1853                                                      const char *msg,
1854                                                      Error **errp)
1855 {
1856     NBDStructuredError chunk;
1857     int nbd_err = system_errno_to_nbd_errno(error);
1858     struct iovec iov[] = {
1859         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1860         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
1861     };
1862 
1863     assert(nbd_err);
1864     trace_nbd_co_send_structured_error(handle, nbd_err,
1865                                        nbd_err_lookup(nbd_err), msg ? msg : "");
1866     set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle,
1867                  sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
1868     stl_be_p(&chunk.error, nbd_err);
1869     stw_be_p(&chunk.message_length, iov[1].iov_len);
1870 
1871     return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp);
1872 }
1873 
1874 /* Do a sparse read and send the structured reply to the client.
1875  * Returns -errno if sending fails. bdrv_block_status_above() failure is
1876  * reported to the client, at which point this function succeeds.
1877  */
1878 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
1879                                                 uint64_t handle,
1880                                                 uint64_t offset,
1881                                                 uint8_t *data,
1882                                                 size_t size,
1883                                                 Error **errp)
1884 {
1885     int ret = 0;
1886     NBDExport *exp = client->exp;
1887     size_t progress = 0;
1888 
1889     while (progress < size) {
1890         int64_t pnum;
1891         int status = bdrv_block_status_above(blk_bs(exp->blk), NULL,
1892                                              offset + progress,
1893                                              size - progress, &pnum, NULL,
1894                                              NULL);
1895         bool final;
1896 
1897         if (status < 0) {
1898             char *msg = g_strdup_printf("unable to check for holes: %s",
1899                                         strerror(-status));
1900 
1901             ret = nbd_co_send_structured_error(client, handle, -status, msg,
1902                                                errp);
1903             g_free(msg);
1904             return ret;
1905         }
1906         assert(pnum && pnum <= size - progress);
1907         final = progress + pnum == size;
1908         if (status & BDRV_BLOCK_ZERO) {
1909             NBDStructuredReadHole chunk;
1910             struct iovec iov[] = {
1911                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1912             };
1913 
1914             trace_nbd_co_send_structured_read_hole(handle, offset + progress,
1915                                                    pnum);
1916             set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1917                          NBD_REPLY_TYPE_OFFSET_HOLE,
1918                          handle, sizeof(chunk) - sizeof(chunk.h));
1919             stq_be_p(&chunk.offset, offset + progress);
1920             stl_be_p(&chunk.length, pnum);
1921             ret = nbd_co_send_iov(client, iov, 1, errp);
1922         } else {
1923             ret = blk_pread(exp->blk, offset + progress + exp->dev_offset,
1924                             data + progress, pnum);
1925             if (ret < 0) {
1926                 error_setg_errno(errp, -ret, "reading from file failed");
1927                 break;
1928             }
1929             ret = nbd_co_send_structured_read(client, handle, offset + progress,
1930                                               data + progress, pnum, final,
1931                                               errp);
1932         }
1933 
1934         if (ret < 0) {
1935             break;
1936         }
1937         progress += pnum;
1938     }
1939     return ret;
1940 }
1941 
1942 typedef struct NBDExtentArray {
1943     NBDExtent *extents;
1944     unsigned int nb_alloc;
1945     unsigned int count;
1946     uint64_t total_length;
1947     bool can_add;
1948     bool converted_to_be;
1949 } NBDExtentArray;
1950 
1951 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
1952 {
1953     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
1954 
1955     ea->nb_alloc = nb_alloc;
1956     ea->extents = g_new(NBDExtent, nb_alloc);
1957     ea->can_add = true;
1958 
1959     return ea;
1960 }
1961 
1962 static void nbd_extent_array_free(NBDExtentArray *ea)
1963 {
1964     g_free(ea->extents);
1965     g_free(ea);
1966 }
1967 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free);
1968 
1969 /* Further modifications of the array after conversion are abandoned */
1970 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
1971 {
1972     int i;
1973 
1974     assert(!ea->converted_to_be);
1975     ea->can_add = false;
1976     ea->converted_to_be = true;
1977 
1978     for (i = 0; i < ea->count; i++) {
1979         ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
1980         ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
1981     }
1982 }
1983 
1984 /*
1985  * Add extent to NBDExtentArray. If extent can't be added (no available space),
1986  * return -1.
1987  * For safety, when returning -1 for the first time, .can_add is set to false,
1988  * further call to nbd_extent_array_add() will crash.
1989  * (to avoid the situation, when after failing to add an extent (returned -1),
1990  * user miss this failure and add another extent, which is successfully added
1991  * (array is full, but new extent may be squashed into the last one), then we
1992  * have invalid array with skipped extent)
1993  */
1994 static int nbd_extent_array_add(NBDExtentArray *ea,
1995                                 uint32_t length, uint32_t flags)
1996 {
1997     assert(ea->can_add);
1998 
1999     if (!length) {
2000         return 0;
2001     }
2002 
2003     /* Extend previous extent if flags are the same */
2004     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
2005         uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
2006 
2007         if (sum <= UINT32_MAX) {
2008             ea->extents[ea->count - 1].length = sum;
2009             ea->total_length += length;
2010             return 0;
2011         }
2012     }
2013 
2014     if (ea->count >= ea->nb_alloc) {
2015         ea->can_add = false;
2016         return -1;
2017     }
2018 
2019     ea->total_length += length;
2020     ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
2021     ea->count++;
2022 
2023     return 0;
2024 }
2025 
2026 static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
2027                                   uint64_t bytes, NBDExtentArray *ea)
2028 {
2029     while (bytes) {
2030         uint32_t flags;
2031         int64_t num;
2032         int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num,
2033                                           NULL, NULL);
2034 
2035         if (ret < 0) {
2036             return ret;
2037         }
2038 
2039         flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
2040                 (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
2041 
2042         if (nbd_extent_array_add(ea, num, flags) < 0) {
2043             return 0;
2044         }
2045 
2046         offset += num;
2047         bytes -= num;
2048     }
2049 
2050     return 0;
2051 }
2052 
2053 /*
2054  * nbd_co_send_extents
2055  *
2056  * @ea is converted to BE by the function
2057  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2058  */
2059 static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
2060                                NBDExtentArray *ea,
2061                                bool last, uint32_t context_id, Error **errp)
2062 {
2063     NBDStructuredMeta chunk;
2064     struct iovec iov[] = {
2065         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2066         {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
2067     };
2068 
2069     nbd_extent_array_convert_to_be(ea);
2070 
2071     trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length,
2072                               last);
2073     set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
2074                  NBD_REPLY_TYPE_BLOCK_STATUS,
2075                  handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
2076     stl_be_p(&chunk.context_id, context_id);
2077 
2078     return nbd_co_send_iov(client, iov, 2, errp);
2079 }
2080 
2081 /* Get block status from the exported device and send it to the client */
2082 static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
2083                                     BlockDriverState *bs, uint64_t offset,
2084                                     uint32_t length, bool dont_fragment,
2085                                     bool last, uint32_t context_id,
2086                                     Error **errp)
2087 {
2088     int ret;
2089     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2090     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2091 
2092     ret = blockstatus_to_extents(bs, offset, length, ea);
2093     if (ret < 0) {
2094         return nbd_co_send_structured_error(
2095                 client, handle, -ret, "can't get block status", errp);
2096     }
2097 
2098     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2099 }
2100 
2101 /* Populate @ea from a dirty bitmap. */
2102 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2103                               uint64_t offset, uint64_t length,
2104                               NBDExtentArray *es)
2105 {
2106     int64_t start, dirty_start, dirty_count;
2107     int64_t end = offset + length;
2108     bool full = false;
2109 
2110     bdrv_dirty_bitmap_lock(bitmap);
2111 
2112     for (start = offset;
2113          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
2114                                            &dirty_start, &dirty_count);
2115          start = dirty_start + dirty_count)
2116     {
2117         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2118             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2119         {
2120             full = true;
2121             break;
2122         }
2123     }
2124 
2125     if (!full) {
2126         /* last non dirty extent */
2127         nbd_extent_array_add(es, end - start, 0);
2128     }
2129 
2130     bdrv_dirty_bitmap_unlock(bitmap);
2131 }
2132 
2133 static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
2134                               BdrvDirtyBitmap *bitmap, uint64_t offset,
2135                               uint32_t length, bool dont_fragment, bool last,
2136                               uint32_t context_id, Error **errp)
2137 {
2138     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2139     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2140 
2141     bitmap_to_extents(bitmap, offset, length, ea);
2142 
2143     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2144 }
2145 
2146 /* nbd_co_receive_request
2147  * Collect a client request. Return 0 if request looks valid, -EIO to drop
2148  * connection right away, and any other negative value to report an error to
2149  * the client (although the caller may still need to disconnect after reporting
2150  * the error).
2151  */
2152 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
2153                                   Error **errp)
2154 {
2155     NBDClient *client = req->client;
2156     int valid_flags;
2157 
2158     g_assert(qemu_in_coroutine());
2159     assert(client->recv_coroutine == qemu_coroutine_self());
2160     if (nbd_receive_request(client->ioc, request, errp) < 0) {
2161         return -EIO;
2162     }
2163 
2164     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
2165                                              nbd_cmd_lookup(request->type));
2166 
2167     if (request->type != NBD_CMD_WRITE) {
2168         /* No payload, we are ready to read the next request.  */
2169         req->complete = true;
2170     }
2171 
2172     if (request->type == NBD_CMD_DISC) {
2173         /* Special case: we're going to disconnect without a reply,
2174          * whether or not flags, from, or len are bogus */
2175         return -EIO;
2176     }
2177 
2178     if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
2179         request->type == NBD_CMD_CACHE)
2180     {
2181         if (request->len > NBD_MAX_BUFFER_SIZE) {
2182             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
2183                        request->len, NBD_MAX_BUFFER_SIZE);
2184             return -EINVAL;
2185         }
2186 
2187         if (request->type != NBD_CMD_CACHE) {
2188             req->data = blk_try_blockalign(client->exp->blk, request->len);
2189             if (req->data == NULL) {
2190                 error_setg(errp, "No memory");
2191                 return -ENOMEM;
2192             }
2193         }
2194     }
2195 
2196     if (request->type == NBD_CMD_WRITE) {
2197         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
2198                      errp) < 0)
2199         {
2200             return -EIO;
2201         }
2202         req->complete = true;
2203 
2204         trace_nbd_co_receive_request_payload_received(request->handle,
2205                                                       request->len);
2206     }
2207 
2208     /* Sanity checks. */
2209     if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
2210         (request->type == NBD_CMD_WRITE ||
2211          request->type == NBD_CMD_WRITE_ZEROES ||
2212          request->type == NBD_CMD_TRIM)) {
2213         error_setg(errp, "Export is read-only");
2214         return -EROFS;
2215     }
2216     if (request->from > client->exp->size ||
2217         request->len > client->exp->size - request->from) {
2218         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
2219                    ", Size: %" PRIu64, request->from, request->len,
2220                    client->exp->size);
2221         return (request->type == NBD_CMD_WRITE ||
2222                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2223     }
2224     if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2225                                                 client->check_align)) {
2226         /*
2227          * The block layer gracefully handles unaligned requests, but
2228          * it's still worth tracing client non-compliance
2229          */
2230         trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2231                                               request->from,
2232                                               request->len,
2233                                               client->check_align);
2234     }
2235     valid_flags = NBD_CMD_FLAG_FUA;
2236     if (request->type == NBD_CMD_READ && client->structured_reply) {
2237         valid_flags |= NBD_CMD_FLAG_DF;
2238     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
2239         valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2240     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
2241         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2242     }
2243     if (request->flags & ~valid_flags) {
2244         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2245                    nbd_cmd_lookup(request->type), request->flags);
2246         return -EINVAL;
2247     }
2248 
2249     return 0;
2250 }
2251 
2252 /* Send simple reply without a payload, or a structured error
2253  * @error_msg is ignored if @ret >= 0
2254  * Returns 0 if connection is still live, -errno on failure to talk to client
2255  */
2256 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2257                                                uint64_t handle,
2258                                                int ret,
2259                                                const char *error_msg,
2260                                                Error **errp)
2261 {
2262     if (client->structured_reply && ret < 0) {
2263         return nbd_co_send_structured_error(client, handle, -ret, error_msg,
2264                                             errp);
2265     } else {
2266         return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0,
2267                                         NULL, 0, errp);
2268     }
2269 }
2270 
2271 /* Handle NBD_CMD_READ request.
2272  * Return -errno if sending fails. Other errors are reported directly to the
2273  * client as an error reply. */
2274 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2275                                         uint8_t *data, Error **errp)
2276 {
2277     int ret;
2278     NBDExport *exp = client->exp;
2279 
2280     assert(request->type == NBD_CMD_READ);
2281 
2282     /* XXX: NBD Protocol only documents use of FUA with WRITE */
2283     if (request->flags & NBD_CMD_FLAG_FUA) {
2284         ret = blk_co_flush(exp->blk);
2285         if (ret < 0) {
2286             return nbd_send_generic_reply(client, request->handle, ret,
2287                                           "flush failed", errp);
2288         }
2289     }
2290 
2291     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
2292         request->len)
2293     {
2294         return nbd_co_send_sparse_read(client, request->handle, request->from,
2295                                        data, request->len, errp);
2296     }
2297 
2298     ret = blk_pread(exp->blk, request->from + exp->dev_offset, data,
2299                     request->len);
2300     if (ret < 0) {
2301         return nbd_send_generic_reply(client, request->handle, ret,
2302                                       "reading from file failed", errp);
2303     }
2304 
2305     if (client->structured_reply) {
2306         if (request->len) {
2307             return nbd_co_send_structured_read(client, request->handle,
2308                                                request->from, data,
2309                                                request->len, true, errp);
2310         } else {
2311             return nbd_co_send_structured_done(client, request->handle, errp);
2312         }
2313     } else {
2314         return nbd_co_send_simple_reply(client, request->handle, 0,
2315                                         data, request->len, errp);
2316     }
2317 }
2318 
2319 /*
2320  * nbd_do_cmd_cache
2321  *
2322  * Handle NBD_CMD_CACHE request.
2323  * Return -errno if sending fails. Other errors are reported directly to the
2324  * client as an error reply.
2325  */
2326 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2327                                          Error **errp)
2328 {
2329     int ret;
2330     NBDExport *exp = client->exp;
2331 
2332     assert(request->type == NBD_CMD_CACHE);
2333 
2334     ret = blk_co_preadv(exp->blk, request->from + exp->dev_offset, request->len,
2335                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2336 
2337     return nbd_send_generic_reply(client, request->handle, ret,
2338                                   "caching data failed", errp);
2339 }
2340 
2341 /* Handle NBD request.
2342  * Return -errno if sending fails. Other errors are reported directly to the
2343  * client as an error reply. */
2344 static coroutine_fn int nbd_handle_request(NBDClient *client,
2345                                            NBDRequest *request,
2346                                            uint8_t *data, Error **errp)
2347 {
2348     int ret;
2349     int flags;
2350     NBDExport *exp = client->exp;
2351     char *msg;
2352 
2353     switch (request->type) {
2354     case NBD_CMD_CACHE:
2355         return nbd_do_cmd_cache(client, request, errp);
2356 
2357     case NBD_CMD_READ:
2358         return nbd_do_cmd_read(client, request, data, errp);
2359 
2360     case NBD_CMD_WRITE:
2361         flags = 0;
2362         if (request->flags & NBD_CMD_FLAG_FUA) {
2363             flags |= BDRV_REQ_FUA;
2364         }
2365         ret = blk_pwrite(exp->blk, request->from + exp->dev_offset,
2366                          data, request->len, flags);
2367         return nbd_send_generic_reply(client, request->handle, ret,
2368                                       "writing to file failed", errp);
2369 
2370     case NBD_CMD_WRITE_ZEROES:
2371         flags = 0;
2372         if (request->flags & NBD_CMD_FLAG_FUA) {
2373             flags |= BDRV_REQ_FUA;
2374         }
2375         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2376             flags |= BDRV_REQ_MAY_UNMAP;
2377         }
2378         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2379             flags |= BDRV_REQ_NO_FALLBACK;
2380         }
2381         ret = 0;
2382         /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
2383         while (ret >= 0 && request->len) {
2384             int align = client->check_align ?: 1;
2385             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2386                                                         align));
2387             ret = blk_pwrite_zeroes(exp->blk, request->from + exp->dev_offset,
2388                                     len, flags);
2389             request->len -= len;
2390             request->from += len;
2391         }
2392         return nbd_send_generic_reply(client, request->handle, ret,
2393                                       "writing to file failed", errp);
2394 
2395     case NBD_CMD_DISC:
2396         /* unreachable, thanks to special case in nbd_co_receive_request() */
2397         abort();
2398 
2399     case NBD_CMD_FLUSH:
2400         ret = blk_co_flush(exp->blk);
2401         return nbd_send_generic_reply(client, request->handle, ret,
2402                                       "flush failed", errp);
2403 
2404     case NBD_CMD_TRIM:
2405         ret = 0;
2406         /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
2407         while (ret >= 0 && request->len) {
2408             int align = client->check_align ?: 1;
2409             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2410                                                         align));
2411             ret = blk_co_pdiscard(exp->blk, request->from + exp->dev_offset,
2412                                   len);
2413             request->len -= len;
2414             request->from += len;
2415         }
2416         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2417             ret = blk_co_flush(exp->blk);
2418         }
2419         return nbd_send_generic_reply(client, request->handle, ret,
2420                                       "discard failed", errp);
2421 
2422     case NBD_CMD_BLOCK_STATUS:
2423         if (!request->len) {
2424             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2425                                           "need non-zero length", errp);
2426         }
2427         if (client->export_meta.valid &&
2428             (client->export_meta.base_allocation ||
2429              client->export_meta.bitmap))
2430         {
2431             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2432 
2433             if (client->export_meta.base_allocation) {
2434                 ret = nbd_co_send_block_status(client, request->handle,
2435                                                blk_bs(exp->blk), request->from,
2436                                                request->len, dont_fragment,
2437                                                !client->export_meta.bitmap,
2438                                                NBD_META_ID_BASE_ALLOCATION,
2439                                                errp);
2440                 if (ret < 0) {
2441                     return ret;
2442                 }
2443             }
2444 
2445             if (client->export_meta.bitmap) {
2446                 ret = nbd_co_send_bitmap(client, request->handle,
2447                                          client->exp->export_bitmap,
2448                                          request->from, request->len,
2449                                          dont_fragment,
2450                                          true, NBD_META_ID_DIRTY_BITMAP, errp);
2451                 if (ret < 0) {
2452                     return ret;
2453                 }
2454             }
2455 
2456             return 0;
2457         } else {
2458             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2459                                           "CMD_BLOCK_STATUS not negotiated",
2460                                           errp);
2461         }
2462 
2463     default:
2464         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
2465                               request->type);
2466         ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg,
2467                                      errp);
2468         g_free(msg);
2469         return ret;
2470     }
2471 }
2472 
2473 /* Owns a reference to the NBDClient passed as opaque.  */
2474 static coroutine_fn void nbd_trip(void *opaque)
2475 {
2476     NBDClient *client = opaque;
2477     NBDRequestData *req;
2478     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
2479     int ret;
2480     Error *local_err = NULL;
2481 
2482     trace_nbd_trip();
2483     if (client->closing) {
2484         nbd_client_put(client);
2485         return;
2486     }
2487 
2488     req = nbd_request_get(client);
2489     ret = nbd_co_receive_request(req, &request, &local_err);
2490     client->recv_coroutine = NULL;
2491 
2492     if (client->closing) {
2493         /*
2494          * The client may be closed when we are blocked in
2495          * nbd_co_receive_request()
2496          */
2497         goto done;
2498     }
2499 
2500     nbd_client_receive_next_request(client);
2501     if (ret == -EIO) {
2502         goto disconnect;
2503     }
2504 
2505     if (ret < 0) {
2506         /* It wans't -EIO, so, according to nbd_co_receive_request()
2507          * semantics, we should return the error to the client. */
2508         Error *export_err = local_err;
2509 
2510         local_err = NULL;
2511         ret = nbd_send_generic_reply(client, request.handle, -EINVAL,
2512                                      error_get_pretty(export_err), &local_err);
2513         error_free(export_err);
2514     } else {
2515         ret = nbd_handle_request(client, &request, req->data, &local_err);
2516     }
2517     if (ret < 0) {
2518         error_prepend(&local_err, "Failed to send reply: ");
2519         goto disconnect;
2520     }
2521 
2522     /* We must disconnect after NBD_CMD_WRITE if we did not
2523      * read the payload.
2524      */
2525     if (!req->complete) {
2526         error_setg(&local_err, "Request handling failed in intermediate state");
2527         goto disconnect;
2528     }
2529 
2530 done:
2531     nbd_request_put(req);
2532     nbd_client_put(client);
2533     return;
2534 
2535 disconnect:
2536     if (local_err) {
2537         error_reportf_err(local_err, "Disconnect client, due to: ");
2538     }
2539     nbd_request_put(req);
2540     client_close(client, true);
2541     nbd_client_put(client);
2542 }
2543 
2544 static void nbd_client_receive_next_request(NBDClient *client)
2545 {
2546     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
2547         nbd_client_get(client);
2548         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
2549         aio_co_schedule(client->exp->ctx, client->recv_coroutine);
2550     }
2551 }
2552 
2553 static coroutine_fn void nbd_co_client_start(void *opaque)
2554 {
2555     NBDClient *client = opaque;
2556     Error *local_err = NULL;
2557 
2558     qemu_co_mutex_init(&client->send_lock);
2559 
2560     if (nbd_negotiate(client, &local_err)) {
2561         if (local_err) {
2562             error_report_err(local_err);
2563         }
2564         client_close(client, false);
2565         return;
2566     }
2567 
2568     nbd_client_receive_next_request(client);
2569 }
2570 
2571 /*
2572  * Create a new client listener using the given channel @sioc.
2573  * Begin servicing it in a coroutine.  When the connection closes, call
2574  * @close_fn with an indication of whether the client completed negotiation.
2575  */
2576 void nbd_client_new(QIOChannelSocket *sioc,
2577                     QCryptoTLSCreds *tlscreds,
2578                     const char *tlsauthz,
2579                     void (*close_fn)(NBDClient *, bool))
2580 {
2581     NBDClient *client;
2582     Coroutine *co;
2583 
2584     client = g_new0(NBDClient, 1);
2585     client->refcount = 1;
2586     client->tlscreds = tlscreds;
2587     if (tlscreds) {
2588         object_ref(OBJECT(client->tlscreds));
2589     }
2590     client->tlsauthz = g_strdup(tlsauthz);
2591     client->sioc = sioc;
2592     object_ref(OBJECT(client->sioc));
2593     client->ioc = QIO_CHANNEL(sioc);
2594     object_ref(OBJECT(client->ioc));
2595     client->close_fn = close_fn;
2596 
2597     co = qemu_coroutine_create(nbd_co_client_start, client);
2598     qemu_coroutine_enter(co);
2599 }
2600