xref: /openbmc/qemu/nbd/server.c (revision d341d9f3)
1 /*
2  *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
3  *
4  *  Network Block Device Server Side
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; under version 2 of the License.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program; if not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "nbd-internal.h"
20 
21 static int system_errno_to_nbd_errno(int err)
22 {
23     switch (err) {
24     case 0:
25         return NBD_SUCCESS;
26     case EPERM:
27         return NBD_EPERM;
28     case EIO:
29         return NBD_EIO;
30     case ENOMEM:
31         return NBD_ENOMEM;
32 #ifdef EDQUOT
33     case EDQUOT:
34 #endif
35     case EFBIG:
36     case ENOSPC:
37         return NBD_ENOSPC;
38     case EINVAL:
39     default:
40         return NBD_EINVAL;
41     }
42 }
43 
44 /* Definitions for opaque data types */
45 
typedef struct NBDRequest NBDRequest;

/*
 * State of one in-flight request.  Instances are created by
 * nbd_request_get() and destroyed by nbd_request_put().
 */
struct NBDRequest {
    /* Queue linkage; not referenced by the code in this file. */
    QSIMPLEQ_ENTRY(NBDRequest) entry;
    /* Client the request belongs to; a client reference is held while the
     * request exists.
     */
    NBDClient *client;
    /* Payload buffer, allocated for READ and WRITE commands and released
     * with qemu_vfree(); NULL otherwise.
     */
    uint8_t *data;
};
53 
/*
 * A block backend (or a window of it) offered to NBD clients.
 * Reference counted through nbd_export_get() / nbd_export_put().
 */
struct NBDExport {
    int refcount;
    /* Optional callback run by nbd_export_put() when the last reference
     * goes away.
     */
    void (*close)(NBDExport *exp);

    BlockBackend *blk;
    /* Name under which the export is listed, or NULL while it is not in
     * the global list of exports (see nbd_export_set_name()).
     */
    char *name;
    /* Byte offset added to every request offset before it reaches blk. */
    off_t dev_offset;
    /* Exported size in bytes, rounded down to a multiple of
     * BDRV_SECTOR_SIZE by nbd_export_new().
     */
    off_t size;
    uint32_t nbdflags;
    /* Clients currently attached to this export. */
    QTAILQ_HEAD(, NBDClient) clients;
    /* Linkage in the global list of named exports. */
    QTAILQ_ENTRY(NBDExport) next;

    /* AioContext in which client fd handlers are registered; set to NULL
     * by blk_aio_detach() while the backend has no context.
     */
    AioContext *ctx;
};
68 
/* All named exports; entries are added and removed by nbd_export_set_name(). */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
70 
/*
 * One connected client.  Reference counted through nbd_client_get() /
 * nbd_client_put().
 */
struct NBDClient {
    int refcount;
    /* Optional callback run by client_close(). */
    void (*close)(NBDClient *client);

    /* Export the client is attached to; NULL until one has been selected
     * during option negotiation.
     */
    NBDExport *exp;
    int sock;

    /* Coroutine currently inside nbd_co_receive_request(), else NULL. */
    Coroutine *recv_coroutine;

    /* Serializes replies so that header and payload are not interleaved. */
    CoMutex send_lock;
    /* Coroutine currently inside nbd_co_send_reply(), else NULL. */
    Coroutine *send_coroutine;

    /* Whether nbd_read is installed as the socket's read handler. */
    bool can_read;

    /* Linkage in exp->clients. */
    QTAILQ_ENTRY(NBDClient) next;
    /* Number of requests obtained with nbd_request_get() and not yet
     * released.
     */
    int nb_requests;
    /* Set once by client_close(); no new request is started afterwards. */
    bool closing;
};
89 
90 /* That's all folks */
91 
/* Forward declarations: these helpers are defined near the end of the file
 * but are used by code that comes before them.
 */
static void nbd_set_handlers(NBDClient *client);
static void nbd_unset_handlers(NBDClient *client);
static void nbd_update_can_read(NBDClient *client);
95 
96 static void nbd_negotiate_continue(void *opaque)
97 {
98     qemu_coroutine_enter(opaque, NULL);
99 }
100 
101 static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
102 {
103     ssize_t ret;
104 
105     assert(qemu_in_coroutine());
106     /* Negotiation are always in main loop. */
107     qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
108                         qemu_coroutine_self());
109     ret = read_sync(fd, buffer, size);
110     qemu_set_fd_handler(fd, NULL, NULL, NULL);
111     return ret;
112 
113 }
114 
115 static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
116 {
117     ssize_t ret;
118 
119     assert(qemu_in_coroutine());
120     /* Negotiation are always in main loop. */
121     qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
122                         qemu_coroutine_self());
123     ret = write_sync(fd, buffer, size);
124     qemu_set_fd_handler(fd, NULL, NULL, NULL);
125     return ret;
126 }
127 
/*
 * Read and discard @size bytes from @fd during negotiation.
 *
 * Returns @size once everything has been consumed, or a negative errno
 * value on failure.
 */
static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
{
    ssize_t ret, dropped = size;
    uint8_t *buffer = g_malloc(MIN(65536, size));

    while (size > 0) {
        ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
        if (ret <= 0) {
            /* A read that consumes nothing makes no progress; looping on it
             * would spin forever.  Presumably this is how read_sync()
             * reports end-of-stream (it is defined outside this file), so
             * treat it as an I/O error.
             */
            dropped = ret < 0 ? ret : -EIO;
            break;
        }

        assert(ret <= size);
        size -= ret;
    }

    g_free(buffer);
    return dropped;
}
147 
148 /* Basic flow for negotiation
149 
150    Server         Client
151    Negotiate
152 
153    or
154 
155    Server         Client
156    Negotiate #1
157                   Option
158    Negotiate #2
159 
160    ----
161 
162    followed by
163 
164    Server         Client
165                   Request
166    Response
167                   Request
168    Response
169                   ...
170    ...
171                   Request (type == 2)
172 
173 */
174 
175 static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
176 {
177     uint64_t magic;
178     uint32_t len;
179 
180     magic = cpu_to_be64(NBD_REP_MAGIC);
181     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
182         LOG("write failed (rep magic)");
183         return -EINVAL;
184     }
185     opt = cpu_to_be32(opt);
186     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
187         LOG("write failed (rep opt)");
188         return -EINVAL;
189     }
190     type = cpu_to_be32(type);
191     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
192         LOG("write failed (rep type)");
193         return -EINVAL;
194     }
195     len = cpu_to_be32(0);
196     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
197         LOG("write failed (rep data length)");
198         return -EINVAL;
199     }
200     return 0;
201 }
202 
203 static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
204 {
205     uint64_t magic, name_len;
206     uint32_t opt, type, len;
207 
208     name_len = strlen(exp->name);
209     magic = cpu_to_be64(NBD_REP_MAGIC);
210     if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
211         LOG("write failed (magic)");
212         return -EINVAL;
213      }
214     opt = cpu_to_be32(NBD_OPT_LIST);
215     if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
216         LOG("write failed (opt)");
217         return -EINVAL;
218     }
219     type = cpu_to_be32(NBD_REP_SERVER);
220     if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
221         LOG("write failed (reply type)");
222         return -EINVAL;
223     }
224     len = cpu_to_be32(name_len + sizeof(len));
225     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
226         LOG("write failed (length)");
227         return -EINVAL;
228     }
229     len = cpu_to_be32(name_len);
230     if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
231         LOG("write failed (length)");
232         return -EINVAL;
233     }
234     if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
235         LOG("write failed (buffer)");
236         return -EINVAL;
237     }
238     return 0;
239 }
240 
241 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
242 {
243     int csock;
244     NBDExport *exp;
245 
246     csock = client->sock;
247     if (length) {
248         if (nbd_negotiate_drop_sync(csock, length) != length) {
249             return -EIO;
250         }
251         return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST);
252     }
253 
254     /* For each export, send a NBD_REP_SERVER reply. */
255     QTAILQ_FOREACH(exp, &exports, next) {
256         if (nbd_negotiate_send_rep_list(csock, exp)) {
257             return -EINVAL;
258         }
259     }
260     /* Finish with a NBD_REP_ACK. */
261     return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
262 }
263 
264 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
265 {
266     int rc = -EINVAL, csock = client->sock;
267     char name[256];
268 
269     /* Client sends:
270         [20 ..  xx]   export name (length bytes)
271      */
272     TRACE("Checking length");
273     if (length > 255) {
274         LOG("Bad length received");
275         goto fail;
276     }
277     if (nbd_negotiate_read(csock, name, length) != length) {
278         LOG("read failed");
279         goto fail;
280     }
281     name[length] = '\0';
282 
283     client->exp = nbd_export_find(name);
284     if (!client->exp) {
285         LOG("export not found");
286         goto fail;
287     }
288 
289     QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
290     nbd_export_get(client->exp);
291     rc = 0;
292 fail:
293     return rc;
294 }
295 
296 static int nbd_negotiate_options(NBDClient *client)
297 {
298     int csock = client->sock;
299     uint32_t flags;
300 
301     /* Client sends:
302         [ 0 ..   3]   client flags
303 
304         [ 0 ..   7]   NBD_OPTS_MAGIC
305         [ 8 ..  11]   NBD option
306         [12 ..  15]   Data length
307         ...           Rest of request
308 
309         [ 0 ..   7]   NBD_OPTS_MAGIC
310         [ 8 ..  11]   Second NBD option
311         [12 ..  15]   Data length
312         ...           Rest of request
313     */
314 
315     if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
316         LOG("read failed");
317         return -EIO;
318     }
319     TRACE("Checking client flags");
320     be32_to_cpus(&flags);
321     if (flags != 0 && flags != NBD_FLAG_C_FIXED_NEWSTYLE) {
322         LOG("Bad client flags received");
323         return -EIO;
324     }
325 
326     while (1) {
327         int ret;
328         uint32_t tmp, length;
329         uint64_t magic;
330 
331         if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
332             LOG("read failed");
333             return -EINVAL;
334         }
335         TRACE("Checking opts magic");
336         if (magic != be64_to_cpu(NBD_OPTS_MAGIC)) {
337             LOG("Bad magic received");
338             return -EINVAL;
339         }
340 
341         if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
342             LOG("read failed");
343             return -EINVAL;
344         }
345 
346         if (nbd_negotiate_read(csock, &length,
347                                sizeof(length)) != sizeof(length)) {
348             LOG("read failed");
349             return -EINVAL;
350         }
351         length = be32_to_cpu(length);
352 
353         TRACE("Checking option");
354         switch (be32_to_cpu(tmp)) {
355         case NBD_OPT_LIST:
356             ret = nbd_negotiate_handle_list(client, length);
357             if (ret < 0) {
358                 return ret;
359             }
360             break;
361 
362         case NBD_OPT_ABORT:
363             return -EINVAL;
364 
365         case NBD_OPT_EXPORT_NAME:
366             return nbd_negotiate_handle_export_name(client, length);
367 
368         default:
369             tmp = be32_to_cpu(tmp);
370             LOG("Unsupported option 0x%x", tmp);
371             nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
372             return -EINVAL;
373         }
374     }
375 }
376 
/*
 * Argument bundle handed to the nbd_co_client_start() coroutine.  It is
 * allocated by nbd_client_new() and freed by nbd_co_client_start().
 */
typedef struct {
    NBDClient *client;  /* client being set up */
    Coroutine *co;      /* coroutine running nbd_co_client_start() */
} NBDClientNewData;
381 
382 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
383 {
384     NBDClient *client = data->client;
385     int csock = client->sock;
386     char buf[8 + 8 + 8 + 128];
387     int rc;
388     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
389                          NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA);
390 
391     /* Negotiation header without options:
392         [ 0 ..   7]   passwd       ("NBDMAGIC")
393         [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
394         [16 ..  23]   size
395         [24 ..  25]   server flags (0)
396         [26 ..  27]   export flags
397         [28 .. 151]   reserved     (0)
398 
399        Negotiation header with options, part 1:
400         [ 0 ..   7]   passwd       ("NBDMAGIC")
401         [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
402         [16 ..  17]   server flags (0)
403 
404        part 2 (after options are sent):
405         [18 ..  25]   size
406         [26 ..  27]   export flags
407         [28 .. 151]   reserved     (0)
408      */
409 
410     rc = -EINVAL;
411 
412     TRACE("Beginning negotiation.");
413     memset(buf, 0, sizeof(buf));
414     memcpy(buf, "NBDMAGIC", 8);
415     if (client->exp) {
416         assert ((client->exp->nbdflags & ~65535) == 0);
417         cpu_to_be64w((uint64_t*)(buf + 8), NBD_CLIENT_MAGIC);
418         cpu_to_be64w((uint64_t*)(buf + 16), client->exp->size);
419         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
420     } else {
421         cpu_to_be64w((uint64_t*)(buf + 8), NBD_OPTS_MAGIC);
422         cpu_to_be16w((uint16_t *)(buf + 16), NBD_FLAG_FIXED_NEWSTYLE);
423     }
424 
425     if (client->exp) {
426         if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
427             LOG("write failed");
428             goto fail;
429         }
430     } else {
431         if (nbd_negotiate_write(csock, buf, 18) != 18) {
432             LOG("write failed");
433             goto fail;
434         }
435         rc = nbd_negotiate_options(client);
436         if (rc != 0) {
437             LOG("option negotiation failed");
438             goto fail;
439         }
440 
441         assert ((client->exp->nbdflags & ~65535) == 0);
442         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
443         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
444         if (nbd_negotiate_write(csock, buf + 18,
445                                 sizeof(buf) - 18) != sizeof(buf) - 18) {
446             LOG("write failed");
447             goto fail;
448         }
449     }
450 
451     TRACE("Negotiation succeeded.");
452     rc = 0;
453 fail:
454     return rc;
455 }
456 
457 #ifdef __linux__
458 
459 int nbd_disconnect(int fd)
460 {
461     ioctl(fd, NBD_CLEAR_QUE);
462     ioctl(fd, NBD_DISCONNECT);
463     ioctl(fd, NBD_CLEAR_SOCK);
464     return 0;
465 }
466 
467 #else
468 
469 int nbd_disconnect(int fd)
470 {
471     return -ENOTSUP;
472 }
473 #endif
474 
475 static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
476 {
477     uint8_t buf[NBD_REQUEST_SIZE];
478     uint32_t magic;
479     ssize_t ret;
480 
481     ret = read_sync(csock, buf, sizeof(buf));
482     if (ret < 0) {
483         return ret;
484     }
485 
486     if (ret != sizeof(buf)) {
487         LOG("read failed");
488         return -EINVAL;
489     }
490 
491     /* Request
492        [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
493        [ 4 ..  7]   type    (0 == READ, 1 == WRITE)
494        [ 8 .. 15]   handle
495        [16 .. 23]   from
496        [24 .. 27]   len
497      */
498 
499     magic = be32_to_cpup((uint32_t*)buf);
500     request->type  = be32_to_cpup((uint32_t*)(buf + 4));
501     request->handle = be64_to_cpup((uint64_t*)(buf + 8));
502     request->from  = be64_to_cpup((uint64_t*)(buf + 16));
503     request->len   = be32_to_cpup((uint32_t*)(buf + 24));
504 
505     TRACE("Got request: "
506           "{ magic = 0x%x, .type = %d, from = %" PRIu64" , len = %u }",
507           magic, request->type, request->from, request->len);
508 
509     if (magic != NBD_REQUEST_MAGIC) {
510         LOG("invalid magic (got 0x%x)", magic);
511         return -EINVAL;
512     }
513     return 0;
514 }
515 
516 static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
517 {
518     uint8_t buf[NBD_REPLY_SIZE];
519     ssize_t ret;
520 
521     reply->error = system_errno_to_nbd_errno(reply->error);
522 
523     /* Reply
524        [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
525        [ 4 ..  7]    error   (0 == no error)
526        [ 7 .. 15]    handle
527      */
528     cpu_to_be32w((uint32_t*)buf, NBD_REPLY_MAGIC);
529     cpu_to_be32w((uint32_t*)(buf + 4), reply->error);
530     cpu_to_be64w((uint64_t*)(buf + 8), reply->handle);
531 
532     TRACE("Sending response to client");
533 
534     ret = write_sync(csock, buf, sizeof(buf));
535     if (ret < 0) {
536         return ret;
537     }
538 
539     if (ret != sizeof(buf)) {
540         LOG("writing to socket failed");
541         return -EINVAL;
542     }
543     return 0;
544 }
545 
/* Upper bound on the number of requests one client may have in flight.
 * nbd_update_can_read() stops reading from the socket once it is reached.
 */
#define MAX_NBD_REQUESTS 16
547 
548 void nbd_client_get(NBDClient *client)
549 {
550     client->refcount++;
551 }
552 
553 void nbd_client_put(NBDClient *client)
554 {
555     if (--client->refcount == 0) {
556         /* The last reference should be dropped by client->close,
557          * which is called by client_close.
558          */
559         assert(client->closing);
560 
561         nbd_unset_handlers(client);
562         close(client->sock);
563         client->sock = -1;
564         if (client->exp) {
565             QTAILQ_REMOVE(&client->exp->clients, client, next);
566             nbd_export_put(client->exp);
567         }
568         g_free(client);
569     }
570 }
571 
572 static void client_close(NBDClient *client)
573 {
574     if (client->closing) {
575         return;
576     }
577 
578     client->closing = true;
579 
580     /* Force requests to finish.  They will drop their own references,
581      * then we'll close the socket and free the NBDClient.
582      */
583     shutdown(client->sock, 2);
584 
585     /* Also tell the client, so that they release their reference.  */
586     if (client->close) {
587         client->close(client);
588     }
589 }
590 
591 static NBDRequest *nbd_request_get(NBDClient *client)
592 {
593     NBDRequest *req;
594 
595     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
596     client->nb_requests++;
597     nbd_update_can_read(client);
598 
599     req = g_new0(NBDRequest, 1);
600     nbd_client_get(client);
601     req->client = client;
602     return req;
603 }
604 
605 static void nbd_request_put(NBDRequest *req)
606 {
607     NBDClient *client = req->client;
608 
609     if (req->data) {
610         qemu_vfree(req->data);
611     }
612     g_free(req);
613 
614     client->nb_requests--;
615     nbd_update_can_read(client);
616     nbd_client_put(client);
617 }
618 
619 static void blk_aio_attached(AioContext *ctx, void *opaque)
620 {
621     NBDExport *exp = opaque;
622     NBDClient *client;
623 
624     TRACE("Export %s: Attaching clients to AIO context %p\n", exp->name, ctx);
625 
626     exp->ctx = ctx;
627 
628     QTAILQ_FOREACH(client, &exp->clients, next) {
629         nbd_set_handlers(client);
630     }
631 }
632 
633 static void blk_aio_detach(void *opaque)
634 {
635     NBDExport *exp = opaque;
636     NBDClient *client;
637 
638     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
639 
640     QTAILQ_FOREACH(client, &exp->clients, next) {
641         nbd_unset_handlers(client);
642     }
643 
644     exp->ctx = NULL;
645 }
646 
647 NBDExport *nbd_export_new(BlockBackend *blk, off_t dev_offset, off_t size,
648                           uint32_t nbdflags, void (*close)(NBDExport *),
649                           Error **errp)
650 {
651     NBDExport *exp = g_malloc0(sizeof(NBDExport));
652     exp->refcount = 1;
653     QTAILQ_INIT(&exp->clients);
654     exp->blk = blk;
655     exp->dev_offset = dev_offset;
656     exp->nbdflags = nbdflags;
657     exp->size = size < 0 ? blk_getlength(blk) : size;
658     if (exp->size < 0) {
659         error_setg_errno(errp, -exp->size,
660                          "Failed to determine the NBD export's length");
661         goto fail;
662     }
663     exp->size -= exp->size % BDRV_SECTOR_SIZE;
664 
665     exp->close = close;
666     exp->ctx = blk_get_aio_context(blk);
667     blk_ref(blk);
668     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
669     /*
670      * NBD exports are used for non-shared storage migration.  Make sure
671      * that BDRV_O_INACTIVE is cleared and the image is ready for write
672      * access since the export could be available before migration handover.
673      */
674     blk_invalidate_cache(blk, NULL);
675     return exp;
676 
677 fail:
678     g_free(exp);
679     return NULL;
680 }
681 
682 NBDExport *nbd_export_find(const char *name)
683 {
684     NBDExport *exp;
685     QTAILQ_FOREACH(exp, &exports, next) {
686         if (strcmp(name, exp->name) == 0) {
687             return exp;
688         }
689     }
690 
691     return NULL;
692 }
693 
694 void nbd_export_set_name(NBDExport *exp, const char *name)
695 {
696     if (exp->name == name) {
697         return;
698     }
699 
700     nbd_export_get(exp);
701     if (exp->name != NULL) {
702         g_free(exp->name);
703         exp->name = NULL;
704         QTAILQ_REMOVE(&exports, exp, next);
705         nbd_export_put(exp);
706     }
707     if (name != NULL) {
708         nbd_export_get(exp);
709         exp->name = g_strdup(name);
710         QTAILQ_INSERT_TAIL(&exports, exp, next);
711     }
712     nbd_export_put(exp);
713 }
714 
715 void nbd_export_close(NBDExport *exp)
716 {
717     NBDClient *client, *next;
718 
719     nbd_export_get(exp);
720     QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
721         client_close(client);
722     }
723     nbd_export_set_name(exp, NULL);
724     nbd_export_put(exp);
725 }
726 
727 void nbd_export_get(NBDExport *exp)
728 {
729     assert(exp->refcount > 0);
730     exp->refcount++;
731 }
732 
733 void nbd_export_put(NBDExport *exp)
734 {
735     assert(exp->refcount > 0);
736     if (exp->refcount == 1) {
737         nbd_export_close(exp);
738     }
739 
740     if (--exp->refcount == 0) {
741         assert(exp->name == NULL);
742 
743         if (exp->close) {
744             exp->close(exp);
745         }
746 
747         if (exp->blk) {
748             blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
749                                             blk_aio_detach, exp);
750             blk_unref(exp->blk);
751             exp->blk = NULL;
752         }
753 
754         g_free(exp);
755     }
756 }
757 
758 BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
759 {
760     return exp->blk;
761 }
762 
763 void nbd_export_close_all(void)
764 {
765     NBDExport *exp, *next;
766 
767     QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
768         nbd_export_close(exp);
769     }
770 }
771 
772 static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
773                                  int len)
774 {
775     NBDClient *client = req->client;
776     int csock = client->sock;
777     ssize_t rc, ret;
778 
779     qemu_co_mutex_lock(&client->send_lock);
780     client->send_coroutine = qemu_coroutine_self();
781     nbd_set_handlers(client);
782 
783     if (!len) {
784         rc = nbd_send_reply(csock, reply);
785     } else {
786         socket_set_cork(csock, 1);
787         rc = nbd_send_reply(csock, reply);
788         if (rc >= 0) {
789             ret = qemu_co_send(csock, req->data, len);
790             if (ret != len) {
791                 rc = -EIO;
792             }
793         }
794         socket_set_cork(csock, 0);
795     }
796 
797     client->send_coroutine = NULL;
798     nbd_set_handlers(client);
799     qemu_co_mutex_unlock(&client->send_lock);
800     return rc;
801 }
802 
803 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
804 {
805     NBDClient *client = req->client;
806     int csock = client->sock;
807     uint32_t command;
808     ssize_t rc;
809 
810     client->recv_coroutine = qemu_coroutine_self();
811     nbd_update_can_read(client);
812 
813     rc = nbd_receive_request(csock, request);
814     if (rc < 0) {
815         if (rc != -EAGAIN) {
816             rc = -EIO;
817         }
818         goto out;
819     }
820 
821     if ((request->from + request->len) < request->from) {
822         LOG("integer overflow detected! "
823             "you're probably being attacked");
824         rc = -EINVAL;
825         goto out;
826     }
827 
828     TRACE("Decoding type");
829 
830     command = request->type & NBD_CMD_MASK_COMMAND;
831     if (command == NBD_CMD_READ || command == NBD_CMD_WRITE) {
832         if (request->len > NBD_MAX_BUFFER_SIZE) {
833             LOG("len (%u) is larger than max len (%u)",
834                 request->len, NBD_MAX_BUFFER_SIZE);
835             rc = -EINVAL;
836             goto out;
837         }
838 
839         req->data = blk_try_blockalign(client->exp->blk, request->len);
840         if (req->data == NULL) {
841             rc = -ENOMEM;
842             goto out;
843         }
844     }
845     if (command == NBD_CMD_WRITE) {
846         TRACE("Reading %u byte(s)", request->len);
847 
848         if (qemu_co_recv(csock, req->data, request->len) != request->len) {
849             LOG("reading from socket failed");
850             rc = -EIO;
851             goto out;
852         }
853     }
854     rc = 0;
855 
856 out:
857     client->recv_coroutine = NULL;
858     nbd_update_can_read(client);
859 
860     return rc;
861 }
862 
/*
 * Coroutine entry point that handles one request of the client passed in
 * @opaque: receive it, execute it against the export's block backend and
 * send the reply.
 *
 * There are two exits.  "done" only releases the request; "out" releases
 * the request and then closes the client.
 */
static void nbd_trip(void *opaque)
{
    NBDClient *client = opaque;
    NBDExport *exp = client->exp;
    NBDRequest *req;
    struct nbd_request request;
    struct nbd_reply reply;
    ssize_t ret;
    uint32_t command;

    TRACE("Reading request.");
    if (client->closing) {
        return;
    }

    req = nbd_request_get(client);
    ret = nbd_co_receive_request(req, &request);
    /* -EAGAIN: nothing was received, just drop the request again. */
    if (ret == -EAGAIN) {
        goto done;
    }
    /* -EIO: the connection is unusable, close the client. */
    if (ret == -EIO) {
        goto out;
    }

    reply.handle = request.handle;
    reply.error = 0;

    /* Any other failure is reported back to the client. */
    if (ret < 0) {
        reply.error = -ret;
        goto error_reply;
    }
    command = request.type & NBD_CMD_MASK_COMMAND;
    /* Every command except DISC must stay within the exported size. */
    if (command != NBD_CMD_DISC && (request.from + request.len) > exp->size) {
            LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64
            ", Offset: %" PRIu64 "\n",
                    request.from, request.len,
                    (uint64_t)exp->size, (uint64_t)exp->dev_offset);
        LOG("requested operation past EOF--bad client?");
        goto invalid_request;
    }

    if (client->closing) {
        /*
         * The client may be closed when we are blocked in
         * nbd_co_receive_request()
         */
        goto done;
    }

    switch (command) {
    case NBD_CMD_READ:
        TRACE("Request type is READ");

        /* With FUA set, the backend is flushed before the read. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        /* NOTE(review): offset and length are converted to sectors by
         * integer division, yet the reply below carries request.len bytes.
         * This looks like it relies on sector-aligned requests -- confirm.
         */
        ret = blk_read(exp->blk,
                       (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                       req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("reading from file failed");
            reply.error = -ret;
            goto error_reply;
        }

        TRACE("Read %u byte(s)", request.len);
        if (nbd_co_send_reply(req, &reply, request.len) < 0)
            goto out;
        break;
    case NBD_CMD_WRITE:
        TRACE("Request type is WRITE");

        if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
            TRACE("Server is read-only, return error");
            reply.error = EROFS;
            goto error_reply;
        }

        TRACE("Writing to device");

        ret = blk_write(exp->blk,
                        (request.from + exp->dev_offset) / BDRV_SECTOR_SIZE,
                        req->data, request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("writing to file failed");
            reply.error = -ret;
            goto error_reply;
        }

        /* With FUA set, the backend is flushed after the write. */
        if (request.type & NBD_CMD_FLAG_FUA) {
            ret = blk_co_flush(exp->blk);
            if (ret < 0) {
                LOG("flush failed");
                reply.error = -ret;
                goto error_reply;
            }
        }

        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_DISC:
        /* No reply is sent for a disconnect; the client is closed. */
        TRACE("Request type is DISCONNECT");
        errno = 0;
        goto out;
    case NBD_CMD_FLUSH:
        TRACE("Request type is FLUSH");

        ret = blk_co_flush(exp->blk);
        if (ret < 0) {
            LOG("flush failed");
            reply.error = -ret;
        }
        /* The same reply is sent either way; it carries the error, if any. */
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    case NBD_CMD_TRIM:
        TRACE("Request type is TRIM");
        ret = blk_co_discard(exp->blk, (request.from + exp->dev_offset)
                                       / BDRV_SECTOR_SIZE,
                             request.len / BDRV_SECTOR_SIZE);
        if (ret < 0) {
            LOG("discard failed");
            reply.error = -ret;
        }
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    default:
        LOG("invalid request type (%u) received", request.type);
    /* The two labels below are also jumped to from outside the switch. */
    invalid_request:
        reply.error = EINVAL;
    error_reply:
        if (nbd_co_send_reply(req, &reply, 0) < 0) {
            goto out;
        }
        break;
    }

    TRACE("Request/Reply complete");

done:
    nbd_request_put(req);
    return;

out:
    nbd_request_put(req);
    client_close(client);
}
1021 
1022 static void nbd_read(void *opaque)
1023 {
1024     NBDClient *client = opaque;
1025 
1026     if (client->recv_coroutine) {
1027         qemu_coroutine_enter(client->recv_coroutine, NULL);
1028     } else {
1029         qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client);
1030     }
1031 }
1032 
1033 static void nbd_restart_write(void *opaque)
1034 {
1035     NBDClient *client = opaque;
1036 
1037     qemu_coroutine_enter(client->send_coroutine, NULL);
1038 }
1039 
1040 static void nbd_set_handlers(NBDClient *client)
1041 {
1042     if (client->exp && client->exp->ctx) {
1043         aio_set_fd_handler(client->exp->ctx, client->sock,
1044                            true,
1045                            client->can_read ? nbd_read : NULL,
1046                            client->send_coroutine ? nbd_restart_write : NULL,
1047                            client);
1048     }
1049 }
1050 
1051 static void nbd_unset_handlers(NBDClient *client)
1052 {
1053     if (client->exp && client->exp->ctx) {
1054         aio_set_fd_handler(client->exp->ctx, client->sock,
1055                            true, NULL, NULL, NULL);
1056     }
1057 }
1058 
1059 static void nbd_update_can_read(NBDClient *client)
1060 {
1061     bool can_read = client->recv_coroutine ||
1062                     client->nb_requests < MAX_NBD_REQUESTS;
1063 
1064     if (can_read != client->can_read) {
1065         client->can_read = can_read;
1066         nbd_set_handlers(client);
1067 
1068         /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
1069          * in nbd_set_handlers() will have taken care of that */
1070     }
1071 }
1072 
1073 static coroutine_fn void nbd_co_client_start(void *opaque)
1074 {
1075     NBDClientNewData *data = opaque;
1076     NBDClient *client = data->client;
1077     NBDExport *exp = client->exp;
1078 
1079     if (exp) {
1080         nbd_export_get(exp);
1081     }
1082     if (nbd_negotiate(data)) {
1083         shutdown(client->sock, 2);
1084         client->close(client);
1085         goto out;
1086     }
1087     qemu_co_mutex_init(&client->send_lock);
1088     nbd_set_handlers(client);
1089 
1090     if (exp) {
1091         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1092     }
1093 out:
1094     g_free(data);
1095 }
1096 
1097 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
1098 {
1099     NBDClient *client;
1100     NBDClientNewData *data = g_new(NBDClientNewData, 1);
1101 
1102     client = g_malloc0(sizeof(NBDClient));
1103     client->refcount = 1;
1104     client->exp = exp;
1105     client->sock = csock;
1106     client->can_read = true;
1107     client->close = close_fn;
1108 
1109     data->client = client;
1110     data->co = qemu_coroutine_create(nbd_co_client_start);
1111     qemu_coroutine_enter(data->co, data);
1112 }
1113