xref: /openbmc/qemu/nbd/server.c (revision 43b48cfc3e8ff745a10a6b78a55519d5cf7ec5e8)
1 /*
2  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
3  *
4  *  Network Block Device Server Side
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; under version 2 of the License.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "nbd-internal.h"
20 
21 static int system_errno_to_nbd_errno(int err)
22 {
23     switch (err) {
24     case 0:
25         return NBD_SUCCESS;
26     case EPERM:
27         return NBD_EPERM;
28     case EIO:
29         return NBD_EIO;
30     case ENOMEM:
31         return NBD_ENOMEM;
32 #ifdef EDQUOT
33     case EDQUOT:
34 #endif
35     case EFBIG:
36     case ENOSPC:
37         return NBD_ENOSPC;
38     case EINVAL:
39     default:
40         return NBD_EINVAL;
41     }
42 }
43 
44 /* Definitions for opaque data types */
45 
typedef struct NBDRequest NBDRequest;

/*
 * State of one in-flight request.  Instances are created by
 * nbd_request_get() and destroyed by nbd_request_put().
 */
struct NBDRequest {
    /* Queue linkage; not referenced by the code visible in this file. */
    QSIMPLEQ_ENTRY(NBDRequest) entry;
    /* Client the request belongs to; nbd_request_get() takes a reference. */
    NBDClient *client;
    /* Payload buffer obtained from blk_try_blockalign() for READ/WRITE
     * commands, NULL for everything else. */
    uint8_t *data;
};
53 
/*
 * One exported block device.  Reference counted through
 * nbd_export_get() / nbd_export_put().
 */
struct NBDExport {
    /* Number of outstanding references; the export is freed at zero. */
    int refcount;
    /* Optional callback run by nbd_export_put() just before the export
     * is torn down. */
    void (*close)(NBDExport *exp);

    /* Backend serving the data; nbd_export_new() takes a reference. */
    BlockBackend *blk;
    /* Name the export is published under, or NULL while unpublished
     * (see nbd_export_set_name()). */
    char *name;
    /* Byte offset added to every client offset before accessing blk. */
    off_t dev_offset;
    /* Exported size in bytes, rounded down to a multiple of
     * BDRV_SECTOR_SIZE by nbd_export_new(). */
    off_t size;
    /* Flags ORed into the export flags sent during negotiation. */
    uint32_t nbdflags;
    /* Clients currently attached to this export. */
    QTAILQ_HEAD(, NBDClient) clients;
    /* Linkage in the global "exports" list while the export is named. */
    QTAILQ_ENTRY(NBDExport) next;

    /* AioContext the clients' fd handlers are registered with; NULL
     * between blk_aio_detach() and blk_aio_attached(). */
    AioContext *ctx;

    /* Registered with blk_add_remove_bs_notifier(); fires
     * nbd_eject_notifier(), which closes the export. */
    Notifier eject_notifier;
};

/* All named exports; membership is managed by nbd_export_set_name(). */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
72 
/*
 * One connected client.  Reference counted through nbd_client_get() /
 * nbd_client_put(); every in-flight NBDRequest holds a reference.
 */
struct NBDClient {
    /* Number of outstanding references; the client is freed at zero. */
    int refcount;
    /* Optional callback invoked by client_close(). */
    void (*close)(NBDClient *client);

    /* Export being served; NULL until chosen by option negotiation when
     * the client was created without one. */
    NBDExport *exp;
    /* Connected socket; closed by the final nbd_client_put(). */
    int sock;

    /* Coroutine currently inside nbd_co_receive_request(), else NULL. */
    Coroutine *recv_coroutine;

    /* Serializes nbd_co_send_reply() calls on this client. */
    CoMutex send_lock;
    /* Coroutine currently sending a reply, else NULL. */
    Coroutine *send_coroutine;

    /* Whether nbd_read should be installed as the fd read handler;
     * maintained by nbd_update_can_read(). */
    bool can_read;

    /* Linkage in NBDExport.clients. */
    QTAILQ_ENTRY(NBDClient) next;
    /* Number of requests in flight, bounded by MAX_NBD_REQUESTS. */
    int nb_requests;
    /* Set once by client_close(); no new requests are started afterwards. */
    bool closing;
};
91 
92 /* That's all folks */
93 
/* Forward declarations: these helpers are defined near the end of the
 * file but are used by the reference-counting and request code above
 * their definitions. */
static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
97 
98 static void nbd_negotiate_continue(void *opaque)
99 {
100     qemu_coroutine_enter(opaque, NULL);
101 }
102 
103 static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
104 {
105     ssize_t ret;
106 
107     assert(qemu_in_coroutine());
108     /* Negotiation are always in main loop. */
109     qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
110                         qemu_coroutine_self());
111     ret = read_sync(fd, buffer, size);
112     qemu_set_fd_handler(fd, NULL, NULL, NULL);
113     return ret;
114 
115 }
116 
117 static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
118 {
119     ssize_t ret;
120 
121     assert(qemu_in_coroutine());
122     /* Negotiation are always in main loop. */
123     qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
124                         qemu_coroutine_self());
125     ret = write_sync(fd, buffer, size);
126     qemu_set_fd_handler(fd, NULL, NULL, NULL);
127     return ret;
128 }
129 
/*
 * Read and discard @size bytes from @fd during negotiation.
 *
 * Returns @size when everything was consumed, a negative value when
 * nbd_negotiate_read() reported an error, or the number of bytes that
 * were actually discarded when the stream ended early.
 *
 * NOTE(review): other callers in this file compare the result of the
 * read helpers against the requested size, so a short count (including
 * 0 at end of stream) is presumably possible.  Without the explicit
 * check below such a 0 would never shrink the remaining byte count and
 * the loop would spin forever.
 */
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{
    const size_t chunk = MIN(65536, size);
    size_t remaining = size;
    uint8_t *buffer = g_malloc(chunk);
    ssize_t ret = 0;

    while (remaining > 0) {
        ret = nbd_negotiate_read(fd, buffer, MIN(chunk, remaining));
        if (ret <= 0) {
            /* Error (< 0) or end of stream (0): stop draining. */
            break;
        }

        assert((size_t)ret <= remaining);
        remaining -= ret;
    }

    g_free(buffer);
    if (ret < 0) {
        return ret;
    }
    return size - remaining;
}
149 
150 /* Basic flow for negotiation
151 
152    Server         Client
153    Negotiate
154 
155    or
156 
157    Server         Client
158    Negotiate #1
159                   Option
160    Negotiate #2
161 
162    ----
163 
164    followed by
165 
166    Server         Client
167                   Request
168    Response
169                   Request
170    Response
171                   ...
172    ...
173                   Request (type == 2)
174 
175 */
176 
177 static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
178 {
179     uint64_t magic;
180     uint32_t len;
181 
182     magic = cpu_to_be64(NBD_REP_MAGIC);
183     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
184         LOG("write failed (rep magic)");
185         return -EINVAL;
186     }
187     opt = cpu_to_be32(opt);
188     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
189         LOG("write failed (rep opt)");
190         return -EINVAL;
191     }
192     type = cpu_to_be32(type);
193     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
194         LOG("write failed (rep type)");
195         return -EINVAL;
196     }
197     len = cpu_to_be32(0);
198     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
199         LOG("write failed (rep data length)");
200         return -EINVAL;
201     }
202     return 0;
203 }
204 
205 static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
206 {
207     uint64_t magic, name_len;
208     uint32_t opt, type, len;
209 
210     name_len = strlen(exp->name);
211     magic = cpu_to_be64(NBD_REP_MAGIC);
212     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
213         LOG("write failed (magic)");
214         return -EINVAL;
215      }
216     opt = cpu_to_be32(NBD_OPT_LIST);
217     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
218         LOG("write failed (opt)");
219         return -EINVAL;
220     }
221     type = cpu_to_be32(NBD_REP_SERVER);
222     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
223         LOG("write failed (reply type)");
224         return -EINVAL;
225     }
226     len = cpu_to_be32(name_len + sizeof(len));
227     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
228         LOG("write failed (length)");
229         return -EINVAL;
230     }
231     len = cpu_to_be32(name_len);
232     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
233         LOG("write failed (length)");
234         return -EINVAL;
235     }
236     if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
237         LOG("write failed (buffer)");
238         return -EINVAL;
239     }
240     return 0;
241 }
242 
243 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
244 {
245     int csock;
246     NBDExport *exp;
247 
248     csock = client->sock;
249     if (length) {
250         if (nbd_negotiate_drop_sync(csock, length) != length) {
251             return -EIO;
252         }
253         return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
254     }
255 
256     /* For each export, send a NBD_REP_SERVER reply. */
257     QTAILQ_FOREACH(exp, &exports, next) {
258         if (nbd_negotiate_send_rep_list(csock, exp)) {
259             return -EINVAL;
260         }
261     }
262     /* Finish with a NBD_REP_ACK. */
263     return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
264 }
265 
266 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
267 {
268     int rc = -EINVAL, csock = client->sock;
269     char name[256];
270 
271     /* Client sends:
272         [20 ..  xx]   export name (length bytes)
273      */
274     TRACE("Checking length");
275     if (length > 255) {
276         LOG("Bad length received");
277         goto fail;
278     }
279     if (nbd_negotiate_read(csock, name, length) != length) {
280         LOG("read failed");
281         goto fail;
282     }
283     name[length] = '\0';
284 
285     client->exp = nbd_export_find(name);
286     if (!client->exp) {
287         LOG("export not found");
288         goto fail;
289     }
290 
291     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
292     nbd_export_get(client->exp);
293     rc = 0;
294 fail:
295     return rc;
296 }
297 
298 static int nbd_negotiate_options(NBDClient *client)
299 {
300     int csock = client->sock;
301     uint32_t flags;
302 
303     /* Client sends:
304         [ 0 ..   3]   client flags
305 
306         [ 0 ..   7]   NBD_OPTS_MAGIC
307         [ 8 ..  11]   NBD option
308         [12 ..  15]   Data length
309         ...           Rest of request
310 
311         [ 0 ..   7]   NBD_OPTS_MAGIC
312         [ 8 ..  11]   Second NBD option
313         [12 ..  15]   Data length
314         ...           Rest of request
315     */
316 
317     if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
318         LOG("read failed");
319         return -EIO;
320     }
321     TRACE("Checking client flags");
322     be32_to_cpus(&flags);
323     if (flags != 0 && flags != NBD_FLAG_C_FIXED_NEWSTYLE) {
324         LOG("Bad client flags received");
325         return -EIO;
326     }
327 
328     while (1) {
329         int ret;
330         uint32_t tmp, length;
331         uint64_t magic;
332 
333         if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
334             LOG("read failed");
335             return -EINVAL;
336         }
337         TRACE("Checking opts magic");
338         if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
339             LOG("Bad magic received");
340             return -EINVAL;
341         }
342 
343         if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
344             LOG("read failed");
345             return -EINVAL;
346         }
347 
348         if (nbd_negotiate_read(csock, &length,
349                                sizeof(length)) != sizeof(length)) {
350             LOG("read failed");
351             return -EINVAL;
352         }
353         length = be32_to_cpu(length);
354 
355         TRACE("Checking option");
356         switch (be32_to_cpu(tmp)) {
357         case NBD_OPT_LIST:
358             ret = nbd_negotiate_handle_list(client, length);
359             if (ret < 0) {
360                 return ret;
361             }
362             break;
363 
364         case NBD_OPT_ABORT:
365             return -EINVAL;
366 
367         case NBD_OPT_EXPORT_NAME:
368             return nbd_negotiate_handle_export_name(client, length);
369 
370         default:
371             tmp = be32_to_cpu(tmp);
372             LOG("Unsupported option 0x%x", tmp);
373             nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
374             return -EINVAL;
375         }
376     }
377 }
378 
/*
 * Start-up arguments handed from nbd_client_new() to the
 * nbd_co_client_start() coroutine, which frees the structure.
 */
typedef struct {
    NBDClient *client;   /* client being set up */
    Coroutine *co;       /* coroutine running nbd_co_client_start() */
} NBDClientNewData;
383 
384 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
385 {
386     NBDClient *client = data->client;
387     int csock = client->sock;
388     char buf[8 + 8 + 8 + 128];
389     int rc;
390     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
391                          NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
392 
393     /* Negotiation header without options:
394         [ 0 ..   7]   passwd       ("NBDMAGIC")
395         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
396         [16 ..  23]   size
397         [24 ..  25]   server flags (0)
398         [26 ..  27]   export flags
399         [28 .. 151]   reserved     (0)
400 
401        Negotiation header with options, part 1:
402         [ 0 ..   7]   passwd       ("NBDMAGIC")
403         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
404         [16 ..  17]   server flags (0)
405 
406        part 2 (after options are sent):
407         [18 ..  25]   size
408         [26 ..  27]   export flags
409         [28 .. 151]   reserved     (0)
410      */
411 
412     rc = -EINVAL;
413 
414     TRACE("Beginning negotiation.");
415     memset(buf, 0, sizeof(buf));
416     memcpy(buf, "NBDMAGIC", 8);
417     if (client->exp) {
418         assert ((client->exp->nbdflags & ~65535) == 0);
419         cpu_to_be64w((uint64_t*)(buf + 8), NBD_CLIENT_MAGIC);
420         cpu_to_be64w((uint64_t*)(buf + 16), client->exp->size);
421         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
422     } else {
423         cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC);
424         cpu_to_be16w((uint16_t *)(buf + 16), NBD_FLAG_FIXED_NEWSTYLE);
425     }
426 
427     if (client->exp) {
428         if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
429             LOG("write failed");
430             goto fail;
431         }
432     } else {
433         if (nbd_negotiate_write(csock, buf, 18) != 18) {
434             LOG("write failed");
435             goto fail;
436         }
437         rc = nbd_negotiate_options(client);
438         if (rc != 0) {
439             LOG("option negotiation failed");
440             goto fail;
441         }
442 
443         assert ((client->exp->nbdflags & ~65535) == 0);
444         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
445         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
446         if (nbd_negotiate_write(csock, buf + 18,
447                                 sizeof(buf) - 18) != sizeof(buf) - 18) {
448             LOG("write failed");
449             goto fail;
450         }
451     }
452 
453     TRACE("Negotiation succeeded.");
454     rc = 0;
455 fail:
456     return rc;
457 }
458 
459 #ifdef __linux__
460 
461 int nbd_disconnect(int fd)
462 {
463     ioctl(fd, NBD_CLEAR_QUE);
464     ioctl(fd, NBD_DISCONNECT);
465     ioctl(fd, NBD_CLEAR_SOCK);
466     return 0;
467 }
468 
469 #else
470 
471 int nbd_disconnect(int fd)
472 {
473     return -ENOTSUP;
474 }
475 #endif
476 
477 static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
478 {
479     uint8_t buf[NBD_REQUEST_SIZE];
480     uint32_t magic;
481     ssize_t ret;
482 
483     ret = read_sync(csock, buf, sizeof(buf));
484     if (ret < 0) {
485         return ret;
486     }
487 
488     if (ret != sizeof(buf)) {
489         LOG("read failed");
490         return -EINVAL;
491     }
492 
493     /* Request
494        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
495        [ 4 ..  7]   type    (0 == READ, 1 == WRITE)
496        [ 8 .. 15]   handle
497        [16 .. 23]   from
498        [24 .. 27]   len
499      */
500 
501     magic = be32_to_cpup((uint32_t*)buf);
502     request->type  = be32_to_cpup((uint32_t*)(buf + 4));
503     request->handle = be64_to_cpup((uint64_t*)(buf + 8));
504     request->from  = be64_to_cpup((uint64_t*)(buf + 16));
505     request->len   = be32_to_cpup((uint32_t*)(buf + 24));
506 
507     TRACE("Got request: "
508           "{ magic = 0x%x, .type = %d, from = %" PRIu64" , len = %u }",
509           magic, request->type, request->from, request->len);
510 
511     if (magic != NBD_REQUEST_MAGIC) {
512         LOG("invalid magic (got 0x%x)", magic);
513         return -EINVAL;
514     }
515     return 0;
516 }
517 
518 static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
519 {
520     uint8_t buf[NBD_REPLY_SIZE];
521     ssize_t ret;
522 
523     reply->error = system_errno_to_nbd_errno(reply->error);
524 
525     /* Reply
526        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
527        [ 4 ..  7]    error   (0 == no error)
528        [ 7 .. 15]    handle
529      */
530     cpu_to_be32w((uint32_t*)buf, NBD_REPLY_MAGIC);
531     cpu_to_be32w((uint32_t*)(buf + 4), reply->error);
532     cpu_to_be64w((uint64_t*)(buf + 8), reply->handle);
533 
534     TRACE("Sending response to client");
535 
536     ret = write_sync(csock, buf, sizeof(buf));
537     if (ret < 0) {
538         return ret;
539     }
540 
541     if (ret != sizeof(buf)) {
542         LOG("writing to socket failed");
543         return -EINVAL;
544     }
545     return 0;
546 }
547 
/* Upper bound on requests in flight per client.  Once it is reached
 * and no receive is in progress, nbd_update_can_read() stops reading
 * from the client socket. */
#define MAX_NBD_REQUESTS 16
549 
550 void nbd_client_get(NBDClient *client)
551 {
552     client->refcount++;
553 }
554 
555 void nbd_client_put(NBDClient *client)
556 {
557     if (--client->refcount == 0) {
558         /* The last reference should be dropped by client->close,
559          * which is called by client_close.
560          */
561         assert(client->closing);
562 
563         nbd_unset_handlers(client);
564         close(client->sock);
565         client->sock = -1;
566         if (client->exp) {
567             QTAILQ_REMOVE(&client->exp->clients, client, next);
568             nbd_export_put(client->exp);
569         }
570         g_free(client);
571     }
572 }
573 
574 static void client_close(NBDClient *client)
575 {
576     if (client->closing) {
577         return;
578     }
579 
580     client->closing = true;
581 
582     /* Force requests to finish.  They will drop their own references,
583      * then we'll close the socket and free the NBDClient.
584      */
585     shutdown(client->sock, 2);
586 
587     /* Also tell the client, so that they release their reference.  */
588     if (client->close) {
589         client->close(client);
590     }
591 }
592 
593 static NBDRequest *nbd_request_get(NBDClient *client)
594 {
595     NBDRequest *req;
596 
597     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
598     client->nb_requests++;
599     nbd_update_can_read(client);
600 
601     req = g_new0(NBDRequest, 1);
602     nbd_client_get(client);
603     req->client = client;
604     return req;
605 }
606 
607 static void nbd_request_put(NBDRequest *req)
608 {
609     NBDClient *client = req->client;
610 
611     if (req->data) {
612         qemu_vfree(req->data);
613     }
614     g_free(req);
615 
616     client->nb_requests--;
617     nbd_update_can_read(client);
618     nbd_client_put(client);
619 }
620 
621 static void blk_aio_attached(AioContext *ctx, void *opaque)
622 {
623     NBDExport *exp = opaque;
624     NBDClient *client;
625 
626     TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
627 
628     exp->ctx = ctx;
629 
630     QTAILQ_FOREACH(client, &exp->clients, next) {
631         nbd_set_handlers(client);
632     }
633 }
634 
635 static void blk_aio_detach(void *opaque)
636 {
637     NBDExport *exp = opaque;
638     NBDClient *client;
639 
640     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
641 
642     QTAILQ_FOREACH(client, &exp->clients, next) {
643         nbd_unset_handlers(client);
644     }
645 
646     exp->ctx = NULL;
647 }
648 
649 static void nbd_eject_notifier(Notifier *n, void *data)
650 {
651     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
652     nbd_export_close(exp);
653 }
654 
655 NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
656                           uint32_t nbdflags, void (*close)(NBDExport *),
657                           Error **errp)
658 {
659     NBDExport *exp = g_malloc0(sizeof(NBDExport));
660     exp->refcount = 1;
661     QTAILQ_INIT(&exp->clients);
662     exp->blk = blk;
663     exp->dev_offset = dev_offset;
664     exp->nbdflags = nbdflags;
665     exp->size = size < 0 ? blk_getlength(blk) : size;
666     if (exp->size < 0) {
667         error_setg_errno(errp, -exp->size,
668                          "Failed to determine the NBD export's length");
669         goto fail;
670     }
671     exp->size -= exp->size % BDRV_SECTOR_SIZE;
672 
673     exp->close = close;
674     exp->ctx = blk_get_aio_context(blk);
675     blk_ref(blk);
676     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
677 
678     exp->eject_notifier.notify = nbd_eject_notifier;
679     blk_add_remove_bs_notifier(blk, &exp->eject_notifier);
680 
681     /*
682      * NBD exports are used for non-shared storage migration.  Make sure
683      * that BDRV_O_INACTIVE is cleared and the image is ready for write
684      * access since the export could be available before migration handover.
685      */
686     aio_context_acquire(exp->ctx);
687     blk_invalidate_cache(blk, NULL);
688     aio_context_release(exp->ctx);
689     return exp;
690 
691 fail:
692     g_free(exp);
693     return NULL;
694 }
695 
696 NBDExport *nbd_export_find(const char *name)
697 {
698     NBDExport *exp;
699     QTAILQ_FOREACH(exp, &exports, next) {
700         if (strcmp(name, exp->name) == 0) {
701             return exp;
702         }
703     }
704 
705     return NULL;
706 }
707 
708 void nbd_export_set_name(NBDExport *exp, const char *name)
709 {
710     if (exp->name == name) {
711         return;
712     }
713 
714     nbd_export_get(exp);
715     if (exp->name != NULL) {
716         g_free(exp->name);
717         exp->name = NULL;
718         QTAILQ_REMOVE(&exports, exp, next);
719         nbd_export_put(exp);
720     }
721     if (name != NULL) {
722         nbd_export_get(exp);
723         exp->name = g_strdup(name);
724         QTAILQ_INSERT_TAIL(&exports, exp, next);
725     }
726     nbd_export_put(exp);
727 }
728 
729 void nbd_export_close(NBDExport *exp)
730 {
731     NBDClient *client, *next;
732 
733     nbd_export_get(exp);
734     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
735         client_close(client);
736     }
737     nbd_export_set_name(exp, NULL);
738     nbd_export_put(exp);
739 }
740 
741 void nbd_export_get(NBDExport *exp)
742 {
743     assert(exp->refcount > 0);
744     exp->refcount++;
745 }
746 
747 void nbd_export_put(NBDExport *exp)
748 {
749     assert(exp->refcount > 0);
750     if (exp->refcount == 1) {
751         nbd_export_close(exp);
752     }
753 
754     if (--exp->refcount == 0) {
755         assert(exp->name == NULL);
756 
757         if (exp->close) {
758             exp->close(exp);
759         }
760 
761         if (exp->blk) {
762             notifier_remove(&exp->eject_notifier);
763             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
764                                             blk_aio_detach, exp);
765             blk_unref(exp->blk);
766             exp->blk = NULL;
767         }
768 
769         g_free(exp);
770     }
771 }
772 
773 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
774 {
775     return exp->blk;
776 }
777 
778 void nbd_export_close_all(void)
779 {
780     NBDExport *exp, *next;
781 
782     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
783         nbd_export_close(exp);
784     }
785 }
786 
787 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
788                                  int len)
789 {
790     NBDClient *client = req->client;
791     int csock = client->sock;
792     ssize_t rc, ret;
793 
794     qemu_co_mutex_lock(&client->send_lock);
795     client->send_coroutine = qemu_coroutine_self();
796     nbd_set_handlers(client);
797 
798     if (!len) {
799         rc = nbd_send_reply(csock, reply);
800     } else {
801         socket_set_cork(csock, 1);
802         rc = nbd_send_reply(csock, reply);
803         if (rc >= 0) {
804             ret = qemu_co_send(csock, req->data, len);
805             if (ret != len) {
806                 rc = -EIO;
807             }
808         }
809         socket_set_cork(csock, 0);
810     }
811 
812     client->send_coroutine = NULL;
813     nbd_set_handlers(client);
814     qemu_co_mutex_unlock(&client->send_lock);
815     return rc;
816 }
817 
818 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
819 {
820     NBDClient *client = req->client;
821     int csock = client->sock;
822     uint32_t command;
823     ssize_t rc;
824 
825     client->recv_coroutine = qemu_coroutine_self();
826     nbd_update_can_read(client);
827 
828     rc = nbd_receive_request(csock, request);
829     if (rc < 0) {
830         if (rc != -EAGAIN) {
831             rc = -EIO;
832         }
833         goto out;
834     }
835 
836     if ((request->from + request->len) < request->from) {
837         LOG("integer overflow detected! "
838             "you're probably being attacked");
839         rc = -EINVAL;
840         goto out;
841     }
842 
843     TRACE("Decoding type");
844 
845     command = request->type & NBD_CMD_MASK_COMMAND;
846     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
847         if (request->len > NBD_MAX_BUFFER_SIZE) {
848             LOG("len (%u) is larger than max len (%u)",
849                 request->len, NBD_MAX_BUFFER_SIZE);
850             rc = -EINVAL;
851             goto out;
852         }
853 
854         req->data = blk_try_blockalign(client->exp->blk, request->len);
855         if (req->data == NULL) {
856             rc = -ENOMEM;
857             goto out;
858         }
859     }
860     if (command == NBD_CMD_WRITE) {
861         TRACE("Reading %u byte(s)", request->len);
862 
863         if (qemu_co_recv(csock, req->data, request->len) != request->len) {
864             LOG("reading from socket failed");
865             rc = -EIO;
866             goto out;
867         }
868     }
869     rc = 0;
870 
871 out:
872     client->recv_coroutine = NULL;
873     nbd_update_can_read(client);
874 
875     return rc;
876 }
877 
878 static void nbd_trip(void *opaque)
879 {
880     NBDClient *client = opaque;
881     NBDExport *exp = client->exp;
882     NBDRequest *req;
883     struct nbd_request request;
884     struct nbd_reply reply;
885     ssize_t ret;
886     uint32_t command;
887 
888     TRACE("Reading request.");
889     if (client->closing) {
890         return;
891     }
892 
893     req = nbd_request_get(client);
894     ret = nbd_co_receive_request(req, &request);
895     if (ret == -EAGAIN) {
896         goto done;
897     }
898     if (ret == -EIO) {
899         goto out;
900     }
901 
902     reply.handle = request.handle;
903     reply.error = 0;
904 
905     if (ret < 0) {
906         reply.error = -ret;
907         goto error_reply;
908     }
909     command = request.type & NBD_CMD_MASK_COMMAND;
910     if (command != NBD_CMD_DISC && (request.from + request.len) > exp->size) {
911             LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
912             ", Offset: %" PRIu64 "\n",
913                     request.from, request.len,
914                     (uint64_t)exp->size, (uint64_t)exp->dev_offset);
915         LOG("requested operation past EOF--bad client?");
916         goto invalid_request;
917     }
918 
919     if (client->closing) {
920         /*
921          * The client may be closed when we are blocked in
922          * nbd_co_receive_request()
923          */
924         goto done;
925     }
926 
927     switch (command) {
928     case NBD_CMD_READ:
929         TRACE("Request type is READ");
930 
931         if (request.type & NBD_CMD_FLAG_FUA) {
932             ret = blk_co_flush(exp->blk);
933             if (ret < 0) {
934                 LOG("flush failed");
935                 reply.error = -ret;
936                 goto error_reply;
937             }
938         }
939 
940         ret = blk_read(exp->blk,
941                        (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
942                        req->data, request.len / BDRV_SECTOR_SIZE);
943         if (ret < 0) {
944             LOG("reading from file failed");
945             reply.error = -ret;
946             goto error_reply;
947         }
948 
949         TRACE("Read %u byte(s)", request.len);
950         if (nbd_co_send_reply(req, &reply, request.len) < 0)
951             goto out;
952         break;
953     case NBD_CMD_WRITE:
954         TRACE("Request type is WRITE");
955 
956         if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
957             TRACE("Server is read-only, return error");
958             reply.error = EROFS;
959             goto error_reply;
960         }
961 
962         TRACE("Writing to device");
963 
964         ret = blk_write(exp->blk,
965                         (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
966                         req->data, request.len / BDRV_SECTOR_SIZE);
967         if (ret < 0) {
968             LOG("writing to file failed");
969             reply.error = -ret;
970             goto error_reply;
971         }
972 
973         if (request.type & NBD_CMD_FLAG_FUA) {
974             ret = blk_co_flush(exp->blk);
975             if (ret < 0) {
976                 LOG("flush failed");
977                 reply.error = -ret;
978                 goto error_reply;
979             }
980         }
981 
982         if (nbd_co_send_reply(req, &reply, 0) < 0) {
983             goto out;
984         }
985         break;
986     case NBD_CMD_DISC:
987         TRACE("Request type is DISCONNECT");
988         errno = 0;
989         goto out;
990     case NBD_CMD_FLUSH:
991         TRACE("Request type is FLUSH");
992 
993         ret = blk_co_flush(exp->blk);
994         if (ret < 0) {
995             LOG("flush failed");
996             reply.error = -ret;
997         }
998         if (nbd_co_send_reply(req, &reply, 0) < 0) {
999             goto out;
1000         }
1001         break;
1002     case NBD_CMD_TRIM:
1003         TRACE("Request type is TRIM");
1004         ret = blk_co_discard(exp->blk, (request.from + exp->dev_offset)
1005                                        / BDRV_SECTOR_SIZE,
1006                              request.len / BDRV_SECTOR_SIZE);
1007         if (ret < 0) {
1008             LOG("discard failed");
1009             reply.error = -ret;
1010         }
1011         if (nbd_co_send_reply(req, &reply, 0) < 0) {
1012             goto out;
1013         }
1014         break;
1015     default:
1016         LOG("invalid request type (%u) received", request.type);
1017     invalid_request:
1018         reply.error = EINVAL;
1019     error_reply:
1020         if (nbd_co_send_reply(req, &reply, 0) < 0) {
1021             goto out;
1022         }
1023         break;
1024     }
1025 
1026     TRACE("Request/Reply complete");
1027 
1028 done:
1029     nbd_request_put(req);
1030     return;
1031 
1032 out:
1033     nbd_request_put(req);
1034     client_close(client);
1035 }
1036 
1037 static void nbd_read(void *opaque)
1038 {
1039     NBDClient *client = opaque;
1040 
1041     if (client->recv_coroutine) {
1042         qemu_coroutine_enter(client->recv_coroutine, NULL);
1043     } else {
1044         qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
1045     }
1046 }
1047 
1048 static void nbd_restart_write(void *opaque)
1049 {
1050     NBDClient *client = opaque;
1051 
1052     qemu_coroutine_enter(client->send_coroutine, NULL);
1053 }
1054 
1055 static void nbd_set_handlers(NBDClient *client)
1056 {
1057     if (client->exp && client->exp->ctx) {
1058         aio_set_fd_handler(client->exp->ctx, client->sock,
1059                            true,
1060                            client->can_read ? nbd_read : NULL,
1061                            client->send_coroutine ? nbd_restart_write : NULL,
1062                            client);
1063     }
1064 }
1065 
1066 static void nbd_unset_handlers(NBDClient *client)
1067 {
1068     if (client->exp && client->exp->ctx) {
1069         aio_set_fd_handler(client->exp->ctx, client->sock,
1070                            true, NULL, NULL, NULL);
1071     }
1072 }
1073 
1074 static void nbd_update_can_read(NBDClient *client)
1075 {
1076     bool can_read = client->recv_coroutine ||
1077                     client->nb_requests < MAX_NBD_REQUESTS;
1078 
1079     if (can_read != client->can_read) {
1080         client->can_read = can_read;
1081         nbd_set_handlers(client);
1082 
1083         /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
1084          * in nbd_set_handlers() will have taken care of that */
1085     }
1086 }
1087 
1088 static coroutine_fn void nbd_co_client_start(void *opaque)
1089 {
1090     NBDClientNewData *data = opaque;
1091     NBDClient *client = data->client;
1092     NBDExport *exp = client->exp;
1093 
1094     if (exp) {
1095         nbd_export_get(exp);
1096     }
1097     if (nbd_negotiate(data)) {
1098         client_close(client);
1099         goto out;
1100     }
1101     qemu_co_mutex_init(&client->send_lock);
1102     nbd_set_handlers(client);
1103 
1104     if (exp) {
1105         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1106     }
1107 out:
1108     g_free(data);
1109 }
1110 
1111 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
1112 {
1113     NBDClient *client;
1114     NBDClientNewData *data = g_new(NBDClientNewData, 1);
1115 
1116     client = g_malloc0(sizeof(NBDClient));
1117     client->refcount = 1;
1118     client->exp = exp;
1119     client->sock = csock;
1120     client->can_read = true;
1121     client->close = close_fn;
1122 
1123     data->client = client;
1124     data->co = qemu_coroutine_create(nbd_co_client_start);
1125     qemu_coroutine_enter(data->co, data);
1126 }
1127