xref: /openbmc/qemu/nbd/server.c (revision 42f7a448)
1 /*
2  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
3  *
4  *  Network Block Device Server Side
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; under version 2 of the License.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "nbd-internal.h"
20 
21 static int system_errno_to_nbd_errno(int err)
22 {
23     switch (err) {
24     case 0:
25         return NBD_SUCCESS;
26     case EPERM:
27         return NBD_EPERM;
28     case EIO:
29         return NBD_EIO;
30     case ENOMEM:
31         return NBD_ENOMEM;
32 #ifdef EDQUOT
33     case EDQUOT:
34 #endif
35     case EFBIG:
36     case ENOSPC:
37         return NBD_ENOSPC;
38     case EINVAL:
39     default:
40         return NBD_EINVAL;
41     }
42 }
43 
44 /* Definitions for opaque data types */
45 
/* State kept for one request while it is being served. */
typedef struct NBDRequest NBDRequest;

struct NBDRequest {
    /* Queue linkage. */
    QSIMPLEQ_ENTRY(NBDRequest) entry;
    /* Client the request belongs to; nbd_request_get() takes a reference
     * on it and nbd_request_put() drops that reference. */
    NBDClient *client;
    /* Payload buffer for READ/WRITE, or NULL; freed with qemu_vfree(). */
    uint8_t *data;
};
53 
/* One exported block device. */
struct NBDExport {
    /* Reference count; the export is freed when it drops to zero. */
    int refcount;
    /* Optional callback run by nbd_export_put() right before freeing. */
    void (*close)(NBDExport *exp);

    /* Backend being exported; nbd_export_new() takes a reference. */
    BlockBackend *blk;
    /* Export name (heap copy), or NULL while the export is unnamed. */
    char *name;
    /* Byte offset added to every request offset before touching blk. */
    off_t dev_offset;
    /* Export size in bytes, rounded down to a BDRV_SECTOR_SIZE multiple. */
    off_t size;
    /* Export flags advertised to clients during negotiation. */
    uint32_t nbdflags;
    /* Clients currently attached to this export. */
    QTAILQ_HEAD(, NBDClient) clients;
    /* Linkage in the global list of named exports. */
    QTAILQ_ENTRY(NBDExport) next;

    /* AioContext the client fd handlers are registered in; NULL while
     * the backend is detached from its context. */
    AioContext *ctx;
};
68 
/* Global list of exports that currently have a name; maintained by
 * nbd_export_set_name(). */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
70 
/* One connected client. */
struct NBDClient {
    /* Reference count; the client is freed when it drops to zero. */
    int refcount;
    /* Optional callback run by client_close() so the creator can release
     * its reference. */
    void (*close)(NBDClient *client);

    /* Export served to this client; NULL until one has been selected. */
    NBDExport *exp;
    /* Connected socket. */
    int sock;

    /* Coroutine currently receiving a request, or NULL. */
    Coroutine *recv_coroutine;

    /* Serializes replies written to the socket. */
    CoMutex send_lock;
    /* Coroutine currently sending a reply, or NULL. */
    Coroutine *send_coroutine;

    /* Whether the read handler should be installed on the socket. */
    bool can_read;

    /* Linkage in the export's list of clients. */
    QTAILQ_ENTRY(NBDClient) next;
    /* Number of requests currently in flight. */
    int nb_requests;
    /* Set once client_close() has started tearing the client down. */
    bool closing;
};
89 
90 /* That's all folks */
91 
/* Forward declarations: these fd-handler helpers are defined near the end
 * of the file but are used by the reference-counting code before that. */
static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
95 
96 static void nbd_negotiate_continue(void *opaque)
97 {
98     qemu_coroutine_enter(opaque, NULL);
99 }
100 
101 static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
102 {
103     ssize_t ret;
104 
105     assert(qemu_in_coroutine());
106     /* Negotiation are always in main loop. */
107     qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
108                         qemu_coroutine_self());
109     ret = read_sync(fd, buffer, size);
110     qemu_set_fd_handler(fd, NULL, NULL, NULL);
111     return ret;
112 
113 }
114 
115 static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
116 {
117     ssize_t ret;
118 
119     assert(qemu_in_coroutine());
120     /* Negotiation are always in main loop. */
121     qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
122                         qemu_coroutine_self());
123     ret = write_sync(fd, buffer, size);
124     qemu_set_fd_handler(fd, NULL, NULL, NULL);
125     return ret;
126 }
127 
/*
 * Read and discard @size bytes from @fd, using a scratch buffer of at
 * most 64 KiB.
 *
 * Returns @size on success, or the negative value reported by
 * nbd_negotiate_read() on failure.
 */
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{
    ssize_t result = size;
    uint8_t *scratch = g_malloc(MIN(65536, size));

    while (size > 0) {
        ssize_t got = nbd_negotiate_read(fd, scratch, MIN(65536, size));

        if (got < 0) {
            result = got;
            break;
        }

        assert(got <= size);
        size -= got;
    }

    g_free(scratch);
    return result;
}
147 
148 /* Basic flow for negotiation
149 
150    Server         Client
151    Negotiate
152 
153    or
154 
155    Server         Client
156    Negotiate #1
157                   Option
158    Negotiate #2
159 
160    ----
161 
162    followed by
163 
164    Server         Client
165                   Request
166    Response
167                   Request
168    Response
169                   ...
170    ...
171                   Request (type == 2)
172 
173 */
174 
175 static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
176 {
177     uint64_t magic;
178     uint32_t len;
179 
180     magic = cpu_to_be64(NBD_REP_MAGIC);
181     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
182         LOG("write failed (rep magic)");
183         return -EINVAL;
184     }
185     opt = cpu_to_be32(opt);
186     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
187         LOG("write failed (rep opt)");
188         return -EINVAL;
189     }
190     type = cpu_to_be32(type);
191     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
192         LOG("write failed (rep type)");
193         return -EINVAL;
194     }
195     len = cpu_to_be32(0);
196     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
197         LOG("write failed (rep data length)");
198         return -EINVAL;
199     }
200     return 0;
201 }
202 
203 static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
204 {
205     uint64_t magic, name_len;
206     uint32_t opt, type, len;
207 
208     name_len = strlen(exp->name);
209     magic = cpu_to_be64(NBD_REP_MAGIC);
210     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
211         LOG("write failed (magic)");
212         return -EINVAL;
213      }
214     opt = cpu_to_be32(NBD_OPT_LIST);
215     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
216         LOG("write failed (opt)");
217         return -EINVAL;
218     }
219     type = cpu_to_be32(NBD_REP_SERVER);
220     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
221         LOG("write failed (reply type)");
222         return -EINVAL;
223     }
224     len = cpu_to_be32(name_len + sizeof(len));
225     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
226         LOG("write failed (length)");
227         return -EINVAL;
228     }
229     len = cpu_to_be32(name_len);
230     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
231         LOG("write failed (length)");
232         return -EINVAL;
233     }
234     if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
235         LOG("write failed (buffer)");
236         return -EINVAL;
237     }
238     return 0;
239 }
240 
241 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
242 {
243     int csock;
244     NBDExport *exp;
245 
246     csock = client->sock;
247     if (length) {
248         if (nbd_negotiate_drop_sync(csock, length) != length) {
249             return -EIO;
250         }
251         return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
252     }
253 
254     /* For each export, send a NBD_REP_SERVER reply. */
255     QTAILQ_FOREACH(exp, &exports, next) {
256         if (nbd_negotiate_send_rep_list(csock, exp)) {
257             return -EINVAL;
258         }
259     }
260     /* Finish with a NBD_REP_ACK. */
261     return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
262 }
263 
264 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
265 {
266     int rc = -EINVAL, csock = client->sock;
267     char name[256];
268 
269     /* Client sends:
270         [20 ..  xx]   export name (length bytes)
271      */
272     TRACE("Checking length");
273     if (length > 255) {
274         LOG("Bad length received");
275         goto fail;
276     }
277     if (nbd_negotiate_read(csock, name, length) != length) {
278         LOG("read failed");
279         goto fail;
280     }
281     name[length] = '\0';
282 
283     client->exp = nbd_export_find(name);
284     if (!client->exp) {
285         LOG("export not found");
286         goto fail;
287     }
288 
289     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
290     nbd_export_get(client->exp);
291     rc = 0;
292 fail:
293     return rc;
294 }
295 
296 static int nbd_negotiate_options(NBDClient *client)
297 {
298     int csock = client->sock;
299     uint32_t flags;
300 
301     /* Client sends:
302         [ 0 ..   3]   client flags
303 
304         [ 0 ..   7]   NBD_OPTS_MAGIC
305         [ 8 ..  11]   NBD option
306         [12 ..  15]   Data length
307         ...           Rest of request
308 
309         [ 0 ..   7]   NBD_OPTS_MAGIC
310         [ 8 ..  11]   Second NBD option
311         [12 ..  15]   Data length
312         ...           Rest of request
313     */
314 
315     if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
316         LOG("read failed");
317         return -EIO;
318     }
319     TRACE("Checking client flags");
320     be32_to_cpus(&flags);
321     if (flags != 0 && flags != NBD_FLAG_C_FIXED_NEWSTYLE) {
322         LOG("Bad client flags received");
323         return -EIO;
324     }
325 
326     while (1) {
327         int ret;
328         uint32_t tmp, length;
329         uint64_t magic;
330 
331         if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
332             LOG("read failed");
333             return -EINVAL;
334         }
335         TRACE("Checking opts magic");
336         if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
337             LOG("Bad magic received");
338             return -EINVAL;
339         }
340 
341         if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
342             LOG("read failed");
343             return -EINVAL;
344         }
345 
346         if (nbd_negotiate_read(csock, &length,
347                                sizeof(length)) != sizeof(length)) {
348             LOG("read failed");
349             return -EINVAL;
350         }
351         length = be32_to_cpu(length);
352 
353         TRACE("Checking option");
354         switch (be32_to_cpu(tmp)) {
355         case NBD_OPT_LIST:
356             ret = nbd_negotiate_handle_list(client, length);
357             if (ret < 0) {
358                 return ret;
359             }
360             break;
361 
362         case NBD_OPT_ABORT:
363             return -EINVAL;
364 
365         case NBD_OPT_EXPORT_NAME:
366             return nbd_negotiate_handle_export_name(client, length);
367 
368         default:
369             tmp = be32_to_cpu(tmp);
370             LOG("Unsupported option 0x%x", tmp);
371             nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
372             return -EINVAL;
373         }
374     }
375 }
376 
/* Argument bundle handed from nbd_client_new() to the negotiation
 * coroutine; freed by nbd_co_client_start(). */
typedef struct {
    NBDClient *client;  /* client being set up */
    Coroutine *co;      /* coroutine running nbd_co_client_start() */
} NBDClientNewData;
381 
382 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
383 {
384     NBDClient *client = data->client;
385     int csock = client->sock;
386     char buf[8 + 8 + 8 + 128];
387     int rc;
388     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
389                          NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
390 
391     /* Negotiation header without options:
392         [ 0 ..   7]   passwd       ("NBDMAGIC")
393         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
394         [16 ..  23]   size
395         [24 ..  25]   server flags (0)
396         [26 ..  27]   export flags
397         [28 .. 151]   reserved     (0)
398 
399        Negotiation header with options, part 1:
400         [ 0 ..   7]   passwd       ("NBDMAGIC")
401         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
402         [16 ..  17]   server flags (0)
403 
404        part 2 (after options are sent):
405         [18 ..  25]   size
406         [26 ..  27]   export flags
407         [28 .. 151]   reserved     (0)
408      */
409 
410     rc = -EINVAL;
411 
412     TRACE("Beginning negotiation.");
413     memset(buf, 0, sizeof(buf));
414     memcpy(buf, "NBDMAGIC", 8);
415     if (client->exp) {
416         assert ((client->exp->nbdflags & ~65535) == 0);
417         cpu_to_be64w((uint64_t*)(buf + 8), NBD_CLIENT_MAGIC);
418         cpu_to_be64w((uint64_t*)(buf + 16), client->exp->size);
419         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
420     } else {
421         cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC);
422         cpu_to_be16w((uint16_t *)(buf + 16), NBD_FLAG_FIXED_NEWSTYLE);
423     }
424 
425     if (client->exp) {
426         if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
427             LOG("write failed");
428             goto fail;
429         }
430     } else {
431         if (nbd_negotiate_write(csock, buf, 18) != 18) {
432             LOG("write failed");
433             goto fail;
434         }
435         rc = nbd_negotiate_options(client);
436         if (rc != 0) {
437             LOG("option negotiation failed");
438             goto fail;
439         }
440 
441         assert ((client->exp->nbdflags & ~65535) == 0);
442         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
443         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
444         if (nbd_negotiate_write(csock, buf + 18,
445                                 sizeof(buf) - 18) != sizeof(buf) - 18) {
446             LOG("write failed");
447             goto fail;
448         }
449     }
450 
451     TRACE("Negotiation succeeded.");
452     rc = 0;
453 fail:
454     return rc;
455 }
456 
457 #ifdef __linux__
458 
459 int nbd_disconnect(int fd)
460 {
461     ioctl(fd, NBD_CLEAR_QUE);
462     ioctl(fd, NBD_DISCONNECT);
463     ioctl(fd, NBD_CLEAR_SOCK);
464     return 0;
465 }
466 
467 #else
468 
469 int nbd_disconnect(int fd)
470 {
471     return -ENOTSUP;
472 }
473 #endif
474 
475 static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
476 {
477     uint8_t buf[NBD_REQUEST_SIZE];
478     uint32_t magic;
479     ssize_t ret;
480 
481     ret = read_sync(csock, buf, sizeof(buf));
482     if (ret < 0) {
483         return ret;
484     }
485 
486     if (ret != sizeof(buf)) {
487         LOG("read failed");
488         return -EINVAL;
489     }
490 
491     /* Request
492        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
493        [ 4 ..  7]   type    (0 == READ, 1 == WRITE)
494        [ 8 .. 15]   handle
495        [16 .. 23]   from
496        [24 .. 27]   len
497      */
498 
499     magic = be32_to_cpup((uint32_t*)buf);
500     request->type  = be32_to_cpup((uint32_t*)(buf + 4));
501     request->handle = be64_to_cpup((uint64_t*)(buf + 8));
502     request->from  = be64_to_cpup((uint64_t*)(buf + 16));
503     request->len   = be32_to_cpup((uint32_t*)(buf + 24));
504 
505     TRACE("Got request: "
506           "{ magic = 0x%x, .type = %d, from = %" PRIu64" , len = %u }",
507           magic, request->type, request->from, request->len);
508 
509     if (magic != NBD_REQUEST_MAGIC) {
510         LOG("invalid magic (got 0x%x)", magic);
511         return -EINVAL;
512     }
513     return 0;
514 }
515 
516 static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
517 {
518     uint8_t buf[NBD_REPLY_SIZE];
519     ssize_t ret;
520 
521     reply->error = system_errno_to_nbd_errno(reply->error);
522 
523     /* Reply
524        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
525        [ 4 ..  7]    error   (0 == no error)
526        [ 7 .. 15]    handle
527      */
528     cpu_to_be32w((uint32_t*)buf, NBD_REPLY_MAGIC);
529     cpu_to_be32w((uint32_t*)(buf + 4), reply->error);
530     cpu_to_be64w((uint64_t*)(buf + 8), reply->handle);
531 
532     TRACE("Sending response to client");
533 
534     ret = write_sync(csock, buf, sizeof(buf));
535     if (ret < 0) {
536         return ret;
537     }
538 
539     if (ret != sizeof(buf)) {
540         LOG("writing to socket failed");
541         return -EINVAL;
542     }
543     return 0;
544 }
545 
/* Upper bound on requests in flight per client; once reached, the read
 * handler is removed until a request completes (nbd_update_can_read()). */
#define MAX_NBD_REQUESTS 16
547 
548 void nbd_client_get(NBDClient *client)
549 {
550     client->refcount++;
551 }
552 
553 void nbd_client_put(NBDClient *client)
554 {
555     if (--client->refcount == 0) {
556         /* The last reference should be dropped by client->close,
557          * which is called by client_close.
558          */
559         assert(client->closing);
560 
561         nbd_unset_handlers(client);
562         close(client->sock);
563         client->sock = -1;
564         if (client->exp) {
565             QTAILQ_REMOVE(&client->exp->clients, client, next);
566             nbd_export_put(client->exp);
567         }
568         g_free(client);
569     }
570 }
571 
572 static void client_close(NBDClient *client)
573 {
574     if (client->closing) {
575         return;
576     }
577 
578     client->closing = true;
579 
580     /* Force requests to finish.  They will drop their own references,
581      * then we'll close the socket and free the NBDClient.
582      */
583     shutdown(client->sock, 2);
584 
585     /* Also tell the client, so that they release their reference.  */
586     if (client->close) {
587         client->close(client);
588     }
589 }
590 
591 static NBDRequest *nbd_request_get(NBDClient *client)
592 {
593     NBDRequest *req;
594 
595     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
596     client->nb_requests++;
597     nbd_update_can_read(client);
598 
599     req = g_new0(NBDRequest, 1);
600     nbd_client_get(client);
601     req->client = client;
602     return req;
603 }
604 
605 static void nbd_request_put(NBDRequest *req)
606 {
607     NBDClient *client = req->client;
608 
609     if (req->data) {
610         qemu_vfree(req->data);
611     }
612     g_free(req);
613 
614     client->nb_requests--;
615     nbd_update_can_read(client);
616     nbd_client_put(client);
617 }
618 
619 static void blk_aio_attached(AioContext *ctx, void *opaque)
620 {
621     NBDExport *exp = opaque;
622     NBDClient *client;
623 
624     TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
625 
626     exp->ctx = ctx;
627 
628     QTAILQ_FOREACH(client, &exp->clients, next) {
629         nbd_set_handlers(client);
630     }
631 }
632 
633 static void blk_aio_detach(void *opaque)
634 {
635     NBDExport *exp = opaque;
636     NBDClient *client;
637 
638     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
639 
640     QTAILQ_FOREACH(client, &exp->clients, next) {
641         nbd_unset_handlers(client);
642     }
643 
644     exp->ctx = NULL;
645 }
646 
647 NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
648                           uint32_t nbdflags, void (*close)(NBDExport *),
649                           Error **errp)
650 {
651     NBDExport *exp = g_malloc0(sizeof(NBDExport));
652     exp->refcount = 1;
653     QTAILQ_INIT(&exp->clients);
654     exp->blk = blk;
655     exp->dev_offset = dev_offset;
656     exp->nbdflags = nbdflags;
657     exp->size = size < 0 ? blk_getlength(blk) : size;
658     if (exp->size < 0) {
659         error_setg_errno(errp, -exp->size,
660                          "Failed to determine the NBD export's length");
661         goto fail;
662     }
663     exp->size -= exp->size % BDRV_SECTOR_SIZE;
664 
665     exp->close = close;
666     exp->ctx = blk_get_aio_context(blk);
667     blk_ref(blk);
668     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
669     /*
670      * NBD exports are used for non-shared storage migration.  Make sure
671      * that BDRV_O_INACTIVE is cleared and the image is ready for write
672      * access since the export could be available before migration handover.
673      */
674     aio_context_acquire(exp->ctx);
675     blk_invalidate_cache(blk, NULL);
676     aio_context_release(exp->ctx);
677     return exp;
678 
679 fail:
680     g_free(exp);
681     return NULL;
682 }
683 
684 NBDExport *nbd_export_find(const char *name)
685 {
686     NBDExport *exp;
687     QTAILQ_FOREACH(exp, &exports, next) {
688         if (strcmp(name, exp->name) == 0) {
689             return exp;
690         }
691     }
692 
693     return NULL;
694 }
695 
696 void nbd_export_set_name(NBDExport *exp, const char *name)
697 {
698     if (exp->name == name) {
699         return;
700     }
701 
702     nbd_export_get(exp);
703     if (exp->name != NULL) {
704         g_free(exp->name);
705         exp->name = NULL;
706         QTAILQ_REMOVE(&exports, exp, next);
707         nbd_export_put(exp);
708     }
709     if (name != NULL) {
710         nbd_export_get(exp);
711         exp->name = g_strdup(name);
712         QTAILQ_INSERT_TAIL(&exports, exp, next);
713     }
714     nbd_export_put(exp);
715 }
716 
717 void nbd_export_close(NBDExport *exp)
718 {
719     NBDClient *client, *next;
720 
721     nbd_export_get(exp);
722     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
723         client_close(client);
724     }
725     nbd_export_set_name(exp, NULL);
726     nbd_export_put(exp);
727 }
728 
729 void nbd_export_get(NBDExport *exp)
730 {
731     assert(exp->refcount > 0);
732     exp->refcount++;
733 }
734 
735 void nbd_export_put(NBDExport *exp)
736 {
737     assert(exp->refcount > 0);
738     if (exp->refcount == 1) {
739         nbd_export_close(exp);
740     }
741 
742     if (--exp->refcount == 0) {
743         assert(exp->name == NULL);
744 
745         if (exp->close) {
746             exp->close(exp);
747         }
748 
749         if (exp->blk) {
750             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
751                                             blk_aio_detach, exp);
752             blk_unref(exp->blk);
753             exp->blk = NULL;
754         }
755 
756         g_free(exp);
757     }
758 }
759 
760 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
761 {
762     return exp->blk;
763 }
764 
765 void nbd_export_close_all(void)
766 {
767     NBDExport *exp, *next;
768 
769     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
770         nbd_export_close(exp);
771     }
772 }
773 
774 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
775                                  int len)
776 {
777     NBDClient *client = req->client;
778     int csock = client->sock;
779     ssize_t rc, ret;
780 
781     qemu_co_mutex_lock(&client->send_lock);
782     client->send_coroutine = qemu_coroutine_self();
783     nbd_set_handlers(client);
784 
785     if (!len) {
786         rc = nbd_send_reply(csock, reply);
787     } else {
788         socket_set_cork(csock, 1);
789         rc = nbd_send_reply(csock, reply);
790         if (rc >= 0) {
791             ret = qemu_co_send(csock, req->data, len);
792             if (ret != len) {
793                 rc = -EIO;
794             }
795         }
796         socket_set_cork(csock, 0);
797     }
798 
799     client->send_coroutine = NULL;
800     nbd_set_handlers(client);
801     qemu_co_mutex_unlock(&client->send_lock);
802     return rc;
803 }
804 
805 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
806 {
807     NBDClient *client = req->client;
808     int csock = client->sock;
809     uint32_t command;
810     ssize_t rc;
811 
812     client->recv_coroutine = qemu_coroutine_self();
813     nbd_update_can_read(client);
814 
815     rc = nbd_receive_request(csock, request);
816     if (rc < 0) {
817         if (rc != -EAGAIN) {
818             rc = -EIO;
819         }
820         goto out;
821     }
822 
823     if ((request->from + request->len) < request->from) {
824         LOG("integer overflow detected! "
825             "you're probably being attacked");
826         rc = -EINVAL;
827         goto out;
828     }
829 
830     TRACE("Decoding type");
831 
832     command = request->type & NBD_CMD_MASK_COMMAND;
833     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
834         if (request->len > NBD_MAX_BUFFER_SIZE) {
835             LOG("len (%u) is larger than max len (%u)",
836                 request->len, NBD_MAX_BUFFER_SIZE);
837             rc = -EINVAL;
838             goto out;
839         }
840 
841         req->data = blk_try_blockalign(client->exp->blk, request->len);
842         if (req->data == NULL) {
843             rc = -ENOMEM;
844             goto out;
845         }
846     }
847     if (command == NBD_CMD_WRITE) {
848         TRACE("Reading %u byte(s)", request->len);
849 
850         if (qemu_co_recv(csock, req->data, request->len) != request->len) {
851             LOG("reading from socket failed");
852             rc = -EIO;
853             goto out;
854         }
855     }
856     rc = 0;
857 
858 out:
859     client->recv_coroutine = NULL;
860     nbd_update_can_read(client);
861 
862     return rc;
863 }
864 
/*
 * Coroutine entry point serving a single client request: receive it,
 * perform the block operation and send the reply.
 *
 * @opaque: the NBDClient to serve.
 *
 * Exit paths:
 *   done - release the request and keep the connection open;
 *   out  - release the request and close the client.
 *
 * NOTE(review): READ, WRITE and TRIM convert offsets and lengths to
 * sectors with integer division by BDRV_SECTOR_SIZE, so a request that is
 * not sector-aligned is silently shortened - confirm whether clients are
 * guaranteed to send aligned requests.
 */
static void nbd_trip(void *opaque)
{
    NBDClient *client = opaque;
    NBDExport *exp = client->exp;
    NBDRequest *req;
    struct nbd_request request;
    struct nbd_reply reply;
    ssize_t ret;
    uint32_t command;

    TRACE("Reading request.");
    if (client->closing) {
        return;
    }

    req = nbd_request_get(client);
    ret = nbd_co_receive_request(req, &request);
    /* -EAGAIN: nothing to serve, keep the connection. */
    if (ret == -EAGAIN) {
        goto done;
    }
    /* -EIO: the request could not be read; give up on the client. */
    if (ret == -EIO) {
        goto out;
    }

    reply.handle = request.handle;
    reply.error = 0;

    /* Any other receive error is reported back to the client. */
    if (ret < 0) {
        reply.error = -ret;
        goto error_reply;
    }
    command = request.type & NBD_CMD_MASK_COMMAND;
    /* Every command except DISC must stay within the export size. */
    if (command != NBD_CMD_DISC && (request.from + request.len) > exp->size) {
            LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
            ", Offset: %" PRIu64 "\n",
                    request.from, request.len,
                    (uint64_t)exp->size, (uint64_t)exp->dev_offset);
        LOG("requested operation past EOF--bad client?");
        goto invalid_request;
    }

    if (client->closing) {
        /*
         * The client may be closed when we are blocked in
         * nbd_co_receive_request()
         */
        goto done;
    }

    switch (command) {
    case NBD_CMD_READ:
        TRACE("Request type is READ");

        /* With FUA set on a read, flush before reading. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        ret = blk_read(exp->blk,
                       (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                       req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("reading from file failed");
            reply.error = -ret;
            goto error_reply;
        }

        TRACE("Read %u byte(s)", request.len);
        /* The reply header is followed by request.len bytes of data. */
        if (nbd_co_send_reply(req, &reply, request.len) < 0)
            goto out;
        break;
    case NBD_CMD_WRITE:
        TRACE("Request type is WRITE");

        if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
            TRACE("Server is read-only, return error");
            reply.error = EROFS;
            goto error_reply;
        }

        TRACE("Writing to device");

        ret = blk_write(exp->blk,
                        (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                        req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("writing to file failed");
            reply.error = -ret;
            goto error_reply;
        }

        /* With FUA set on a write, flush after writing. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_DISC:
        /* Disconnect: no reply is sent, the client is simply closed. */
        TRACE("Request type is DISCONNECT");
        errno = 0;
        goto out;
    case NBD_CMD_FLUSH:
        TRACE("Request type is FLUSH");

        /* A flush failure is reported through the normal reply. */
        ret = blk_co_flush(exp->blk);
        if (ret < 0) {
            LOG("flush failed");
            reply.error = -ret;
        }
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_TRIM:
        TRACE("Request type is TRIM");
        /* A discard failure is reported through the normal reply. */
        ret = blk_co_discard(exp->blk, (request.from + exp->dev_offset)
                                       / BDRV_SECTOR_SIZE,
                             request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("discard failed");
            reply.error = -ret;
        }
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    default:
        LOG("invalid request type (%u) received", request.type);
    /* Shared tail for failures: the labels are jumped to from above. */
    invalid_request:
        reply.error = EINVAL;
    error_reply:
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    }

    TRACE("Request/Reply complete");

done:
    nbd_request_put(req);
    return;

out:
    nbd_request_put(req);
    client_close(client);
}
1023 
1024 static void nbd_read(void *opaque)
1025 {
1026     NBDClient *client = opaque;
1027 
1028     if (client->recv_coroutine) {
1029         qemu_coroutine_enter(client->recv_coroutine, NULL);
1030     } else {
1031         qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
1032     }
1033 }
1034 
1035 static void nbd_restart_write(void *opaque)
1036 {
1037     NBDClient *client = opaque;
1038 
1039     qemu_coroutine_enter(client->send_coroutine, NULL);
1040 }
1041 
1042 static void nbd_set_handlers(NBDClient *client)
1043 {
1044     if (client->exp && client->exp->ctx) {
1045         aio_set_fd_handler(client->exp->ctx, client->sock,
1046                            true,
1047                            client->can_read ? nbd_read : NULL,
1048                            client->send_coroutine ? nbd_restart_write : NULL,
1049                            client);
1050     }
1051 }
1052 
1053 static void nbd_unset_handlers(NBDClient *client)
1054 {
1055     if (client->exp && client->exp->ctx) {
1056         aio_set_fd_handler(client->exp->ctx, client->sock,
1057                            true, NULL, NULL, NULL);
1058     }
1059 }
1060 
1061 static void nbd_update_can_read(NBDClient *client)
1062 {
1063     bool can_read = client->recv_coroutine ||
1064                     client->nb_requests < MAX_NBD_REQUESTS;
1065 
1066     if (can_read != client->can_read) {
1067         client->can_read = can_read;
1068         nbd_set_handlers(client);
1069 
1070         /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
1071          * in nbd_set_handlers() will have taken care of that */
1072     }
1073 }
1074 
1075 static coroutine_fn void nbd_co_client_start(void *opaque)
1076 {
1077     NBDClientNewData *data = opaque;
1078     NBDClient *client = data->client;
1079     NBDExport *exp = client->exp;
1080 
1081     if (exp) {
1082         nbd_export_get(exp);
1083     }
1084     if (nbd_negotiate(data)) {
1085         shutdown(client->sock, 2);
1086         client->close(client);
1087         goto out;
1088     }
1089     qemu_co_mutex_init(&client->send_lock);
1090     nbd_set_handlers(client);
1091 
1092     if (exp) {
1093         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1094     }
1095 out:
1096     g_free(data);
1097 }
1098 
1099 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
1100 {
1101     NBDClient *client;
1102     NBDClientNewData *data = g_new(NBDClientNewData, 1);
1103 
1104     client = g_malloc0(sizeof(NBDClient));
1105     client->refcount = 1;
1106     client->exp = exp;
1107     client->sock = csock;
1108     client->can_read = true;
1109     client->close = close_fn;
1110 
1111     data->client = client;
1112     data->co = qemu_coroutine_create(nbd_co_client_start);
1113     qemu_coroutine_enter(data->co, data);
1114 }
1115