xref: /openbmc/qemu/nbd/server.c (revision 740b1759)
1 /*
2  *  Copyright (C) 2016-2018 Red Hat, Inc.
3  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
4  *
5  *  Network Block Device Server Side
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; under version 2 of the License.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "qemu/osdep.h"
21 
22 #include "block/export.h"
23 #include "qapi/error.h"
24 #include "qemu/queue.h"
25 #include "trace.h"
26 #include "nbd-internal.h"
27 #include "qemu/units.h"
28 
/*
 * Numeric IDs of the metadata contexts.
 * NOTE(review): these macros are not referenced in the part of the file
 * reviewed here; presumably they are the context_id values reported for
 * "base:allocation" and for the exported dirty bitmap -- confirm against
 * their users.
 */
#define NBD_META_ID_BASE_ALLOCATION 0
#define NBD_META_ID_DIRTY_BITMAP 1
31 
/*
 * NBD_MAX_BLOCK_STATUS_EXTENTS: maximum number of extents, sized so that
 * the extents data is at most 1 MiB (the value is 1 MiB / 8, which assumes
 * 8 bytes per extent -- TODO confirm against the wire format). An
 * empirical constant. If an increase is needed, note that the NBD protocol
 * recommends no larger than 32 MiB, so that the client won't consider
 * the reply as a denial of service attack.
 */
#define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
39 
40 static int system_errno_to_nbd_errno(int err)
41 {
42     switch (err) {
43     case 0:
44         return NBD_SUCCESS;
45     case EPERM:
46     case EROFS:
47         return NBD_EPERM;
48     case EIO:
49         return NBD_EIO;
50     case ENOMEM:
51         return NBD_ENOMEM;
52 #ifdef EDQUOT
53     case EDQUOT:
54 #endif
55     case EFBIG:
56     case ENOSPC:
57         return NBD_ENOSPC;
58     case EOVERFLOW:
59         return NBD_EOVERFLOW;
60     case ENOTSUP:
61 #if ENOTSUP != EOPNOTSUPP
62     case EOPNOTSUPP:
63 #endif
64         return NBD_ENOTSUP;
65     case ESHUTDOWN:
66         return NBD_ESHUTDOWN;
67     case EINVAL:
68     default:
69         return NBD_EINVAL;
70     }
71 }
72 
73 /* Definitions for opaque data types */
74 
typedef struct NBDRequestData NBDRequestData;

/*
 * Per-request bookkeeping.
 * NOTE(review): no field of this struct is used in the part of the file
 * reviewed here; the remarks below are inferred from the declarations
 * only -- confirm against the request handling code.
 */
struct NBDRequestData {
    QSIMPLEQ_ENTRY(NBDRequestData) entry; /* linkage in a QSIMPLEQ */
    NBDClient *client; /* presumably the client the request belongs to */
    uint8_t *data; /* presumably the request payload buffer */
    bool complete;
};
83 
/* State of a single NBD export. */
struct NBDExport {
    /* Generic export state; common.blk is the backend whose request
     * alignment and maximum transfer size are advertised to clients. */
    BlockExport common;

    char *name; /* may be NULL; NBD_OPT_LIST then reports "" */
    char *description; /* optional; sent as NBD_INFO_DESCRIPTION when set */
    uint64_t size; /* size advertised to the client during negotiation */
    uint16_t nbdflags; /* export flags advertised during negotiation */
    QTAILQ_HEAD(, NBDClient) clients; /* clients that selected this export */
    QTAILQ_ENTRY(NBDExport) next; /* linkage in the global 'exports' list */

    /* NOTE(review): not used in the reviewed part of the file; presumably
     * these track ejection of the backing medium -- confirm. */
    BlockBackend *eject_notifier_blk;
    Notifier eject_notifier;

    /* When NULL, queries in the "qemu:" namespace never match. */
    BdrvDirtyBitmap *export_bitmap;
    /* Metadata context name of the bitmap; nbd_meta_qemu_query() compares
     * the client's query against the part after "qemu:dirty-bitmap:". */
    char *export_bitmap_context;
};
100 
/* All known exports, linked through NBDExport.next; walked when answering
 * NBD_OPT_LIST. */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
102 
/* NBDExportMetaContexts represents a list of contexts to be exported,
 * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
 * NBD_OPT_LIST_META_CONTEXT.
 *
 * nbd_check_meta_export() clears @valid when @exp is not the export the
 * client finally selected. */
typedef struct NBDExportMetaContexts {
    NBDExport *exp; /* export the contexts were negotiated for */
    bool valid; /* means that negotiation of the option finished without
                   errors */
    bool base_allocation; /* export base:allocation context (block status) */
    bool bitmap; /* export qemu:dirty-bitmap:<export bitmap name> */
} NBDExportMetaContexts;
113 
/* State of one client connection. */
struct NBDClient {
    int refcount;
    void (*close_fn)(NBDClient *client, bool negotiated);

    /* Export selected by NBD_OPT_EXPORT_NAME or NBD_OPT_GO. */
    NBDExport *exp;
    /* Credentials and authorization id handed to the TLS channel created
     * for NBD_OPT_STARTTLS. */
    QCryptoTLSCreds *tlscreds;
    char *tlsauthz;
    QIOChannelSocket *sioc; /* The underlying data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */

    Coroutine *recv_coroutine;

    CoMutex send_lock;
    Coroutine *send_coroutine;

    QTAILQ_ENTRY(NBDClient) next; /* linkage in NBDExport.clients */
    int nb_requests;
    bool closing;

    uint32_t check_align; /* If non-zero, check for aligned client requests */

    /* When set, NBD_FLAG_SEND_DF is added to the advertised export flags. */
    bool structured_reply;
    NBDExportMetaContexts export_meta;

    uint32_t opt; /* Current option being negotiated */
    uint32_t optlen; /* remaining length of data in ioc for the option being
                        negotiated now */
};
142 
/* Forward declaration; being static, the definition has to appear later
 * in this file. */
static void nbd_client_receive_next_request(NBDClient *client);
144 
145 /* Basic flow for negotiation
146 
147    Server         Client
148    Negotiate
149 
150    or
151 
152    Server         Client
153    Negotiate #1
154                   Option
155    Negotiate #2
156 
157    ----
158 
159    followed by
160 
161    Server         Client
162                   Request
163    Response
164                   Request
165    Response
166                   ...
167    ...
168                   Request (type == 2)
169 
170 */
171 
172 static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
173                                      uint32_t type, uint32_t length)
174 {
175     stq_be_p(&rep->magic, NBD_REP_MAGIC);
176     stl_be_p(&rep->option, option);
177     stl_be_p(&rep->type, type);
178     stl_be_p(&rep->length, length);
179 }
180 
181 /* Send a reply header, including length, but no payload.
182  * Return -errno on error, 0 on success. */
183 static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
184                                       uint32_t len, Error **errp)
185 {
186     NBDOptionReply rep;
187 
188     trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
189                                      type, nbd_rep_lookup(type), len);
190 
191     assert(len < NBD_MAX_BUFFER_SIZE);
192 
193     set_be_option_rep(&rep, client->opt, type, len);
194     return nbd_write(client->ioc, &rep, sizeof(rep), errp);
195 }
196 
197 /* Send a reply header with default 0 length.
198  * Return -errno on error, 0 on success. */
199 static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
200                                   Error **errp)
201 {
202     return nbd_negotiate_send_rep_len(client, type, 0, errp);
203 }
204 
205 /* Send an error reply.
206  * Return -errno on error, 0 on success. */
207 static int GCC_FMT_ATTR(4, 0)
208 nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
209                             Error **errp, const char *fmt, va_list va)
210 {
211     ERRP_GUARD();
212     g_autofree char *msg = NULL;
213     int ret;
214     size_t len;
215 
216     msg = g_strdup_vprintf(fmt, va);
217     len = strlen(msg);
218     assert(len < NBD_MAX_STRING_SIZE);
219     trace_nbd_negotiate_send_rep_err(msg);
220     ret = nbd_negotiate_send_rep_len(client, type, len, errp);
221     if (ret < 0) {
222         return ret;
223     }
224     if (nbd_write(client->ioc, msg, len, errp) < 0) {
225         error_prepend(errp, "write failed (error message): ");
226         return -EIO;
227     }
228 
229     return 0;
230 }
231 
/*
 * Return a newly allocated copy of @name suitable for use in an error
 * reply: names shorter than 80 bytes are copied verbatim, longer ones are
 * cut to their first 80 bytes followed by "...".
 */
static char *
nbd_sanitize_name(const char *name)
{
    /* strnlen() caps the scan, so an overlong name is not walked fully. */
    bool too_long = strnlen(name, 80) >= 80;

    /* XXX Should we also try to sanitize any control characters? */
    return too_long ? g_strdup_printf("%.80s...", name) : g_strdup(name);
}
244 
245 /* Send an error reply.
246  * Return -errno on error, 0 on success. */
247 static int GCC_FMT_ATTR(4, 5)
248 nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
249                            Error **errp, const char *fmt, ...)
250 {
251     va_list va;
252     int ret;
253 
254     va_start(va, fmt);
255     ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
256     va_end(va);
257     return ret;
258 }
259 
260 /* Drop remainder of the current option, and send a reply with the
261  * given error type and message. Return -errno on read or write
262  * failure; or 0 if connection is still live. */
263 static int GCC_FMT_ATTR(4, 0)
264 nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
265               const char *fmt, va_list va)
266 {
267     int ret = nbd_drop(client->ioc, client->optlen, errp);
268 
269     client->optlen = 0;
270     if (!ret) {
271         ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
272     }
273     return ret;
274 }
275 
276 static int GCC_FMT_ATTR(4, 5)
277 nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
278              const char *fmt, ...)
279 {
280     int ret;
281     va_list va;
282 
283     va_start(va, fmt);
284     ret = nbd_opt_vdrop(client, type, errp, fmt, va);
285     va_end(va);
286 
287     return ret;
288 }
289 
290 static int GCC_FMT_ATTR(3, 4)
291 nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
292 {
293     int ret;
294     va_list va;
295 
296     va_start(va, fmt);
297     ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
298     va_end(va);
299 
300     return ret;
301 }
302 
303 /* Read size bytes from the unparsed payload of the current option.
304  * Return -errno on I/O error, 0 if option was completely handled by
305  * sending a reply about inconsistent lengths, or 1 on success. */
306 static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
307                         Error **errp)
308 {
309     if (size > client->optlen) {
310         return nbd_opt_invalid(client, errp,
311                                "Inconsistent lengths in option %s",
312                                nbd_opt_lookup(client->opt));
313     }
314     client->optlen -= size;
315     return qio_channel_read_all(client->ioc, buffer, size, errp) < 0 ? -EIO : 1;
316 }
317 
318 /* Drop size bytes from the unparsed payload of the current option.
319  * Return -errno on I/O error, 0 if option was completely handled by
320  * sending a reply about inconsistent lengths, or 1 on success. */
321 static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
322 {
323     if (size > client->optlen) {
324         return nbd_opt_invalid(client, errp,
325                                "Inconsistent lengths in option %s",
326                                nbd_opt_lookup(client->opt));
327     }
328     client->optlen -= size;
329     return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
330 }
331 
332 /* nbd_opt_read_name
333  *
334  * Read a string with the format:
335  *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
336  *   len bytes string (not 0-terminated)
337  *
338  * On success, @name will be allocated.
339  * If @length is non-null, it will be set to the actual string length.
340  *
341  * Return -errno on I/O error, 0 if option was completely handled by
342  * sending a reply about inconsistent lengths, or 1 on success.
343  */
344 static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
345                              Error **errp)
346 {
347     int ret;
348     uint32_t len;
349     g_autofree char *local_name = NULL;
350 
351     *name = NULL;
352     ret = nbd_opt_read(client, &len, sizeof(len), errp);
353     if (ret <= 0) {
354         return ret;
355     }
356     len = cpu_to_be32(len);
357 
358     if (len > NBD_MAX_STRING_SIZE) {
359         return nbd_opt_invalid(client, errp,
360                                "Invalid name length: %" PRIu32, len);
361     }
362 
363     local_name = g_malloc(len + 1);
364     ret = nbd_opt_read(client, local_name, len, errp);
365     if (ret <= 0) {
366         return ret;
367     }
368     local_name[len] = '\0';
369 
370     if (length) {
371         *length = len;
372     }
373     *name = g_steal_pointer(&local_name);
374 
375     return 1;
376 }
377 
378 /* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
379  * Return -errno on error, 0 on success. */
380 static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
381                                        Error **errp)
382 {
383     ERRP_GUARD();
384     size_t name_len, desc_len;
385     uint32_t len;
386     const char *name = exp->name ? exp->name : "";
387     const char *desc = exp->description ? exp->description : "";
388     QIOChannel *ioc = client->ioc;
389     int ret;
390 
391     trace_nbd_negotiate_send_rep_list(name, desc);
392     name_len = strlen(name);
393     desc_len = strlen(desc);
394     assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
395     len = name_len + desc_len + sizeof(len);
396     ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
397     if (ret < 0) {
398         return ret;
399     }
400 
401     len = cpu_to_be32(name_len);
402     if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
403         error_prepend(errp, "write failed (name length): ");
404         return -EINVAL;
405     }
406 
407     if (nbd_write(ioc, name, name_len, errp) < 0) {
408         error_prepend(errp, "write failed (name buffer): ");
409         return -EINVAL;
410     }
411 
412     if (nbd_write(ioc, desc, desc_len, errp) < 0) {
413         error_prepend(errp, "write failed (description buffer): ");
414         return -EINVAL;
415     }
416 
417     return 0;
418 }
419 
420 /* Process the NBD_OPT_LIST command, with a potential series of replies.
421  * Return -errno on error, 0 on success. */
422 static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
423 {
424     NBDExport *exp;
425     assert(client->opt == NBD_OPT_LIST);
426 
427     /* For each export, send a NBD_REP_SERVER reply. */
428     QTAILQ_FOREACH(exp, &exports, next) {
429         if (nbd_negotiate_send_rep_list(client, exp, errp)) {
430             return -EINVAL;
431         }
432     }
433     /* Finish with a NBD_REP_ACK. */
434     return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
435 }
436 
437 static void nbd_check_meta_export(NBDClient *client)
438 {
439     client->export_meta.valid &= client->exp == client->export_meta.exp;
440 }
441 
442 /* Send a reply to NBD_OPT_EXPORT_NAME.
443  * Return -errno on error, 0 on success. */
444 static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
445                                             Error **errp)
446 {
447     ERRP_GUARD();
448     g_autofree char *name = NULL;
449     char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
450     size_t len;
451     int ret;
452     uint16_t myflags;
453 
454     /* Client sends:
455         [20 ..  xx]   export name (length bytes)
456        Server replies:
457         [ 0 ..   7]   size
458         [ 8 ..   9]   export flags
459         [10 .. 133]   reserved     (0) [unless no_zeroes]
460      */
461     trace_nbd_negotiate_handle_export_name();
462     if (client->optlen > NBD_MAX_STRING_SIZE) {
463         error_setg(errp, "Bad length received");
464         return -EINVAL;
465     }
466     name = g_malloc(client->optlen + 1);
467     if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
468         return -EIO;
469     }
470     name[client->optlen] = '\0';
471     client->optlen = 0;
472 
473     trace_nbd_negotiate_handle_export_name_request(name);
474 
475     client->exp = nbd_export_find(name);
476     if (!client->exp) {
477         error_setg(errp, "export not found");
478         return -EINVAL;
479     }
480 
481     myflags = client->exp->nbdflags;
482     if (client->structured_reply) {
483         myflags |= NBD_FLAG_SEND_DF;
484     }
485     trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
486     stq_be_p(buf, client->exp->size);
487     stw_be_p(buf + 8, myflags);
488     len = no_zeroes ? 10 : sizeof(buf);
489     ret = nbd_write(client->ioc, buf, len, errp);
490     if (ret < 0) {
491         error_prepend(errp, "write failed: ");
492         return ret;
493     }
494 
495     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
496     blk_exp_ref(&client->exp->common);
497     nbd_check_meta_export(client);
498 
499     return 0;
500 }
501 
502 /* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
503  * The buffer does NOT include the info type prefix.
504  * Return -errno on error, 0 if ready to send more. */
505 static int nbd_negotiate_send_info(NBDClient *client,
506                                    uint16_t info, uint32_t length, void *buf,
507                                    Error **errp)
508 {
509     int rc;
510 
511     trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
512     rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
513                                     sizeof(info) + length, errp);
514     if (rc < 0) {
515         return rc;
516     }
517     info = cpu_to_be16(info);
518     if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
519         return -EIO;
520     }
521     if (nbd_write(client->ioc, buf, length, errp) < 0) {
522         return -EIO;
523     }
524     return 0;
525 }
526 
527 /* nbd_reject_length: Handle any unexpected payload.
528  * @fatal requests that we quit talking to the client, even if we are able
529  * to successfully send an error reply.
530  * Return:
531  * -errno  transmission error occurred or @fatal was requested, errp is set
532  * 0       error message successfully sent to client, errp is not set
533  */
534 static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
535 {
536     int ret;
537 
538     assert(client->optlen);
539     ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
540                           nbd_opt_lookup(client->opt));
541     if (fatal && !ret) {
542         error_setg(errp, "option '%s' has unexpected length",
543                    nbd_opt_lookup(client->opt));
544         return -EINVAL;
545     }
546     return ret;
547 }
548 
549 /* Handle NBD_OPT_INFO and NBD_OPT_GO.
550  * Return -errno on error, 0 if ready for next option, and 1 to move
551  * into transmission phase.  */
552 static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
553 {
554     int rc;
555     g_autofree char *name = NULL;
556     NBDExport *exp;
557     uint16_t requests;
558     uint16_t request;
559     uint32_t namelen;
560     bool sendname = false;
561     bool blocksize = false;
562     uint32_t sizes[3];
563     char buf[sizeof(uint64_t) + sizeof(uint16_t)];
564     uint32_t check_align = 0;
565     uint16_t myflags;
566 
567     /* Client sends:
568         4 bytes: L, name length (can be 0)
569         L bytes: export name
570         2 bytes: N, number of requests (can be 0)
571         N * 2 bytes: N requests
572     */
573     rc = nbd_opt_read_name(client, &name, &namelen, errp);
574     if (rc <= 0) {
575         return rc;
576     }
577     trace_nbd_negotiate_handle_export_name_request(name);
578 
579     rc = nbd_opt_read(client, &requests, sizeof(requests), errp);
580     if (rc <= 0) {
581         return rc;
582     }
583     requests = be16_to_cpu(requests);
584     trace_nbd_negotiate_handle_info_requests(requests);
585     while (requests--) {
586         rc = nbd_opt_read(client, &request, sizeof(request), errp);
587         if (rc <= 0) {
588             return rc;
589         }
590         request = be16_to_cpu(request);
591         trace_nbd_negotiate_handle_info_request(request,
592                                                 nbd_info_lookup(request));
593         /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
594          * everything else is either a request we don't know or
595          * something we send regardless of request */
596         switch (request) {
597         case NBD_INFO_NAME:
598             sendname = true;
599             break;
600         case NBD_INFO_BLOCK_SIZE:
601             blocksize = true;
602             break;
603         }
604     }
605     if (client->optlen) {
606         return nbd_reject_length(client, false, errp);
607     }
608 
609     exp = nbd_export_find(name);
610     if (!exp) {
611         g_autofree char *sane_name = nbd_sanitize_name(name);
612 
613         return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
614                                           errp, "export '%s' not present",
615                                           sane_name);
616     }
617 
618     /* Don't bother sending NBD_INFO_NAME unless client requested it */
619     if (sendname) {
620         rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
621                                      errp);
622         if (rc < 0) {
623             return rc;
624         }
625     }
626 
627     /* Send NBD_INFO_DESCRIPTION only if available, regardless of
628      * client request */
629     if (exp->description) {
630         size_t len = strlen(exp->description);
631 
632         assert(len <= NBD_MAX_STRING_SIZE);
633         rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
634                                      len, exp->description, errp);
635         if (rc < 0) {
636             return rc;
637         }
638     }
639 
640     /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
641      * according to whether the client requested it, and according to
642      * whether this is OPT_INFO or OPT_GO. */
643     /* minimum - 1 for back-compat, or actual if client will obey it. */
644     if (client->opt == NBD_OPT_INFO || blocksize) {
645         check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
646     } else {
647         sizes[0] = 1;
648     }
649     assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
650     /* preferred - Hard-code to 4096 for now.
651      * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
652     sizes[1] = MAX(4096, sizes[0]);
653     /* maximum - At most 32M, but smaller as appropriate. */
654     sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
655     trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
656     sizes[0] = cpu_to_be32(sizes[0]);
657     sizes[1] = cpu_to_be32(sizes[1]);
658     sizes[2] = cpu_to_be32(sizes[2]);
659     rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
660                                  sizeof(sizes), sizes, errp);
661     if (rc < 0) {
662         return rc;
663     }
664 
665     /* Send NBD_INFO_EXPORT always */
666     myflags = exp->nbdflags;
667     if (client->structured_reply) {
668         myflags |= NBD_FLAG_SEND_DF;
669     }
670     trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
671     stq_be_p(buf, exp->size);
672     stw_be_p(buf + 8, myflags);
673     rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
674                                  sizeof(buf), buf, errp);
675     if (rc < 0) {
676         return rc;
677     }
678 
679     /*
680      * If the client is just asking for NBD_OPT_INFO, but forgot to
681      * request block sizes in a situation that would impact
682      * performance, then return an error. But for NBD_OPT_GO, we
683      * tolerate all clients, regardless of alignments.
684      */
685     if (client->opt == NBD_OPT_INFO && !blocksize &&
686         blk_get_request_alignment(exp->common.blk) > 1) {
687         return nbd_negotiate_send_rep_err(client,
688                                           NBD_REP_ERR_BLOCK_SIZE_REQD,
689                                           errp,
690                                           "request NBD_INFO_BLOCK_SIZE to "
691                                           "use this export");
692     }
693 
694     /* Final reply */
695     rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
696     if (rc < 0) {
697         return rc;
698     }
699 
700     if (client->opt == NBD_OPT_GO) {
701         client->exp = exp;
702         client->check_align = check_align;
703         QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
704         blk_exp_ref(&client->exp->common);
705         nbd_check_meta_export(client);
706         rc = 1;
707     }
708     return rc;
709 }
710 
711 
712 /* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
713  * new channel for all further (now-encrypted) communication. */
714 static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
715                                                  Error **errp)
716 {
717     QIOChannel *ioc;
718     QIOChannelTLS *tioc;
719     struct NBDTLSHandshakeData data = { 0 };
720 
721     assert(client->opt == NBD_OPT_STARTTLS);
722 
723     trace_nbd_negotiate_handle_starttls();
724     ioc = client->ioc;
725 
726     if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
727         return NULL;
728     }
729 
730     tioc = qio_channel_tls_new_server(ioc,
731                                       client->tlscreds,
732                                       client->tlsauthz,
733                                       errp);
734     if (!tioc) {
735         return NULL;
736     }
737 
738     qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
739     trace_nbd_negotiate_handle_starttls_handshake();
740     data.loop = g_main_loop_new(g_main_context_default(), FALSE);
741     qio_channel_tls_handshake(tioc,
742                               nbd_tls_handshake,
743                               &data,
744                               NULL,
745                               NULL);
746 
747     if (!data.complete) {
748         g_main_loop_run(data.loop);
749     }
750     g_main_loop_unref(data.loop);
751     if (data.error) {
752         object_unref(OBJECT(tioc));
753         error_propagate(errp, data.error);
754         return NULL;
755     }
756 
757     return QIO_CHANNEL(tioc);
758 }
759 
760 /* nbd_negotiate_send_meta_context
761  *
762  * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
763  *
764  * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
765  */
766 static int nbd_negotiate_send_meta_context(NBDClient *client,
767                                            const char *context,
768                                            uint32_t context_id,
769                                            Error **errp)
770 {
771     NBDOptionReplyMetaContext opt;
772     struct iovec iov[] = {
773         {.iov_base = &opt, .iov_len = sizeof(opt)},
774         {.iov_base = (void *)context, .iov_len = strlen(context)}
775     };
776 
777     assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
778     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
779         context_id = 0;
780     }
781 
782     trace_nbd_negotiate_meta_query_reply(context, context_id);
783     set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
784                       sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
785     stl_be_p(&opt.context_id, context_id);
786 
787     return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
788 }
789 
790 /* Read strlen(@pattern) bytes, and set @match to true if they match @pattern.
791  * @match is never set to false.
792  *
793  * Return -errno on I/O error, 0 if option was completely handled by
794  * sending a reply about inconsistent lengths, or 1 on success.
795  *
796  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
797  * It only means that there are no errors.
798  */
799 static int nbd_meta_pattern(NBDClient *client, const char *pattern, bool *match,
800                             Error **errp)
801 {
802     int ret;
803     char *query;
804     size_t len = strlen(pattern);
805 
806     assert(len);
807 
808     query = g_malloc(len);
809     ret = nbd_opt_read(client, query, len, errp);
810     if (ret <= 0) {
811         g_free(query);
812         return ret;
813     }
814 
815     if (strncmp(query, pattern, len) == 0) {
816         trace_nbd_negotiate_meta_query_parse(pattern);
817         *match = true;
818     } else {
819         trace_nbd_negotiate_meta_query_skip("pattern not matched");
820     }
821     g_free(query);
822 
823     return 1;
824 }
825 
826 /*
827  * Read @len bytes, and set @match to true if they match @pattern, or if @len
828  * is 0 and the client is performing _LIST_. @match is never set to false.
829  *
830  * Return -errno on I/O error, 0 if option was completely handled by
831  * sending a reply about inconsistent lengths, or 1 on success.
832  *
833  * Note: return code = 1 doesn't mean that we've read exactly @pattern.
834  * It only means that there are no errors.
835  */
836 static int nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
837                                      uint32_t len, bool *match, Error **errp)
838 {
839     if (len == 0) {
840         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
841             *match = true;
842         }
843         trace_nbd_negotiate_meta_query_parse("empty");
844         return 1;
845     }
846 
847     if (len != strlen(pattern)) {
848         trace_nbd_negotiate_meta_query_skip("different lengths");
849         return nbd_opt_skip(client, len, errp);
850     }
851 
852     return nbd_meta_pattern(client, pattern, match, errp);
853 }
854 
855 /* nbd_meta_base_query
856  *
857  * Handle queries to 'base' namespace. For now, only the base:allocation
858  * context is available.  'len' is the amount of text remaining to be read from
859  * the current name, after the 'base:' portion has been stripped.
860  *
861  * Return -errno on I/O error, 0 if option was completely handled by
862  * sending a reply about inconsistent lengths, or 1 on success.
863  */
864 static int nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
865                                uint32_t len, Error **errp)
866 {
867     return nbd_meta_empty_or_pattern(client, "allocation", len,
868                                      &meta->base_allocation, errp);
869 }
870 
871 /* nbd_meta_bitmap_query
872  *
873  * Handle query to 'qemu:' namespace.
874  * @len is the amount of text remaining to be read from the current name, after
875  * the 'qemu:' portion has been stripped.
876  *
877  * Return -errno on I/O error, 0 if option was completely handled by
878  * sending a reply about inconsistent lengths, or 1 on success. */
879 static int nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
880                                uint32_t len, Error **errp)
881 {
882     bool dirty_bitmap = false;
883     size_t dirty_bitmap_len = strlen("dirty-bitmap:");
884     int ret;
885 
886     if (!meta->exp->export_bitmap) {
887         trace_nbd_negotiate_meta_query_skip("no dirty-bitmap exported");
888         return nbd_opt_skip(client, len, errp);
889     }
890 
891     if (len == 0) {
892         if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
893             meta->bitmap = true;
894         }
895         trace_nbd_negotiate_meta_query_parse("empty");
896         return 1;
897     }
898 
899     if (len < dirty_bitmap_len) {
900         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
901         return nbd_opt_skip(client, len, errp);
902     }
903 
904     len -= dirty_bitmap_len;
905     ret = nbd_meta_pattern(client, "dirty-bitmap:", &dirty_bitmap, errp);
906     if (ret <= 0) {
907         return ret;
908     }
909     if (!dirty_bitmap) {
910         trace_nbd_negotiate_meta_query_skip("not dirty-bitmap:");
911         return nbd_opt_skip(client, len, errp);
912     }
913 
914     trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
915 
916     return nbd_meta_empty_or_pattern(
917             client, meta->exp->export_bitmap_context +
918             strlen("qemu:dirty_bitmap:"), len, &meta->bitmap, errp);
919 }
920 
921 /* nbd_negotiate_meta_query
922  *
923  * Parse namespace name and call corresponding function to parse body of the
924  * query.
925  *
926  * The only supported namespaces are 'base' and 'qemu'.
927  *
928  * The function aims not wasting time and memory to read long unknown namespace
929  * names.
930  *
931  * Return -errno on I/O error, 0 if option was completely handled by
932  * sending a reply about inconsistent lengths, or 1 on success. */
933 static int nbd_negotiate_meta_query(NBDClient *client,
934                                     NBDExportMetaContexts *meta, Error **errp)
935 {
936     /*
937      * Both 'qemu' and 'base' namespaces have length = 5 including a
938      * colon. If another length namespace is later introduced, this
939      * should certainly be refactored.
940      */
941     int ret;
942     size_t ns_len = 5;
943     char ns[5];
944     uint32_t len;
945 
946     ret = nbd_opt_read(client, &len, sizeof(len), errp);
947     if (ret <= 0) {
948         return ret;
949     }
950     len = cpu_to_be32(len);
951 
952     if (len > NBD_MAX_STRING_SIZE) {
953         trace_nbd_negotiate_meta_query_skip("length too long");
954         return nbd_opt_skip(client, len, errp);
955     }
956     if (len < ns_len) {
957         trace_nbd_negotiate_meta_query_skip("length too short");
958         return nbd_opt_skip(client, len, errp);
959     }
960 
961     len -= ns_len;
962     ret = nbd_opt_read(client, ns, ns_len, errp);
963     if (ret <= 0) {
964         return ret;
965     }
966 
967     if (!strncmp(ns, "base:", ns_len)) {
968         trace_nbd_negotiate_meta_query_parse("base:");
969         return nbd_meta_base_query(client, meta, len, errp);
970     } else if (!strncmp(ns, "qemu:", ns_len)) {
971         trace_nbd_negotiate_meta_query_parse("qemu:");
972         return nbd_meta_qemu_query(client, meta, len, errp);
973     }
974 
975     trace_nbd_negotiate_meta_query_skip("unknown namespace");
976     return nbd_opt_skip(client, len, errp);
977 }
978 
979 /* nbd_negotiate_meta_queries
980  * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
981  *
982  * Return -errno on I/O error, or 0 if option was completely handled. */
983 static int nbd_negotiate_meta_queries(NBDClient *client,
984                                       NBDExportMetaContexts *meta, Error **errp)
985 {
986     int ret;
987     g_autofree char *export_name = NULL;
988     NBDExportMetaContexts local_meta;
989     uint32_t nb_queries;
990     int i;
991 
992     if (!client->structured_reply) {
993         return nbd_opt_invalid(client, errp,
994                                "request option '%s' when structured reply "
995                                "is not negotiated",
996                                nbd_opt_lookup(client->opt));
997     }
998 
999     if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1000         /* Only change the caller's meta on SET. */
1001         meta = &local_meta;
1002     }
1003 
1004     memset(meta, 0, sizeof(*meta));
1005 
1006     ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1007     if (ret <= 0) {
1008         return ret;
1009     }
1010 
1011     meta->exp = nbd_export_find(export_name);
1012     if (meta->exp == NULL) {
1013         g_autofree char *sane_name = nbd_sanitize_name(export_name);
1014 
1015         return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1016                             "export '%s' not present", sane_name);
1017     }
1018 
1019     ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), errp);
1020     if (ret <= 0) {
1021         return ret;
1022     }
1023     nb_queries = cpu_to_be32(nb_queries);
1024     trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1025                                      export_name, nb_queries);
1026 
1027     if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1028         /* enable all known contexts */
1029         meta->base_allocation = true;
1030         meta->bitmap = !!meta->exp->export_bitmap;
1031     } else {
1032         for (i = 0; i < nb_queries; ++i) {
1033             ret = nbd_negotiate_meta_query(client, meta, errp);
1034             if (ret <= 0) {
1035                 return ret;
1036             }
1037         }
1038     }
1039 
1040     if (meta->base_allocation) {
1041         ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1042                                               NBD_META_ID_BASE_ALLOCATION,
1043                                               errp);
1044         if (ret < 0) {
1045             return ret;
1046         }
1047     }
1048 
1049     if (meta->bitmap) {
1050         ret = nbd_negotiate_send_meta_context(client,
1051                                               meta->exp->export_bitmap_context,
1052                                               NBD_META_ID_DIRTY_BITMAP,
1053                                               errp);
1054         if (ret < 0) {
1055             return ret;
1056         }
1057     }
1058 
1059     ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1060     if (ret == 0) {
1061         meta->valid = true;
1062     }
1063 
1064     return ret;
1065 }
1066 
/* nbd_negotiate_options
 * Process all NBD_OPT_* client option commands, during fixed newstyle
 * negotiation.
 *
 * Reads the client flags, then loops over option requests. Each request is
 * dispatched according to one of three states: TLS configured but not yet
 * started, fixed newstyle, or plain (non-fixed) newstyle.
 *
 * Return:
 * -errno  on error, errp is set
 * 0       on successful negotiation, errp is not set
 * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
 *         errp is not set
 */
static int nbd_negotiate_options(NBDClient *client, Error **errp)
{
    uint32_t flags;
    bool fixedNewstyle = false;
    bool no_zeroes = false;

    /* Client sends:
        [ 0 ..   3]   client flags

       Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
        [ 0 ..   7]   NBD_OPTS_MAGIC
        [ 8 ..  11]   NBD option
        [12 ..  15]   Data length
        ...           Rest of request

        [ 0 ..   7]   NBD_OPTS_MAGIC
        [ 8 ..  11]   Second NBD option
        [12 ..  15]   Data length
        ...           Rest of request
    */

    if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
        return -EIO;
    }
    trace_nbd_negotiate_options_flags(flags);
    /* Strip each flag we understand; anything left over is unknown. */
    if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
        fixedNewstyle = true;
        flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
    }
    if (flags & NBD_FLAG_C_NO_ZEROES) {
        no_zeroes = true;
        flags &= ~NBD_FLAG_C_NO_ZEROES;
    }
    if (flags != 0) {
        error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
        return -EINVAL;
    }

    while (1) {
        int ret;
        uint32_t option, length;
        uint64_t magic;

        if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
            return -EINVAL;
        }
        trace_nbd_negotiate_options_check_magic(magic);
        if (magic != NBD_OPTS_MAGIC) {
            error_setg(errp, "Bad magic received");
            return -EINVAL;
        }

        if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
            return -EINVAL;
        }
        client->opt = option;

        if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
            return -EINVAL;
        }
        /* The previous option's payload must have been fully accounted for. */
        assert(!client->optlen);
        client->optlen = length;

        if (length > NBD_MAX_BUFFER_SIZE) {
            error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
                       length, NBD_MAX_BUFFER_SIZE);
            return -EINVAL;
        }

        trace_nbd_negotiate_options_check_option(option,
                                                 nbd_opt_lookup(option));
        /*
         * TLS credentials are configured but the channel in use is still
         * the raw socket channel, i.e. TLS has not been started yet.
         */
        if (client->tlscreds &&
            client->ioc == (QIOChannel *)client->sioc) {
            QIOChannel *tioc;
            if (!fixedNewstyle) {
                error_setg(errp, "Unsupported option 0x%" PRIx32, option);
                return -EINVAL;
            }
            switch (option) {
            case NBD_OPT_STARTTLS:
                if (length) {
                    /* Unconditionally drop the connection if the client
                     * can't start a TLS negotiation correctly */
                    return nbd_reject_length(client, true, errp);
                }
                tioc = nbd_negotiate_handle_starttls(client, errp);
                if (!tioc) {
                    return -EIO;
                }
                ret = 0;
                /* Swap the plain channel for the TLS one from here on. */
                object_unref(OBJECT(client->ioc));
                client->ioc = QIO_CHANNEL(tioc);
                break;

            case NBD_OPT_EXPORT_NAME:
                /* No way to return an error to client, so drop connection */
                error_setg(errp, "Option 0x%x not permitted before TLS",
                           option);
                return -EINVAL;

            default:
                /* Let the client keep trying, unless they asked to
                 * quit. Always try to give an error back to the
                 * client; but when replying to OPT_ABORT, be aware
                 * that the client may hang up before receiving the
                 * error, in which case we are fine ignoring the
                 * resulting EPIPE. */
                ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
                                   option == NBD_OPT_ABORT ? NULL : errp,
                                   "Option 0x%" PRIx32
                                   " not permitted before TLS", option);
                if (option == NBD_OPT_ABORT) {
                    return 1;
                }
                break;
            }
        } else if (fixedNewstyle) {
            switch (option) {
            case NBD_OPT_LIST:
                if (length) {
                    ret = nbd_reject_length(client, false, errp);
                } else {
                    ret = nbd_negotiate_handle_list(client, errp);
                }
                break;

            case NBD_OPT_ABORT:
                /* NBD spec says we must try to reply before
                 * disconnecting, but that we must also tolerate
                 * guests that don't wait for our reply. */
                nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
                return 1;

            case NBD_OPT_EXPORT_NAME:
                return nbd_negotiate_handle_export_name(client, no_zeroes,
                                                        errp);

            case NBD_OPT_INFO:
            case NBD_OPT_GO:
                ret = nbd_negotiate_handle_info(client, errp);
                /* A return of 1 ends negotiation; only NBD_OPT_GO may do so. */
                if (ret == 1) {
                    assert(option == NBD_OPT_GO);
                    return 0;
                }
                break;

            case NBD_OPT_STARTTLS:
                /* Reaching here means TLS is either already active or unset. */
                if (length) {
                    ret = nbd_reject_length(client, false, errp);
                } else if (client->tlscreds) {
                    ret = nbd_negotiate_send_rep_err(client,
                                                     NBD_REP_ERR_INVALID, errp,
                                                     "TLS already enabled");
                } else {
                    ret = nbd_negotiate_send_rep_err(client,
                                                     NBD_REP_ERR_POLICY, errp,
                                                     "TLS not configured");
                }
                break;

            case NBD_OPT_STRUCTURED_REPLY:
                if (length) {
                    ret = nbd_reject_length(client, false, errp);
                } else if (client->structured_reply) {
                    ret = nbd_negotiate_send_rep_err(
                        client, NBD_REP_ERR_INVALID, errp,
                        "structured reply already negotiated");
                } else {
                    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
                    client->structured_reply = true;
                }
                break;

            case NBD_OPT_LIST_META_CONTEXT:
            case NBD_OPT_SET_META_CONTEXT:
                ret = nbd_negotiate_meta_queries(client, &client->export_meta,
                                                 errp);
                break;

            default:
                ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
                                   "Unsupported option %" PRIu32 " (%s)",
                                   option, nbd_opt_lookup(option));
                break;
            }
        } else {
            /*
             * If broken new-style we should drop the connection
             * for anything except NBD_OPT_EXPORT_NAME
             */
            switch (option) {
            case NBD_OPT_EXPORT_NAME:
                return nbd_negotiate_handle_export_name(client, no_zeroes,
                                                        errp);

            default:
                error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
                           option, nbd_opt_lookup(option));
                return -EINVAL;
            }
        }
        /* Non-negative results keep the negotiation loop going. */
        if (ret < 0) {
            return ret;
        }
    }
}
1282 
1283 /* nbd_negotiate
1284  * Return:
1285  * -errno  on error, errp is set
1286  * 0       on successful negotiation, errp is not set
1287  * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1288  *         errp is not set
1289  */
1290 static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1291 {
1292     ERRP_GUARD();
1293     char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1294     int ret;
1295 
1296     /* Old style negotiation header, no room for options
1297         [ 0 ..   7]   passwd       ("NBDMAGIC")
1298         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1299         [16 ..  23]   size
1300         [24 ..  27]   export flags (zero-extended)
1301         [28 .. 151]   reserved     (0)
1302 
1303        New style negotiation header, client can send options
1304         [ 0 ..   7]   passwd       ("NBDMAGIC")
1305         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1306         [16 ..  17]   server flags (0)
1307         ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1308      */
1309 
1310     qio_channel_set_blocking(client->ioc, false, NULL);
1311 
1312     trace_nbd_negotiate_begin();
1313     memcpy(buf, "NBDMAGIC", 8);
1314 
1315     stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1316     stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1317 
1318     if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1319         error_prepend(errp, "write failed: ");
1320         return -EINVAL;
1321     }
1322     ret = nbd_negotiate_options(client, errp);
1323     if (ret != 0) {
1324         if (ret < 0) {
1325             error_prepend(errp, "option negotiation failed: ");
1326         }
1327         return ret;
1328     }
1329 
1330     /* Attach the channel to the same AioContext as the export */
1331     if (client->exp && client->exp->common.ctx) {
1332         qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
1333     }
1334 
1335     assert(!client->optlen);
1336     trace_nbd_negotiate_success();
1337 
1338     return 0;
1339 }
1340 
1341 static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
1342                                Error **errp)
1343 {
1344     uint8_t buf[NBD_REQUEST_SIZE];
1345     uint32_t magic;
1346     int ret;
1347 
1348     ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
1349     if (ret < 0) {
1350         return ret;
1351     }
1352 
1353     /* Request
1354        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1355        [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1356        [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1357        [ 8 .. 15]   handle
1358        [16 .. 23]   from
1359        [24 .. 27]   len
1360      */
1361 
1362     magic = ldl_be_p(buf);
1363     request->flags  = lduw_be_p(buf + 4);
1364     request->type   = lduw_be_p(buf + 6);
1365     request->handle = ldq_be_p(buf + 8);
1366     request->from   = ldq_be_p(buf + 16);
1367     request->len    = ldl_be_p(buf + 24);
1368 
1369     trace_nbd_receive_request(magic, request->flags, request->type,
1370                               request->from, request->len);
1371 
1372     if (magic != NBD_REQUEST_MAGIC) {
1373         error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
1374         return -EINVAL;
1375     }
1376     return 0;
1377 }
1378 
1379 #define MAX_NBD_REQUESTS 16
1380 
1381 void nbd_client_get(NBDClient *client)
1382 {
1383     client->refcount++;
1384 }
1385 
1386 void nbd_client_put(NBDClient *client)
1387 {
1388     if (--client->refcount == 0) {
1389         /* The last reference should be dropped by client->close,
1390          * which is called by client_close.
1391          */
1392         assert(client->closing);
1393 
1394         qio_channel_detach_aio_context(client->ioc);
1395         object_unref(OBJECT(client->sioc));
1396         object_unref(OBJECT(client->ioc));
1397         if (client->tlscreds) {
1398             object_unref(OBJECT(client->tlscreds));
1399         }
1400         g_free(client->tlsauthz);
1401         if (client->exp) {
1402             QTAILQ_REMOVE(&client->exp->clients, client, next);
1403             blk_exp_unref(&client->exp->common);
1404         }
1405         g_free(client);
1406     }
1407 }
1408 
1409 static void client_close(NBDClient *client, bool negotiated)
1410 {
1411     if (client->closing) {
1412         return;
1413     }
1414 
1415     client->closing = true;
1416 
1417     /* Force requests to finish.  They will drop their own references,
1418      * then we'll close the socket and free the NBDClient.
1419      */
1420     qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1421                          NULL);
1422 
1423     /* Also tell the client, so that they release their reference.  */
1424     if (client->close_fn) {
1425         client->close_fn(client, negotiated);
1426     }
1427 }
1428 
1429 static NBDRequestData *nbd_request_get(NBDClient *client)
1430 {
1431     NBDRequestData *req;
1432 
1433     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1434     client->nb_requests++;
1435 
1436     req = g_new0(NBDRequestData, 1);
1437     nbd_client_get(client);
1438     req->client = client;
1439     return req;
1440 }
1441 
1442 static void nbd_request_put(NBDRequestData *req)
1443 {
1444     NBDClient *client = req->client;
1445 
1446     if (req->data) {
1447         qemu_vfree(req->data);
1448     }
1449     g_free(req);
1450 
1451     client->nb_requests--;
1452     nbd_client_receive_next_request(client);
1453 
1454     nbd_client_put(client);
1455 }
1456 
1457 static void blk_aio_attached(AioContext *ctx, void *opaque)
1458 {
1459     NBDExport *exp = opaque;
1460     NBDClient *client;
1461 
1462     trace_nbd_blk_aio_attached(exp->name, ctx);
1463 
1464     exp->common.ctx = ctx;
1465 
1466     QTAILQ_FOREACH(client, &exp->clients, next) {
1467         qio_channel_attach_aio_context(client->ioc, ctx);
1468         if (client->recv_coroutine) {
1469             aio_co_schedule(ctx, client->recv_coroutine);
1470         }
1471         if (client->send_coroutine) {
1472             aio_co_schedule(ctx, client->send_coroutine);
1473         }
1474     }
1475 }
1476 
1477 static void blk_aio_detach(void *opaque)
1478 {
1479     NBDExport *exp = opaque;
1480     NBDClient *client;
1481 
1482     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
1483 
1484     QTAILQ_FOREACH(client, &exp->clients, next) {
1485         qio_channel_detach_aio_context(client->ioc);
1486     }
1487 
1488     exp->common.ctx = NULL;
1489 }
1490 
1491 static void nbd_eject_notifier(Notifier *n, void *data)
1492 {
1493     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1494 
1495     blk_exp_request_shutdown(&exp->common);
1496 }
1497 
1498 void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
1499 {
1500     NBDExport *nbd_exp = container_of(exp, NBDExport, common);
1501     assert(exp->drv == &blk_exp_nbd);
1502     assert(nbd_exp->eject_notifier_blk == NULL);
1503 
1504     blk_ref(blk);
1505     nbd_exp->eject_notifier_blk = blk;
1506     nbd_exp->eject_notifier.notify = nbd_eject_notifier;
1507     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
1508 }
1509 
/*
 * Create callback of the NBD export driver.
 *
 * Validates the export options, records name, description, flags and size
 * in the NBDExport embedding @blk_exp, optionally binds a dirty bitmap, and
 * appends the export to the global 'exports' list.
 *
 * Returns 0 on success or a negative errno value with @errp set.
 */
static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
                             Error **errp)
{
    NBDExport *exp = container_of(blk_exp, NBDExport, common);
    BlockExportOptionsNbd *arg = &exp_args->u.nbd;
    BlockBackend *blk = blk_exp->blk;
    int64_t size;
    uint64_t perm, shared_perm;
    /* Both flags derive from the same option: a non-writable export. */
    bool readonly = !exp_args->writable;
    bool shared = !exp_args->writable;
    int ret;

    assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);

    if (!nbd_server_is_running()) {
        error_setg(errp, "NBD server not running");
        return -EINVAL;
    }

    /* Default the export name to the node name. */
    if (!arg->has_name) {
        arg->name = exp_args->node_name;
    }

    if (strlen(arg->name) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "export name '%s' too long", arg->name);
        return -EINVAL;
    }

    if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "description '%s' too long", arg->description);
        return -EINVAL;
    }

    if (nbd_export_find(arg->name)) {
        error_setg(errp, "NBD server already has export named '%s'", arg->name);
        return -EEXIST;
    }

    size = blk_getlength(blk);
    if (size < 0) {
        error_setg_errno(errp, -size,
                         "Failed to determine the NBD export's length");
        return size;
    }

    /* Don't allow resize while the NBD server is running, otherwise we don't
     * care what happens with the node. */
    blk_get_perm(blk, &perm, &shared_perm);
    ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
    if (ret < 0) {
        return ret;
    }

    blk_set_allow_aio_context_change(blk, true);

    QTAILQ_INIT(&exp->clients);
    exp->name = g_strdup(arg->name);
    exp->description = g_strdup(arg->description);
    exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
                     NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
    if (readonly) {
        exp->nbdflags |= NBD_FLAG_READ_ONLY;
        if (shared) {
            exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
        }
    } else {
        exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
                          NBD_FLAG_SEND_FAST_ZERO);
    }
    /* The advertised size is rounded down to a whole number of sectors. */
    exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);

    if (arg->bitmap) {
        BlockDriverState *bs = blk_bs(blk);
        BdrvDirtyBitmap *bm = NULL;

        /* Search this node, then each filter/COW child in turn. */
        while (bs) {
            bm = bdrv_find_dirty_bitmap(bs, arg->bitmap);
            if (bm != NULL) {
                break;
            }

            bs = bdrv_filter_or_cow_bs(bs);
        }

        if (bm == NULL) {
            ret = -ENOENT;
            error_setg(errp, "Bitmap '%s' is not found", arg->bitmap);
            goto fail;
        }

        if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
            ret = -EINVAL;
            goto fail;
        }

        /* Here bs is the node on which the bitmap was found. */
        if (readonly && bdrv_is_writable(bs) &&
            bdrv_dirty_bitmap_enabled(bm)) {
            ret = -EINVAL;
            error_setg(errp,
                       "Enabled bitmap '%s' incompatible with readonly export",
                       arg->bitmap);
            goto fail;
        }

        /* Marked busy here; nbd_export_delete() clears the flag again. */
        bdrv_dirty_bitmap_set_busy(bm, true);
        exp->export_bitmap = bm;
        assert(strlen(arg->bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
        /* Full meta context name advertised to clients for this bitmap. */
        exp->export_bitmap_context = g_strdup_printf("qemu:dirty-bitmap:%s",
                                                     arg->bitmap);
        assert(strlen(exp->export_bitmap_context) < NBD_MAX_STRING_SIZE);
    }

    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);

    QTAILQ_INSERT_TAIL(&exports, exp, next);

    return 0;

fail:
    /* Only the strings duplicated above are released on this path. */
    g_free(exp->name);
    g_free(exp->description);
    return ret;
}
1633 
1634 NBDExport *nbd_export_find(const char *name)
1635 {
1636     NBDExport *exp;
1637     QTAILQ_FOREACH(exp, &exports, next) {
1638         if (strcmp(name, exp->name) == 0) {
1639             return exp;
1640         }
1641     }
1642 
1643     return NULL;
1644 }
1645 
1646 AioContext *
1647 nbd_export_aio_context(NBDExport *exp)
1648 {
1649     return exp->common.ctx;
1650 }
1651 
1652 static void nbd_export_request_shutdown(BlockExport *blk_exp)
1653 {
1654     NBDExport *exp = container_of(blk_exp, NBDExport, common);
1655     NBDClient *client, *next;
1656 
1657     blk_exp_ref(&exp->common);
1658     /*
1659      * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1660      * close mode that stops advertising the export to new clients but
1661      * still permits existing clients to run to completion? Because of
1662      * that possibility, nbd_export_close() can be called more than
1663      * once on an export.
1664      */
1665     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1666         client_close(client, true);
1667     }
1668     if (exp->name) {
1669         g_free(exp->name);
1670         exp->name = NULL;
1671         QTAILQ_REMOVE(&exports, exp, next);
1672     }
1673     blk_exp_unref(&exp->common);
1674 }
1675 
1676 static void nbd_export_delete(BlockExport *blk_exp)
1677 {
1678     NBDExport *exp = container_of(blk_exp, NBDExport, common);
1679 
1680     assert(exp->name == NULL);
1681     assert(QTAILQ_EMPTY(&exp->clients));
1682 
1683     g_free(exp->description);
1684     exp->description = NULL;
1685 
1686     if (exp->common.blk) {
1687         if (exp->eject_notifier_blk) {
1688             notifier_remove(&exp->eject_notifier);
1689             blk_unref(exp->eject_notifier_blk);
1690         }
1691         blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
1692                                         blk_aio_detach, exp);
1693     }
1694 
1695     if (exp->export_bitmap) {
1696         bdrv_dirty_bitmap_set_busy(exp->export_bitmap, false);
1697         g_free(exp->export_bitmap_context);
1698     }
1699 }
1700 
/*
 * Block export driver description for NBD: ties the export type and the
 * per-instance allocation size to the create/delete/request_shutdown
 * callbacks defined in this file.
 */
const BlockExportDriver blk_exp_nbd = {
    .type               = BLOCK_EXPORT_TYPE_NBD,
    .instance_size      = sizeof(NBDExport),
    .create             = nbd_export_create,
    .delete             = nbd_export_delete,
    .request_shutdown   = nbd_export_request_shutdown,
};
1708 
1709 static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
1710                                         unsigned niov, Error **errp)
1711 {
1712     int ret;
1713 
1714     g_assert(qemu_in_coroutine());
1715     qemu_co_mutex_lock(&client->send_lock);
1716     client->send_coroutine = qemu_coroutine_self();
1717 
1718     ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
1719 
1720     client->send_coroutine = NULL;
1721     qemu_co_mutex_unlock(&client->send_lock);
1722 
1723     return ret;
1724 }
1725 
1726 static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
1727                                        uint64_t handle)
1728 {
1729     stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
1730     stl_be_p(&reply->error, error);
1731     stq_be_p(&reply->handle, handle);
1732 }
1733 
1734 static int nbd_co_send_simple_reply(NBDClient *client,
1735                                     uint64_t handle,
1736                                     uint32_t error,
1737                                     void *data,
1738                                     size_t len,
1739                                     Error **errp)
1740 {
1741     NBDSimpleReply reply;
1742     int nbd_err = system_errno_to_nbd_errno(error);
1743     struct iovec iov[] = {
1744         {.iov_base = &reply, .iov_len = sizeof(reply)},
1745         {.iov_base = data, .iov_len = len}
1746     };
1747 
1748     trace_nbd_co_send_simple_reply(handle, nbd_err, nbd_err_lookup(nbd_err),
1749                                    len);
1750     set_be_simple_reply(&reply, nbd_err, handle);
1751 
1752     return nbd_co_send_iov(client, iov, len ? 2 : 1, errp);
1753 }
1754 
1755 static inline void set_be_chunk(NBDStructuredReplyChunk *chunk, uint16_t flags,
1756                                 uint16_t type, uint64_t handle, uint32_t length)
1757 {
1758     stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
1759     stw_be_p(&chunk->flags, flags);
1760     stw_be_p(&chunk->type, type);
1761     stq_be_p(&chunk->handle, handle);
1762     stl_be_p(&chunk->length, length);
1763 }
1764 
1765 static int coroutine_fn nbd_co_send_structured_done(NBDClient *client,
1766                                                     uint64_t handle,
1767                                                     Error **errp)
1768 {
1769     NBDStructuredReplyChunk chunk;
1770     struct iovec iov[] = {
1771         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1772     };
1773 
1774     trace_nbd_co_send_structured_done(handle);
1775     set_be_chunk(&chunk, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_NONE, handle, 0);
1776 
1777     return nbd_co_send_iov(client, iov, 1, errp);
1778 }
1779 
1780 static int coroutine_fn nbd_co_send_structured_read(NBDClient *client,
1781                                                     uint64_t handle,
1782                                                     uint64_t offset,
1783                                                     void *data,
1784                                                     size_t size,
1785                                                     bool final,
1786                                                     Error **errp)
1787 {
1788     NBDStructuredReadData chunk;
1789     struct iovec iov[] = {
1790         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1791         {.iov_base = data, .iov_len = size}
1792     };
1793 
1794     assert(size);
1795     trace_nbd_co_send_structured_read(handle, offset, data, size);
1796     set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1797                  NBD_REPLY_TYPE_OFFSET_DATA, handle,
1798                  sizeof(chunk) - sizeof(chunk.h) + size);
1799     stq_be_p(&chunk.offset, offset);
1800 
1801     return nbd_co_send_iov(client, iov, 2, errp);
1802 }
1803 
1804 static int coroutine_fn nbd_co_send_structured_error(NBDClient *client,
1805                                                      uint64_t handle,
1806                                                      uint32_t error,
1807                                                      const char *msg,
1808                                                      Error **errp)
1809 {
1810     NBDStructuredError chunk;
1811     int nbd_err = system_errno_to_nbd_errno(error);
1812     struct iovec iov[] = {
1813         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1814         {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
1815     };
1816 
1817     assert(nbd_err);
1818     trace_nbd_co_send_structured_error(handle, nbd_err,
1819                                        nbd_err_lookup(nbd_err), msg ? msg : "");
1820     set_be_chunk(&chunk.h, NBD_REPLY_FLAG_DONE, NBD_REPLY_TYPE_ERROR, handle,
1821                  sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
1822     stl_be_p(&chunk.error, nbd_err);
1823     stw_be_p(&chunk.message_length, iov[1].iov_len);
1824 
1825     return nbd_co_send_iov(client, iov, 1 + !!iov[1].iov_len, errp);
1826 }
1827 
1828 /* Do a sparse read and send the structured reply to the client.
1829  * Returns -errno if sending fails. bdrv_block_status_above() failure is
1830  * reported to the client, at which point this function succeeds.
1831  */
1832 static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
1833                                                 uint64_t handle,
1834                                                 uint64_t offset,
1835                                                 uint8_t *data,
1836                                                 size_t size,
1837                                                 Error **errp)
1838 {
1839     int ret = 0;
1840     NBDExport *exp = client->exp;
1841     size_t progress = 0;
1842 
1843     while (progress < size) {
1844         int64_t pnum;
1845         int status = bdrv_block_status_above(blk_bs(exp->common.blk), NULL,
1846                                              offset + progress,
1847                                              size - progress, &pnum, NULL,
1848                                              NULL);
1849         bool final;
1850 
1851         if (status < 0) {
1852             char *msg = g_strdup_printf("unable to check for holes: %s",
1853                                         strerror(-status));
1854 
1855             ret = nbd_co_send_structured_error(client, handle, -status, msg,
1856                                                errp);
1857             g_free(msg);
1858             return ret;
1859         }
1860         assert(pnum && pnum <= size - progress);
1861         final = progress + pnum == size;
1862         if (status & BDRV_BLOCK_ZERO) {
1863             NBDStructuredReadHole chunk;
1864             struct iovec iov[] = {
1865                 {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1866             };
1867 
1868             trace_nbd_co_send_structured_read_hole(handle, offset + progress,
1869                                                    pnum);
1870             set_be_chunk(&chunk.h, final ? NBD_REPLY_FLAG_DONE : 0,
1871                          NBD_REPLY_TYPE_OFFSET_HOLE,
1872                          handle, sizeof(chunk) - sizeof(chunk.h));
1873             stq_be_p(&chunk.offset, offset + progress);
1874             stl_be_p(&chunk.length, pnum);
1875             ret = nbd_co_send_iov(client, iov, 1, errp);
1876         } else {
1877             ret = blk_pread(exp->common.blk, offset + progress,
1878                             data + progress, pnum);
1879             if (ret < 0) {
1880                 error_setg_errno(errp, -ret, "reading from file failed");
1881                 break;
1882             }
1883             ret = nbd_co_send_structured_read(client, handle, offset + progress,
1884                                               data + progress, pnum, final,
1885                                               errp);
1886         }
1887 
1888         if (ret < 0) {
1889             break;
1890         }
1891         progress += pnum;
1892     }
1893     return ret;
1894 }
1895 
/*
 * Bounded list of block status extents that is filled in host byte order
 * by nbd_extent_array_add() and converted once, right before sending, by
 * nbd_extent_array_convert_to_be().
 */
typedef struct NBDExtentArray {
    NBDExtent *extents;         /* storage, allocated for nb_alloc entries */
    unsigned int nb_alloc;      /* capacity of extents[] */
    unsigned int count;         /* number of entries currently in use */
    uint64_t total_length;      /* sum of all lengths added so far */
    bool can_add;               /* cleared when full or after conversion */
    bool converted_to_be;       /* set once entries were converted to BE */
} NBDExtentArray;
1904 
1905 static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
1906 {
1907     NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
1908 
1909     ea->nb_alloc = nb_alloc;
1910     ea->extents = g_new(NBDExtent, nb_alloc);
1911     ea->can_add = true;
1912 
1913     return ea;
1914 }
1915 
1916 static void nbd_extent_array_free(NBDExtentArray *ea)
1917 {
1918     g_free(ea->extents);
1919     g_free(ea);
1920 }
1921 G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free);
1922 
1923 /* Further modifications of the array after conversion are abandoned */
1924 static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
1925 {
1926     int i;
1927 
1928     assert(!ea->converted_to_be);
1929     ea->can_add = false;
1930     ea->converted_to_be = true;
1931 
1932     for (i = 0; i < ea->count; i++) {
1933         ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
1934         ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
1935     }
1936 }
1937 
1938 /*
1939  * Add extent to NBDExtentArray. If extent can't be added (no available space),
1940  * return -1.
1941  * For safety, when returning -1 for the first time, .can_add is set to false,
1942  * further call to nbd_extent_array_add() will crash.
1943  * (to avoid the situation, when after failing to add an extent (returned -1),
1944  * user miss this failure and add another extent, which is successfully added
1945  * (array is full, but new extent may be squashed into the last one), then we
1946  * have invalid array with skipped extent)
1947  */
1948 static int nbd_extent_array_add(NBDExtentArray *ea,
1949                                 uint32_t length, uint32_t flags)
1950 {
1951     assert(ea->can_add);
1952 
1953     if (!length) {
1954         return 0;
1955     }
1956 
1957     /* Extend previous extent if flags are the same */
1958     if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
1959         uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
1960 
1961         if (sum <= UINT32_MAX) {
1962             ea->extents[ea->count - 1].length = sum;
1963             ea->total_length += length;
1964             return 0;
1965         }
1966     }
1967 
1968     if (ea->count >= ea->nb_alloc) {
1969         ea->can_add = false;
1970         return -1;
1971     }
1972 
1973     ea->total_length += length;
1974     ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
1975     ea->count++;
1976 
1977     return 0;
1978 }
1979 
1980 static int blockstatus_to_extents(BlockDriverState *bs, uint64_t offset,
1981                                   uint64_t bytes, NBDExtentArray *ea)
1982 {
1983     while (bytes) {
1984         uint32_t flags;
1985         int64_t num;
1986         int ret = bdrv_block_status_above(bs, NULL, offset, bytes, &num,
1987                                           NULL, NULL);
1988 
1989         if (ret < 0) {
1990             return ret;
1991         }
1992 
1993         flags = (ret & BDRV_BLOCK_ALLOCATED ? 0 : NBD_STATE_HOLE) |
1994                 (ret & BDRV_BLOCK_ZERO      ? NBD_STATE_ZERO : 0);
1995 
1996         if (nbd_extent_array_add(ea, num, flags) < 0) {
1997             return 0;
1998         }
1999 
2000         offset += num;
2001         bytes -= num;
2002     }
2003 
2004     return 0;
2005 }
2006 
2007 /*
2008  * nbd_co_send_extents
2009  *
2010  * @ea is converted to BE by the function
2011  * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2012  */
2013 static int nbd_co_send_extents(NBDClient *client, uint64_t handle,
2014                                NBDExtentArray *ea,
2015                                bool last, uint32_t context_id, Error **errp)
2016 {
2017     NBDStructuredMeta chunk;
2018     struct iovec iov[] = {
2019         {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2020         {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
2021     };
2022 
2023     nbd_extent_array_convert_to_be(ea);
2024 
2025     trace_nbd_co_send_extents(handle, ea->count, context_id, ea->total_length,
2026                               last);
2027     set_be_chunk(&chunk.h, last ? NBD_REPLY_FLAG_DONE : 0,
2028                  NBD_REPLY_TYPE_BLOCK_STATUS,
2029                  handle, sizeof(chunk) - sizeof(chunk.h) + iov[1].iov_len);
2030     stl_be_p(&chunk.context_id, context_id);
2031 
2032     return nbd_co_send_iov(client, iov, 2, errp);
2033 }
2034 
2035 /* Get block status from the exported device and send it to the client */
2036 static int nbd_co_send_block_status(NBDClient *client, uint64_t handle,
2037                                     BlockDriverState *bs, uint64_t offset,
2038                                     uint32_t length, bool dont_fragment,
2039                                     bool last, uint32_t context_id,
2040                                     Error **errp)
2041 {
2042     int ret;
2043     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2044     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2045 
2046     ret = blockstatus_to_extents(bs, offset, length, ea);
2047     if (ret < 0) {
2048         return nbd_co_send_structured_error(
2049                 client, handle, -ret, "can't get block status", errp);
2050     }
2051 
2052     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2053 }
2054 
2055 /* Populate @ea from a dirty bitmap. */
2056 static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2057                               uint64_t offset, uint64_t length,
2058                               NBDExtentArray *es)
2059 {
2060     int64_t start, dirty_start, dirty_count;
2061     int64_t end = offset + length;
2062     bool full = false;
2063 
2064     bdrv_dirty_bitmap_lock(bitmap);
2065 
2066     for (start = offset;
2067          bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
2068                                            &dirty_start, &dirty_count);
2069          start = dirty_start + dirty_count)
2070     {
2071         if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2072             (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2073         {
2074             full = true;
2075             break;
2076         }
2077     }
2078 
2079     if (!full) {
2080         /* last non dirty extent */
2081         nbd_extent_array_add(es, end - start, 0);
2082     }
2083 
2084     bdrv_dirty_bitmap_unlock(bitmap);
2085 }
2086 
2087 static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
2088                               BdrvDirtyBitmap *bitmap, uint64_t offset,
2089                               uint32_t length, bool dont_fragment, bool last,
2090                               uint32_t context_id, Error **errp)
2091 {
2092     unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2093     g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2094 
2095     bitmap_to_extents(bitmap, offset, length, ea);
2096 
2097     return nbd_co_send_extents(client, handle, ea, last, context_id, errp);
2098 }
2099 
2100 /* nbd_co_receive_request
2101  * Collect a client request. Return 0 if request looks valid, -EIO to drop
2102  * connection right away, and any other negative value to report an error to
2103  * the client (although the caller may still need to disconnect after reporting
2104  * the error).
2105  */
2106 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
2107                                   Error **errp)
2108 {
2109     NBDClient *client = req->client;
2110     int valid_flags;
2111 
2112     g_assert(qemu_in_coroutine());
2113     assert(client->recv_coroutine == qemu_coroutine_self());
2114     if (nbd_receive_request(client->ioc, request, errp) < 0) {
2115         return -EIO;
2116     }
2117 
2118     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
2119                                              nbd_cmd_lookup(request->type));
2120 
2121     if (request->type != NBD_CMD_WRITE) {
2122         /* No payload, we are ready to read the next request.  */
2123         req->complete = true;
2124     }
2125 
2126     if (request->type == NBD_CMD_DISC) {
2127         /* Special case: we're going to disconnect without a reply,
2128          * whether or not flags, from, or len are bogus */
2129         return -EIO;
2130     }
2131 
2132     if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
2133         request->type == NBD_CMD_CACHE)
2134     {
2135         if (request->len > NBD_MAX_BUFFER_SIZE) {
2136             error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
2137                        request->len, NBD_MAX_BUFFER_SIZE);
2138             return -EINVAL;
2139         }
2140 
2141         if (request->type != NBD_CMD_CACHE) {
2142             req->data = blk_try_blockalign(client->exp->common.blk,
2143                                            request->len);
2144             if (req->data == NULL) {
2145                 error_setg(errp, "No memory");
2146                 return -ENOMEM;
2147             }
2148         }
2149     }
2150 
2151     if (request->type == NBD_CMD_WRITE) {
2152         if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
2153                      errp) < 0)
2154         {
2155             return -EIO;
2156         }
2157         req->complete = true;
2158 
2159         trace_nbd_co_receive_request_payload_received(request->handle,
2160                                                       request->len);
2161     }
2162 
2163     /* Sanity checks. */
2164     if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
2165         (request->type == NBD_CMD_WRITE ||
2166          request->type == NBD_CMD_WRITE_ZEROES ||
2167          request->type == NBD_CMD_TRIM)) {
2168         error_setg(errp, "Export is read-only");
2169         return -EROFS;
2170     }
2171     if (request->from > client->exp->size ||
2172         request->len > client->exp->size - request->from) {
2173         error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
2174                    ", Size: %" PRIu64, request->from, request->len,
2175                    client->exp->size);
2176         return (request->type == NBD_CMD_WRITE ||
2177                 request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2178     }
2179     if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2180                                                 client->check_align)) {
2181         /*
2182          * The block layer gracefully handles unaligned requests, but
2183          * it's still worth tracing client non-compliance
2184          */
2185         trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2186                                               request->from,
2187                                               request->len,
2188                                               client->check_align);
2189     }
2190     valid_flags = NBD_CMD_FLAG_FUA;
2191     if (request->type == NBD_CMD_READ && client->structured_reply) {
2192         valid_flags |= NBD_CMD_FLAG_DF;
2193     } else if (request->type == NBD_CMD_WRITE_ZEROES) {
2194         valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2195     } else if (request->type == NBD_CMD_BLOCK_STATUS) {
2196         valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2197     }
2198     if (request->flags & ~valid_flags) {
2199         error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2200                    nbd_cmd_lookup(request->type), request->flags);
2201         return -EINVAL;
2202     }
2203 
2204     return 0;
2205 }
2206 
2207 /* Send simple reply without a payload, or a structured error
2208  * @error_msg is ignored if @ret >= 0
2209  * Returns 0 if connection is still live, -errno on failure to talk to client
2210  */
2211 static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2212                                                uint64_t handle,
2213                                                int ret,
2214                                                const char *error_msg,
2215                                                Error **errp)
2216 {
2217     if (client->structured_reply && ret < 0) {
2218         return nbd_co_send_structured_error(client, handle, -ret, error_msg,
2219                                             errp);
2220     } else {
2221         return nbd_co_send_simple_reply(client, handle, ret < 0 ? -ret : 0,
2222                                         NULL, 0, errp);
2223     }
2224 }
2225 
2226 /* Handle NBD_CMD_READ request.
2227  * Return -errno if sending fails. Other errors are reported directly to the
2228  * client as an error reply. */
2229 static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2230                                         uint8_t *data, Error **errp)
2231 {
2232     int ret;
2233     NBDExport *exp = client->exp;
2234 
2235     assert(request->type == NBD_CMD_READ);
2236 
2237     /* XXX: NBD Protocol only documents use of FUA with WRITE */
2238     if (request->flags & NBD_CMD_FLAG_FUA) {
2239         ret = blk_co_flush(exp->common.blk);
2240         if (ret < 0) {
2241             return nbd_send_generic_reply(client, request->handle, ret,
2242                                           "flush failed", errp);
2243         }
2244     }
2245 
2246     if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
2247         request->len)
2248     {
2249         return nbd_co_send_sparse_read(client, request->handle, request->from,
2250                                        data, request->len, errp);
2251     }
2252 
2253     ret = blk_pread(exp->common.blk, request->from, data, request->len);
2254     if (ret < 0) {
2255         return nbd_send_generic_reply(client, request->handle, ret,
2256                                       "reading from file failed", errp);
2257     }
2258 
2259     if (client->structured_reply) {
2260         if (request->len) {
2261             return nbd_co_send_structured_read(client, request->handle,
2262                                                request->from, data,
2263                                                request->len, true, errp);
2264         } else {
2265             return nbd_co_send_structured_done(client, request->handle, errp);
2266         }
2267     } else {
2268         return nbd_co_send_simple_reply(client, request->handle, 0,
2269                                         data, request->len, errp);
2270     }
2271 }
2272 
2273 /*
2274  * nbd_do_cmd_cache
2275  *
2276  * Handle NBD_CMD_CACHE request.
2277  * Return -errno if sending fails. Other errors are reported directly to the
2278  * client as an error reply.
2279  */
2280 static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2281                                          Error **errp)
2282 {
2283     int ret;
2284     NBDExport *exp = client->exp;
2285 
2286     assert(request->type == NBD_CMD_CACHE);
2287 
2288     ret = blk_co_preadv(exp->common.blk, request->from, request->len,
2289                         NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2290 
2291     return nbd_send_generic_reply(client, request->handle, ret,
2292                                   "caching data failed", errp);
2293 }
2294 
2295 /* Handle NBD request.
2296  * Return -errno if sending fails. Other errors are reported directly to the
2297  * client as an error reply. */
2298 static coroutine_fn int nbd_handle_request(NBDClient *client,
2299                                            NBDRequest *request,
2300                                            uint8_t *data, Error **errp)
2301 {
2302     int ret;
2303     int flags;
2304     NBDExport *exp = client->exp;
2305     char *msg;
2306 
2307     switch (request->type) {
2308     case NBD_CMD_CACHE:
2309         return nbd_do_cmd_cache(client, request, errp);
2310 
2311     case NBD_CMD_READ:
2312         return nbd_do_cmd_read(client, request, data, errp);
2313 
2314     case NBD_CMD_WRITE:
2315         flags = 0;
2316         if (request->flags & NBD_CMD_FLAG_FUA) {
2317             flags |= BDRV_REQ_FUA;
2318         }
2319         ret = blk_pwrite(exp->common.blk, request->from, data, request->len,
2320                          flags);
2321         return nbd_send_generic_reply(client, request->handle, ret,
2322                                       "writing to file failed", errp);
2323 
2324     case NBD_CMD_WRITE_ZEROES:
2325         flags = 0;
2326         if (request->flags & NBD_CMD_FLAG_FUA) {
2327             flags |= BDRV_REQ_FUA;
2328         }
2329         if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2330             flags |= BDRV_REQ_MAY_UNMAP;
2331         }
2332         if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2333             flags |= BDRV_REQ_NO_FALLBACK;
2334         }
2335         ret = 0;
2336         /* FIXME simplify this when blk_pwrite_zeroes switches to 64-bit */
2337         while (ret >= 0 && request->len) {
2338             int align = client->check_align ?: 1;
2339             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2340                                                         align));
2341             ret = blk_pwrite_zeroes(exp->common.blk, request->from, len, flags);
2342             request->len -= len;
2343             request->from += len;
2344         }
2345         return nbd_send_generic_reply(client, request->handle, ret,
2346                                       "writing to file failed", errp);
2347 
2348     case NBD_CMD_DISC:
2349         /* unreachable, thanks to special case in nbd_co_receive_request() */
2350         abort();
2351 
2352     case NBD_CMD_FLUSH:
2353         ret = blk_co_flush(exp->common.blk);
2354         return nbd_send_generic_reply(client, request->handle, ret,
2355                                       "flush failed", errp);
2356 
2357     case NBD_CMD_TRIM:
2358         ret = 0;
2359         /* FIXME simplify this when blk_co_pdiscard switches to 64-bit */
2360         while (ret >= 0 && request->len) {
2361             int align = client->check_align ?: 1;
2362             int len = MIN(request->len, QEMU_ALIGN_DOWN(BDRV_REQUEST_MAX_BYTES,
2363                                                         align));
2364             ret = blk_co_pdiscard(exp->common.blk, request->from, len);
2365             request->len -= len;
2366             request->from += len;
2367         }
2368         if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2369             ret = blk_co_flush(exp->common.blk);
2370         }
2371         return nbd_send_generic_reply(client, request->handle, ret,
2372                                       "discard failed", errp);
2373 
2374     case NBD_CMD_BLOCK_STATUS:
2375         if (!request->len) {
2376             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2377                                           "need non-zero length", errp);
2378         }
2379         if (client->export_meta.valid &&
2380             (client->export_meta.base_allocation ||
2381              client->export_meta.bitmap))
2382         {
2383             bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2384 
2385             if (client->export_meta.base_allocation) {
2386                 ret = nbd_co_send_block_status(client, request->handle,
2387                                                blk_bs(exp->common.blk),
2388                                                request->from,
2389                                                request->len, dont_fragment,
2390                                                !client->export_meta.bitmap,
2391                                                NBD_META_ID_BASE_ALLOCATION,
2392                                                errp);
2393                 if (ret < 0) {
2394                     return ret;
2395                 }
2396             }
2397 
2398             if (client->export_meta.bitmap) {
2399                 ret = nbd_co_send_bitmap(client, request->handle,
2400                                          client->exp->export_bitmap,
2401                                          request->from, request->len,
2402                                          dont_fragment,
2403                                          true, NBD_META_ID_DIRTY_BITMAP, errp);
2404                 if (ret < 0) {
2405                     return ret;
2406                 }
2407             }
2408 
2409             return 0;
2410         } else {
2411             return nbd_send_generic_reply(client, request->handle, -EINVAL,
2412                                           "CMD_BLOCK_STATUS not negotiated",
2413                                           errp);
2414         }
2415 
2416     default:
2417         msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
2418                               request->type);
2419         ret = nbd_send_generic_reply(client, request->handle, -EINVAL, msg,
2420                                      errp);
2421         g_free(msg);
2422         return ret;
2423     }
2424 }
2425 
2426 /* Owns a reference to the NBDClient passed as opaque.  */
2427 static coroutine_fn void nbd_trip(void *opaque)
2428 {
2429     NBDClient *client = opaque;
2430     NBDRequestData *req;
2431     NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
2432     int ret;
2433     Error *local_err = NULL;
2434 
2435     trace_nbd_trip();
2436     if (client->closing) {
2437         nbd_client_put(client);
2438         return;
2439     }
2440 
2441     req = nbd_request_get(client);
2442     ret = nbd_co_receive_request(req, &request, &local_err);
2443     client->recv_coroutine = NULL;
2444 
2445     if (client->closing) {
2446         /*
2447          * The client may be closed when we are blocked in
2448          * nbd_co_receive_request()
2449          */
2450         goto done;
2451     }
2452 
2453     nbd_client_receive_next_request(client);
2454     if (ret == -EIO) {
2455         goto disconnect;
2456     }
2457 
2458     if (ret < 0) {
2459         /* It wans't -EIO, so, according to nbd_co_receive_request()
2460          * semantics, we should return the error to the client. */
2461         Error *export_err = local_err;
2462 
2463         local_err = NULL;
2464         ret = nbd_send_generic_reply(client, request.handle, -EINVAL,
2465                                      error_get_pretty(export_err), &local_err);
2466         error_free(export_err);
2467     } else {
2468         ret = nbd_handle_request(client, &request, req->data, &local_err);
2469     }
2470     if (ret < 0) {
2471         error_prepend(&local_err, "Failed to send reply: ");
2472         goto disconnect;
2473     }
2474 
2475     /* We must disconnect after NBD_CMD_WRITE if we did not
2476      * read the payload.
2477      */
2478     if (!req->complete) {
2479         error_setg(&local_err, "Request handling failed in intermediate state");
2480         goto disconnect;
2481     }
2482 
2483 done:
2484     nbd_request_put(req);
2485     nbd_client_put(client);
2486     return;
2487 
2488 disconnect:
2489     if (local_err) {
2490         error_reportf_err(local_err, "Disconnect client, due to: ");
2491     }
2492     nbd_request_put(req);
2493     client_close(client, true);
2494     nbd_client_put(client);
2495 }
2496 
2497 static void nbd_client_receive_next_request(NBDClient *client)
2498 {
2499     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
2500         nbd_client_get(client);
2501         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
2502         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
2503     }
2504 }
2505 
2506 static coroutine_fn void nbd_co_client_start(void *opaque)
2507 {
2508     NBDClient *client = opaque;
2509     Error *local_err = NULL;
2510 
2511     qemu_co_mutex_init(&client->send_lock);
2512 
2513     if (nbd_negotiate(client, &local_err)) {
2514         if (local_err) {
2515             error_report_err(local_err);
2516         }
2517         client_close(client, false);
2518         return;
2519     }
2520 
2521     nbd_client_receive_next_request(client);
2522 }
2523 
2524 /*
2525  * Create a new client listener using the given channel @sioc.
2526  * Begin servicing it in a coroutine.  When the connection closes, call
2527  * @close_fn with an indication of whether the client completed negotiation.
2528  */
2529 void nbd_client_new(QIOChannelSocket *sioc,
2530                     QCryptoTLSCreds *tlscreds,
2531                     const char *tlsauthz,
2532                     void (*close_fn)(NBDClient *, bool))
2533 {
2534     NBDClient *client;
2535     Coroutine *co;
2536 
2537     client = g_new0(NBDClient, 1);
2538     client->refcount = 1;
2539     client->tlscreds = tlscreds;
2540     if (tlscreds) {
2541         object_ref(OBJECT(client->tlscreds));
2542     }
2543     client->tlsauthz = g_strdup(tlsauthz);
2544     client->sioc = sioc;
2545     object_ref(OBJECT(client->sioc));
2546     client->ioc = QIO_CHANNEL(sioc);
2547     object_ref(OBJECT(client->ioc));
2548     client->close_fn = close_fn;
2549 
2550     co = qemu_coroutine_create(nbd_co_client_start, client);
2551     qemu_coroutine_enter(co);
2552 }
2553