/*
 * Vhost User Bridge
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * Authors:
 *  Victor Kaplansky <victork@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later.  See the COPYING file in the top-level directory.
 */

/*
 * TODO:
 *     - main should get parameters from the command line.
 *     - implement all request handlers. Still not implemented:
 *          vubr_get_queue_num_exec()
 *          vubr_send_rarp_exec()
 *     - test for broken requests and virtqueue.
 *     - implement features defined by Virtio 1.0 spec.
 *     - support mergeable buffers and indirect descriptors.
 *     - implement clean shutdown.
 *     - implement non-blocking writes to UDP backend.
 *     - implement polling strategy.
 *     - implement clean starting/stopping of vq processing.
 *     - implement clean starting/stopping of used-ring and buffer
 *       dirty page logging.
 */

#define _FILE_OFFSET_BITS 64

#include <stddef.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/mman.h>
#include <sys/eventfd.h>
#include <arpa/inet.h>

#include <linux/vhost.h>

#include "qemu/atomic.h"
#include "standard-headers/linux/virtio_net.h"
#include "standard-headers/linux/virtio_ring.h"

#define VHOST_USER_BRIDGE_DEBUG 1

#define DPRINT(...) \
    do { \
        if (VHOST_USER_BRIDGE_DEBUG) { \
            printf(__VA_ARGS__); \
        } \
    } while (0)

typedef void (*CallbackFunc)(int sock, void *ctx);

typedef struct Event {
    void *ctx;
    CallbackFunc callback;
} Event;
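
/*
 * Dispatcher: a minimal select()-based event loop.  Callbacks are kept
 * in a table indexed directly by the file descriptor, which is why only
 * descriptors below FD_SETSIZE can be watched (see dispatcher_add()).
 */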

typedef struct Dispatcher {
    int max_sock;
    fd_set fdset;
    Event events[FD_SETSIZE];
} Dispatcher;

static void
vubr_die(const char *s)
{
    perror(s);
    exit(1);
}

static int
dispatcher_init(Dispatcher *dispr)
{
    FD_ZERO(&dispr->fdset);
    dispr->max_sock = -1;
    return 0;
}

static int
dispatcher_add(Dispatcher *dispr, int sock, void *ctx, CallbackFunc cb)
{
    if (sock >= FD_SETSIZE) {
        fprintf(stderr,
                "Error: Failed to add new event. sock %d should be less than %d\n",
                sock, FD_SETSIZE);
        return -1;
    }

    dispr->events[sock].ctx = ctx;
    dispr->events[sock].callback = cb;

    FD_SET(sock, &dispr->fdset);
    if (sock > dispr->max_sock) {
        dispr->max_sock = sock;
    }
    DPRINT("Added sock %d for watching. max_sock: %d\n",
           sock, dispr->max_sock);
    return 0;
}

#if 0
/* dispatcher_remove() is not currently in use but may be useful
 * in the future. */
static int
dispatcher_remove(Dispatcher *dispr, int sock)
{
    if (sock >= FD_SETSIZE) {
        fprintf(stderr,
                "Error: Failed to remove event. sock %d should be less than %d\n",
                sock, FD_SETSIZE);
        return -1;
    }

    FD_CLR(sock, &dispr->fdset);
    return 0;
}
#endif

/* timeout in us */
static int
dispatcher_wait(Dispatcher *dispr, uint32_t timeout)
{
    struct timeval tv;
    tv.tv_sec = timeout / 1000000;
    tv.tv_usec = timeout % 1000000;

    fd_set fdset = dispr->fdset;

    /* Wait until at least one of the sockets becomes readable. */
    int rc = select(dispr->max_sock + 1, &fdset, 0, 0, &tv);

    if (rc == -1) {
        vubr_die("select");
    }

    /* Timeout */
    if (rc == 0) {
        return 0;
    }

    /* Now call callback for every ready socket. */

    int sock;
    for (sock = 0; sock < dispr->max_sock + 1; sock++) {
        if (FD_ISSET(sock, &fdset)) {
            Event *e = &dispr->events[sock];
            e->callback(sock, e->ctx);
        }
    }

    return 0;
}
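
/*
 * Typical Dispatcher usage (this is exactly what vubr_new() and
 * vubr_run() below do):
 *
 *     dispatcher_init(&dev->dispatcher);
 *     dispatcher_add(&dev->dispatcher, dev->sock, dev, vubr_accept_cb);
 *     for (;;) {
 *         dispatcher_wait(&dev->dispatcher, 200000);
 *     }
 */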

typedef struct VubrVirtq {
    int call_fd;
    int kick_fd;
    uint32_t size;
    uint16_t last_avail_index;
    uint16_t last_used_index;
    struct vring_desc *desc;
    struct vring_avail *avail;
    struct vring_used *used;
    uint64_t log_guest_addr;
    int enable;
} VubrVirtq;

/* Based on qemu/hw/virtio/vhost-user.c */

#define VHOST_MEMORY_MAX_NREGIONS    8
#define VHOST_USER_F_PROTOCOL_FEATURES 30

#define VHOST_LOG_PAGE 4096
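
/*
 * The dirty log is a bitmap with one bit per VHOST_LOG_PAGE-sized guest
 * page, eight page bits per byte (see vubr_log_page() below); QEMU
 * consumes it while migrating the guest.
 */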

enum VhostUserProtocolFeature {
    VHOST_USER_PROTOCOL_F_MQ = 0,
    VHOST_USER_PROTOCOL_F_LOG_SHMFD = 1,
    VHOST_USER_PROTOCOL_F_RARP = 2,

    VHOST_USER_PROTOCOL_F_MAX
};

#define VHOST_USER_PROTOCOL_FEATURE_MASK ((1 << VHOST_USER_PROTOCOL_F_MAX) - 1)

typedef enum VhostUserRequest {
    VHOST_USER_NONE = 0,
    VHOST_USER_GET_FEATURES = 1,
    VHOST_USER_SET_FEATURES = 2,
    VHOST_USER_SET_OWNER = 3,
    VHOST_USER_RESET_OWNER = 4,
    VHOST_USER_SET_MEM_TABLE = 5,
    VHOST_USER_SET_LOG_BASE = 6,
    VHOST_USER_SET_LOG_FD = 7,
    VHOST_USER_SET_VRING_NUM = 8,
    VHOST_USER_SET_VRING_ADDR = 9,
    VHOST_USER_SET_VRING_BASE = 10,
    VHOST_USER_GET_VRING_BASE = 11,
    VHOST_USER_SET_VRING_KICK = 12,
    VHOST_USER_SET_VRING_CALL = 13,
    VHOST_USER_SET_VRING_ERR = 14,
    VHOST_USER_GET_PROTOCOL_FEATURES = 15,
    VHOST_USER_SET_PROTOCOL_FEATURES = 16,
    VHOST_USER_GET_QUEUE_NUM = 17,
    VHOST_USER_SET_VRING_ENABLE = 18,
    VHOST_USER_SEND_RARP = 19,
    VHOST_USER_MAX
} VhostUserRequest;

typedef struct VhostUserMemoryRegion {
    uint64_t guest_phys_addr;
    uint64_t memory_size;
    uint64_t userspace_addr;
    uint64_t mmap_offset;
} VhostUserMemoryRegion;

typedef struct VhostUserMemory {
    uint32_t nregions;
    uint32_t padding;
    VhostUserMemoryRegion regions[VHOST_MEMORY_MAX_NREGIONS];
} VhostUserMemory;

typedef struct VhostUserLog {
    uint64_t mmap_size;
    uint64_t mmap_offset;
} VhostUserLog;

typedef struct VhostUserMsg {
    VhostUserRequest request;

#define VHOST_USER_VERSION_MASK     (0x3)
#define VHOST_USER_REPLY_MASK       (0x1<<2)
    uint32_t flags;
    uint32_t size; /* the following payload size */
    union {
#define VHOST_USER_VRING_IDX_MASK   (0xff)
#define VHOST_USER_VRING_NOFD_MASK  (0x1<<8)
        uint64_t u64;
        struct vhost_vring_state state;
        struct vhost_vring_addr addr;
        VhostUserMemory memory;
        VhostUserLog log;
    } payload;
    int fds[VHOST_MEMORY_MAX_NREGIONS];
    int fd_num;
} QEMU_PACKED VhostUserMsg;
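
/*
 * Only the first VHOST_USER_HDR_SIZE bytes of VhostUserMsg plus the
 * payload travel over the socket; fds and fd_num are local bookkeeping
 * for descriptors passed as SCM_RIGHTS ancillary data (see
 * vubr_message_read() and vubr_message_write()).
 */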

#define VHOST_USER_HDR_SIZE offsetof(VhostUserMsg, payload.u64)

/* The version of the protocol we support */
#define VHOST_USER_VERSION    (0x1)

#define MAX_NR_VIRTQUEUE (8)

typedef struct VubrDevRegion {
    /* Guest Physical address. */
    uint64_t gpa;
    /* Memory region size. */
    uint64_t size;
    /* QEMU virtual address (userspace). */
    uint64_t qva;
    /* Starting offset in our mmaped space. */
    uint64_t mmap_offset;
    /* Start address of mmaped space. */
    uint64_t mmap_addr;
} VubrDevRegion;
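
/*
 * Region address translation (see gpa_to_va() and qva_to_va() below):
 *
 *     local_va = addr - region_start + mmap_addr + mmap_offset
 *
 * where region_start is the region's gpa for guest-physical addresses
 * and its qva for QEMU-virtual addresses.
 */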

typedef struct VubrDev {
    int sock;
    Dispatcher dispatcher;
    uint32_t nregions;
    VubrDevRegion regions[VHOST_MEMORY_MAX_NREGIONS];
    VubrVirtq vq[MAX_NR_VIRTQUEUE];
    int log_call_fd;
    uint64_t log_size;
    uint8_t *log_table;
    int backend_udp_sock;
    struct sockaddr_in backend_udp_dest;
    int ready;
    uint64_t features;
} VubrDev;

static const char *vubr_request_str[] = {
    [VHOST_USER_NONE]                   =  "VHOST_USER_NONE",
    [VHOST_USER_GET_FEATURES]           =  "VHOST_USER_GET_FEATURES",
    [VHOST_USER_SET_FEATURES]           =  "VHOST_USER_SET_FEATURES",
    [VHOST_USER_SET_OWNER]              =  "VHOST_USER_SET_OWNER",
    [VHOST_USER_RESET_OWNER]            =  "VHOST_USER_RESET_OWNER",
    [VHOST_USER_SET_MEM_TABLE]          =  "VHOST_USER_SET_MEM_TABLE",
    [VHOST_USER_SET_LOG_BASE]           =  "VHOST_USER_SET_LOG_BASE",
    [VHOST_USER_SET_LOG_FD]             =  "VHOST_USER_SET_LOG_FD",
    [VHOST_USER_SET_VRING_NUM]          =  "VHOST_USER_SET_VRING_NUM",
    [VHOST_USER_SET_VRING_ADDR]         =  "VHOST_USER_SET_VRING_ADDR",
    [VHOST_USER_SET_VRING_BASE]         =  "VHOST_USER_SET_VRING_BASE",
    [VHOST_USER_GET_VRING_BASE]         =  "VHOST_USER_GET_VRING_BASE",
    [VHOST_USER_SET_VRING_KICK]         =  "VHOST_USER_SET_VRING_KICK",
    [VHOST_USER_SET_VRING_CALL]         =  "VHOST_USER_SET_VRING_CALL",
    [VHOST_USER_SET_VRING_ERR]          =  "VHOST_USER_SET_VRING_ERR",
    [VHOST_USER_GET_PROTOCOL_FEATURES]  =  "VHOST_USER_GET_PROTOCOL_FEATURES",
    [VHOST_USER_SET_PROTOCOL_FEATURES]  =  "VHOST_USER_SET_PROTOCOL_FEATURES",
    [VHOST_USER_GET_QUEUE_NUM]          =  "VHOST_USER_GET_QUEUE_NUM",
    [VHOST_USER_SET_VRING_ENABLE]       =  "VHOST_USER_SET_VRING_ENABLE",
    [VHOST_USER_SEND_RARP]              =  "VHOST_USER_SEND_RARP",
    [VHOST_USER_MAX]                    =  "VHOST_USER_MAX",
};

static void
print_buffer(uint8_t *buf, size_t len)
{
    size_t i;
    printf("Raw buffer:\n");
    for (i = 0; i < len; i++) {
        if (i % 16 == 0) {
            printf("\n");
        }
        if (i % 4 == 0) {
            printf("   ");
        }
        printf("%02x ", buf[i]);
    }
    printf("\n............................................................\n");
}

/* Translate guest physical address to our virtual address.  */
static uint64_t
gpa_to_va(VubrDev *dev, uint64_t guest_addr)
{
    int i;

    /* Find matching memory region.  */
    for (i = 0; i < dev->nregions; i++) {
        VubrDevRegion *r = &dev->regions[i];

        if ((guest_addr >= r->gpa) && (guest_addr < (r->gpa + r->size))) {
            return guest_addr - r->gpa + r->mmap_addr + r->mmap_offset;
        }
    }

    assert(!"address not found in regions");
    return 0;
}

/* Translate qemu virtual address to our virtual address.  */
static uint64_t
qva_to_va(VubrDev *dev, uint64_t qemu_addr)
{
    int i;

    /* Find matching memory region.  */
    for (i = 0; i < dev->nregions; i++) {
        VubrDevRegion *r = &dev->regions[i];

        if ((qemu_addr >= r->qva) && (qemu_addr < (r->qva + r->size))) {
            return qemu_addr - r->qva + r->mmap_addr + r->mmap_offset;
        }
    }

    assert(!"address not found in regions");
    return 0;
}

static void
vubr_message_read(int conn_fd, VhostUserMsg *vmsg)
{
    char control[CMSG_SPACE(VHOST_MEMORY_MAX_NREGIONS * sizeof(int))] = { };
    struct iovec iov = {
        .iov_base = (char *)vmsg,
        .iov_len = VHOST_USER_HDR_SIZE,
    };
    struct msghdr msg = {
        .msg_iov = &iov,
        .msg_iovlen = 1,
        .msg_control = control,
        .msg_controllen = sizeof(control),
    };
    size_t fd_size;
    struct cmsghdr *cmsg;
    int rc;

    rc = recvmsg(conn_fd, &msg, 0);

    if (rc == 0) {
        /* rc == 0 means EOF, not an errno condition. */
        fprintf(stderr, "Peer disconnected.\n");
        exit(1);
    }
    if (rc < 0) {
        vubr_die("recvmsg");
    }

    vmsg->fd_num = 0;
    for (cmsg = CMSG_FIRSTHDR(&msg);
         cmsg != NULL;
         cmsg = CMSG_NXTHDR(&msg, cmsg))
    {
        if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
            fd_size = cmsg->cmsg_len - CMSG_LEN(0);
            vmsg->fd_num = fd_size / sizeof(int);
            memcpy(vmsg->fds, CMSG_DATA(cmsg), fd_size);
            break;
        }
    }

    if (vmsg->size > sizeof(vmsg->payload)) {
        fprintf(stderr,
                "Error: request %d is too big: vmsg->size = %u, "
                "but sizeof(vmsg->payload) = %zu\n",
                vmsg->request, vmsg->size, sizeof(vmsg->payload));
        exit(1);
    }

    if (vmsg->size) {
        rc = read(conn_fd, &vmsg->payload, vmsg->size);
        if (rc == 0) {
            fprintf(stderr, "Peer disconnected.\n");
            exit(1);
        }
        if (rc < 0) {
            vubr_die("read");
        }

        assert(rc == vmsg->size);
    }
}

static void
vubr_message_write(int conn_fd, VhostUserMsg *vmsg)
{
    int rc;

    do {
        rc = write(conn_fd, vmsg, VHOST_USER_HDR_SIZE + vmsg->size);
    } while (rc < 0 && errno == EINTR);

    if (rc < 0) {
        vubr_die("write");
    }
}

static void
vubr_backend_udp_sendbuf(VubrDev *dev, uint8_t *buf, size_t len)
{
    socklen_t slen = sizeof(struct sockaddr_in);

    if (sendto(dev->backend_udp_sock, buf, len, 0,
               (struct sockaddr *) &dev->backend_udp_dest, slen) == -1) {
        vubr_die("sendto()");
    }
}

static int
vubr_backend_udp_recvbuf(VubrDev *dev, uint8_t *buf, size_t buflen)
{
    socklen_t slen = sizeof(struct sockaddr_in);
    int rc;

    rc = recvfrom(dev->backend_udp_sock, buf, buflen, 0,
                  (struct sockaddr *) &dev->backend_udp_dest,
                  &slen);
    if (rc == -1) {
        vubr_die("recvfrom()");
    }

    return rc;
}

static void
vubr_consume_raw_packet(VubrDev *dev, uint8_t *buf, uint32_t len)
{
    int hdrlen = sizeof(struct virtio_net_hdr_v1);

    if (VHOST_USER_BRIDGE_DEBUG) {
        print_buffer(buf, len);
    }
    vubr_backend_udp_sendbuf(dev, buf + hdrlen, len - hdrlen);
}

/* Kick the log_call_fd if required. */
static void
vubr_log_kick(VubrDev *dev)
{
    if (dev->log_call_fd != -1) {
        DPRINT("Kicking QEMU's log...\n");
        eventfd_write(dev->log_call_fd, 1);
    }
}

/* Kick the guest if necessary. */
static void
vubr_virtqueue_kick(VubrVirtq *vq)
{
    if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) {
        DPRINT("Kicking the guest...\n");
        eventfd_write(vq->call_fd, 1);
    }
}

static void
vubr_log_page(uint8_t *log_table, uint64_t page)
{
    DPRINT("Logged dirty guest page: %"PRIu64"\n", page);
    atomic_or(&log_table[page / 8], 1 << (page % 8));
}

static void
vubr_log_write(VubrDev *dev, uint64_t address, uint64_t length)
{
    uint64_t page;

    if (!(dev->features & (1ULL << VHOST_F_LOG_ALL)) ||
        !dev->log_table || !length) {
        return;
    }

    assert(dev->log_size > ((address + length - 1) / VHOST_LOG_PAGE / 8));

    page = address / VHOST_LOG_PAGE;
    while (page * VHOST_LOG_PAGE < address + length) {
        vubr_log_page(dev->log_table, page);
        /* Advance by one page index, not by the page size. */
        page += 1;
    }
    vubr_log_kick(dev);
}
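
/*
 * Example: with VHOST_LOG_PAGE = 4096, a 100-byte write at guest
 * address 8190 spans pages 1 and 2, so bits 1 and 2 of log_table[0]
 * get set.
 */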

static void
vubr_post_buffer(VubrDev *dev, VubrVirtq *vq, uint8_t *buf, int32_t len)
{
    struct vring_desc *desc = vq->desc;
    struct vring_avail *avail = vq->avail;
    struct vring_used *used = vq->used;
    uint64_t log_guest_addr = vq->log_guest_addr;

    unsigned int size = vq->size;

    uint16_t avail_index = atomic_mb_read(&avail->idx);

    /* We check the available descriptors before posting the
     * buffer, so here we assume that there are enough available
     * descriptors. */
    assert(vq->last_avail_index != avail_index);
    uint16_t a_index = vq->last_avail_index % size;
    uint16_t u_index = vq->last_used_index % size;
    uint16_t d_index = avail->ring[a_index];

    int i = d_index;

    DPRINT("Post packet to guest on vq:\n");
    DPRINT("    size             = %d\n", vq->size);
    DPRINT("    last_avail_index = %d\n", vq->last_avail_index);
    DPRINT("    last_used_index  = %d\n", vq->last_used_index);
    DPRINT("    a_index = %d\n", a_index);
    DPRINT("    u_index = %d\n", u_index);
    DPRINT("    d_index = %d\n", d_index);
    DPRINT("    desc[%d].addr    = 0x%016"PRIx64"\n", i, desc[i].addr);
    DPRINT("    desc[%d].len     = %d\n", i, desc[i].len);
    DPRINT("    desc[%d].flags   = %d\n", i, desc[i].flags);
    DPRINT("    avail->idx = %d\n", avail_index);
    DPRINT("    used->idx  = %d\n", used->idx);

    if (!(desc[i].flags & VRING_DESC_F_WRITE)) {
        /* FIXME: we should find a writable descriptor. */
        fprintf(stderr, "Error: descriptor is not writable. Exiting.\n");
        exit(1);
    }

    void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
    uint32_t chunk_len = desc[i].len;

    if (len <= chunk_len) {
        memcpy(chunk_start, buf, len);
        vubr_log_write(dev, desc[i].addr, len);
    } else {
        fprintf(stderr,
                "Received a packet from the backend that is too long. "
                "Dropping...\n");
        return;
    }

    /* Add descriptor to the used ring. */
    used->ring[u_index].id = d_index;
    used->ring[u_index].len = len;
    vubr_log_write(dev,
                   log_guest_addr + offsetof(struct vring_used, ring[u_index]),
                   sizeof(used->ring[u_index]));

    vq->last_avail_index++;
    vq->last_used_index++;

    atomic_mb_set(&used->idx, vq->last_used_index);
    vubr_log_write(dev,
                   log_guest_addr + offsetof(struct vring_used, idx),
                   sizeof(used->idx));

    /* Kick the guest if necessary. */
    vubr_virtqueue_kick(vq);
}
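
/*
 * Note: vubr_post_buffer() fills a single descriptor per packet;
 * mergeable RX buffers and descriptor chains on the receive path are
 * still on the TODO list at the top of this file.
 */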

static int
vubr_process_desc(VubrDev *dev, VubrVirtq *vq)
{
    struct vring_desc *desc = vq->desc;
    struct vring_avail *avail = vq->avail;
    struct vring_used *used = vq->used;
    uint64_t log_guest_addr = vq->log_guest_addr;

    unsigned int size = vq->size;

    uint16_t a_index = vq->last_avail_index % size;
    uint16_t u_index = vq->last_used_index % size;
    uint16_t d_index = avail->ring[a_index];

    uint32_t i, len = 0;
    size_t buf_size = 4096;
    uint8_t buf[4096];

    DPRINT("Chunks: ");
    i = d_index;
    do {
        void *chunk_start = (void *)gpa_to_va(dev, desc[i].addr);
        uint32_t chunk_len = desc[i].len;

        assert(!(desc[i].flags & VRING_DESC_F_WRITE));

        if (len + chunk_len < buf_size) {
            memcpy(buf + len, chunk_start, chunk_len);
            DPRINT("%d ", chunk_len);
        } else {
            fprintf(stderr, "Error: packet is too long. Dropping...\n");
            break;
        }

        len += chunk_len;

        if (!(desc[i].flags & VRING_DESC_F_NEXT)) {
            break;
        }

        i = desc[i].next;
    } while (1);
    DPRINT("\n");

    if (!len) {
        return -1;
    }

    /* Add descriptor to the used ring. */
    used->ring[u_index].id = d_index;
    used->ring[u_index].len = len;
    vubr_log_write(dev,
                   log_guest_addr + offsetof(struct vring_used, ring[u_index]),
                   sizeof(used->ring[u_index]));

    vubr_consume_raw_packet(dev, buf, len);

    return 0;
}

static void
vubr_process_avail(VubrDev *dev, VubrVirtq *vq)
{
    struct vring_avail *avail = vq->avail;
    struct vring_used *used = vq->used;
    uint64_t log_guest_addr = vq->log_guest_addr;

    while (vq->last_avail_index != atomic_mb_read(&avail->idx)) {
        vubr_process_desc(dev, vq);
        vq->last_avail_index++;
        vq->last_used_index++;
    }

    atomic_mb_set(&used->idx, vq->last_used_index);
    vubr_log_write(dev,
                   log_guest_addr + offsetof(struct vring_used, idx),
                   sizeof(used->idx));
}
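
/*
 * Queue convention (virtio-net): vq[0] is the RX virtq, where we post
 * packets to the guest; vq[1] is the TX virtq, which we drain when the
 * guest kicks us.
 */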

static void
vubr_backend_recv_cb(int sock, void *ctx)
{
    VubrDev *dev = (VubrDev *) ctx;
    VubrVirtq *rx_vq = &dev->vq[0];
    uint8_t buf[4096];
    struct virtio_net_hdr_v1 *hdr = (struct virtio_net_hdr_v1 *)buf;
    int hdrlen = sizeof(struct virtio_net_hdr_v1);
    int buflen = sizeof(buf);
    int len;

    if (!dev->ready) {
        return;
    }

    DPRINT("\n\n   ***   IN UDP RECEIVE CALLBACK    ***\n\n");

    uint16_t avail_index = atomic_mb_read(&rx_vq->avail->idx);

    /* If there are no available descriptors, just do nothing.
     * The buffer will be handled by the next arriving UDP packet,
     * or by the next kick on the receive virtq. */
    if (rx_vq->last_avail_index == avail_index) {
        DPRINT("Got UDP packet, but no available descriptors on RX virtq.\n");
        return;
    }

    len = vubr_backend_udp_recvbuf(dev, buf + hdrlen, buflen - hdrlen);

    *hdr = (struct virtio_net_hdr_v1) { };
    hdr->num_buffers = 1;
    vubr_post_buffer(dev, rx_vq, buf, len + hdrlen);
}

static void
vubr_kick_cb(int sock, void *ctx)
{
    VubrDev *dev = (VubrDev *) ctx;
    eventfd_t kick_data;
    ssize_t rc;

    rc = eventfd_read(sock, &kick_data);
    if (rc == -1) {
        vubr_die("eventfd_read()");
    } else {
        DPRINT("Got kick_data: %016"PRIx64"\n", kick_data);
        vubr_process_avail(dev, &dev->vq[1]);
    }
}
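
/*
 * Request handlers.  Each returns 1 when the (possibly modified) vmsg
 * should be sent back to QEMU as a reply, and 0 otherwise; see
 * vubr_receive_cb().
 */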

static int
vubr_none_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    DPRINT("Function %s() not implemented yet.\n", __func__);
    return 0;
}

static int
vubr_get_features_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    vmsg->payload.u64 =
            ((1ULL << VIRTIO_NET_F_MRG_RXBUF) |
             (1ULL << VHOST_F_LOG_ALL) |
             (1ULL << VHOST_USER_F_PROTOCOL_FEATURES));

    vmsg->size = sizeof(vmsg->payload.u64);

    DPRINT("Sending back to guest u64: 0x%016"PRIx64"\n", vmsg->payload.u64);

    /* Reply */
    return 1;
}

static int
vubr_set_features_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
    dev->features = vmsg->payload.u64;
    return 0;
}

static int
vubr_set_owner_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    return 0;
}

static void
vubr_close_log(VubrDev *dev)
{
    if (dev->log_table) {
        if (munmap(dev->log_table, dev->log_size) != 0) {
            vubr_die("munmap()");
        }

        dev->log_table = 0;
    }
    if (dev->log_call_fd != -1) {
        close(dev->log_call_fd);
        dev->log_call_fd = -1;
    }
}

static int
vubr_reset_device_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    vubr_close_log(dev);
    dev->ready = 0;
    dev->features = 0;
    return 0;
}

static int
vubr_set_mem_table_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    int i;
    VhostUserMemory *memory = &vmsg->payload.memory;
    dev->nregions = memory->nregions;

    DPRINT("Nregions: %d\n", memory->nregions);
    for (i = 0; i < dev->nregions; i++) {
        void *mmap_addr;
        VhostUserMemoryRegion *msg_region = &memory->regions[i];
        VubrDevRegion *dev_region = &dev->regions[i];

        DPRINT("Region %d\n", i);
        DPRINT("    guest_phys_addr: 0x%016"PRIx64"\n",
               msg_region->guest_phys_addr);
        DPRINT("    memory_size:     0x%016"PRIx64"\n",
               msg_region->memory_size);
        DPRINT("    userspace_addr   0x%016"PRIx64"\n",
               msg_region->userspace_addr);
        DPRINT("    mmap_offset      0x%016"PRIx64"\n",
               msg_region->mmap_offset);

        dev_region->gpa = msg_region->guest_phys_addr;
        dev_region->size = msg_region->memory_size;
        dev_region->qva = msg_region->userspace_addr;
        dev_region->mmap_offset = msg_region->mmap_offset;

        /* We don't use the offset argument of mmap() since the
         * mapped address has to be page aligned, and we use huge
         * pages.  */
        mmap_addr = mmap(0, dev_region->size + dev_region->mmap_offset,
                         PROT_READ | PROT_WRITE, MAP_SHARED,
                         vmsg->fds[i], 0);

        if (mmap_addr == MAP_FAILED) {
            vubr_die("mmap");
        }

        dev_region->mmap_addr = (uint64_t) mmap_addr;
        DPRINT("    mmap_addr:       0x%016"PRIx64"\n", dev_region->mmap_addr);
    }

    return 0;
}

static int
vubr_set_log_base_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    int fd;
    uint64_t log_mmap_size, log_mmap_offset;
    void *rc;

    assert(vmsg->fd_num == 1);
    fd = vmsg->fds[0];

    assert(vmsg->size == sizeof(vmsg->payload.log));
    log_mmap_offset = vmsg->payload.log.mmap_offset;
    log_mmap_size = vmsg->payload.log.mmap_size;
    DPRINT("Log mmap_offset: %"PRIu64"\n", log_mmap_offset);
    DPRINT("Log mmap_size:   %"PRIu64"\n", log_mmap_size);

    rc = mmap(0, log_mmap_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
              log_mmap_offset);
    if (rc == MAP_FAILED) {
        vubr_die("mmap");
    }
    dev->log_table = rc;
    dev->log_size = log_mmap_size;

    vmsg->size = sizeof(vmsg->payload.u64);
    /* Reply */
    return 1;
}
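
/*
 * Replying to VHOST_USER_SET_LOG_BASE is tied to the
 * VHOST_USER_PROTOCOL_F_LOG_SHMFD protocol feature, the one feature
 * this bridge advertises in vubr_get_protocol_features_exec().
 */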

static int
vubr_set_log_fd_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    assert(vmsg->fd_num == 1);
    dev->log_call_fd = vmsg->fds[0];
    DPRINT("Got log_call_fd: %d\n", vmsg->fds[0]);
    return 0;
}

static int
vubr_set_vring_num_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    unsigned int index = vmsg->payload.state.index;
    unsigned int num = vmsg->payload.state.num;

    DPRINT("State.index: %d\n", index);
    DPRINT("State.num:   %d\n", num);
    dev->vq[index].size = num;
    return 0;
}

static int
vubr_set_vring_addr_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    struct vhost_vring_addr *vra = &vmsg->payload.addr;
    unsigned int index = vra->index;
    VubrVirtq *vq = &dev->vq[index];

    DPRINT("vhost_vring_addr:\n");
    DPRINT("    index:  %d\n", vra->index);
    DPRINT("    flags:  %d\n", vra->flags);
    DPRINT("    desc_user_addr:   0x%016llx\n", vra->desc_user_addr);
    DPRINT("    used_user_addr:   0x%016llx\n", vra->used_user_addr);
    DPRINT("    avail_user_addr:  0x%016llx\n", vra->avail_user_addr);
    DPRINT("    log_guest_addr:   0x%016llx\n", vra->log_guest_addr);

    vq->desc = (struct vring_desc *)qva_to_va(dev, vra->desc_user_addr);
    vq->used = (struct vring_used *)qva_to_va(dev, vra->used_user_addr);
    vq->avail = (struct vring_avail *)qva_to_va(dev, vra->avail_user_addr);
    vq->log_guest_addr = vra->log_guest_addr;

    DPRINT("Setting virtq addresses:\n");
    DPRINT("    vring_desc  at %p\n", vq->desc);
    DPRINT("    vring_used  at %p\n", vq->used);
    DPRINT("    vring_avail at %p\n", vq->avail);

    vq->last_used_index = vq->used->idx;
    return 0;
}

static int
vubr_set_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    unsigned int index = vmsg->payload.state.index;
    unsigned int num = vmsg->payload.state.num;

    DPRINT("State.index: %d\n", index);
    DPRINT("State.num:   %d\n", num);
    dev->vq[index].last_avail_index = num;

    return 0;
}

static int
vubr_get_vring_base_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    unsigned int index = vmsg->payload.state.index;

    DPRINT("State.index: %d\n", index);
    vmsg->payload.state.num = dev->vq[index].last_avail_index;
    vmsg->size = sizeof(vmsg->payload.state);
    /* FIXME: this is a work-around for a bug in QEMU enabling
     * vrings too early. When protocol features are enabled, we
     * have to respect the VHOST_USER_SET_VRING_ENABLE request. */
    dev->ready = 0;

    /* Reply */
    return 1;
}

static int
vubr_set_vring_kick_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    uint64_t u64_arg = vmsg->payload.u64;
    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;

    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);

    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
    assert(vmsg->fd_num == 1);

    dev->vq[index].kick_fd = vmsg->fds[0];
    DPRINT("Got kick_fd: %d for vq: %d\n", vmsg->fds[0], index);

    if (index % 2 == 1) {
        /* TX queue. */
        dispatcher_add(&dev->dispatcher, dev->vq[index].kick_fd,
                       dev, vubr_kick_cb);

        DPRINT("Waiting for kicks on fd: %d for vq: %d\n",
               dev->vq[index].kick_fd, index);
    }
    /* We temporarily use this hack to determine that both TX and RX
     * queues are set up and ready for processing.
     * FIXME: we need to rely on VHOST_USER_SET_VRING_ENABLE and
     * actual kicks. */
    if (dev->vq[0].kick_fd != -1 &&
        dev->vq[1].kick_fd != -1) {
        dev->ready = 1;
        DPRINT("vhost-user-bridge is ready for processing queues.\n");
    }
    return 0;
}

static int
vubr_set_vring_call_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    uint64_t u64_arg = vmsg->payload.u64;
    int index = u64_arg & VHOST_USER_VRING_IDX_MASK;

    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
    assert((u64_arg & VHOST_USER_VRING_NOFD_MASK) == 0);
    assert(vmsg->fd_num == 1);

    dev->vq[index].call_fd = vmsg->fds[0];
    DPRINT("Got call_fd: %d for vq: %d\n", vmsg->fds[0], index);

    return 0;
}

static int
vubr_set_vring_err_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
    return 0;
}

static int
vubr_get_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    vmsg->payload.u64 = 1ULL << VHOST_USER_PROTOCOL_F_LOG_SHMFD;
    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
    vmsg->size = sizeof(vmsg->payload.u64);

    /* Reply */
    return 1;
}

static int
vubr_set_protocol_features_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    /* FIXME: unimplemented */
    DPRINT("u64: 0x%016"PRIx64"\n", vmsg->payload.u64);
    return 0;
}

static int
vubr_get_queue_num_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    DPRINT("Function %s() not implemented yet.\n", __func__);
    return 0;
}

static int
vubr_set_vring_enable_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    unsigned int index = vmsg->payload.state.index;
    unsigned int enable = vmsg->payload.state.num;

    DPRINT("State.index: %d\n", index);
    DPRINT("State.enable:   %d\n", enable);
    dev->vq[index].enable = enable;
    return 0;
}

static int
vubr_send_rarp_exec(VubrDev *dev, VhostUserMsg *vmsg)
{
    DPRINT("Function %s() not implemented yet.\n", __func__);
    return 0;
}

static int
vubr_execute_request(VubrDev *dev, VhostUserMsg *vmsg)
{
    /* Print out generic part of the request. */
    DPRINT(
           "==================   Vhost user message from QEMU   ==================\n");
    DPRINT("Request: %s (%d)\n", vubr_request_str[vmsg->request],
           vmsg->request);
    DPRINT("Flags:   0x%x\n", vmsg->flags);
    DPRINT("Size:    %d\n", vmsg->size);

    if (vmsg->fd_num) {
        int i;
        DPRINT("Fds:");
        for (i = 0; i < vmsg->fd_num; i++) {
            DPRINT(" %d", vmsg->fds[i]);
        }
        DPRINT("\n");
    }

    switch (vmsg->request) {
    case VHOST_USER_NONE:
        return vubr_none_exec(dev, vmsg);
    case VHOST_USER_GET_FEATURES:
        return vubr_get_features_exec(dev, vmsg);
    case VHOST_USER_SET_FEATURES:
        return vubr_set_features_exec(dev, vmsg);
    case VHOST_USER_SET_OWNER:
        return vubr_set_owner_exec(dev, vmsg);
    case VHOST_USER_RESET_OWNER:
        return vubr_reset_device_exec(dev, vmsg);
    case VHOST_USER_SET_MEM_TABLE:
        return vubr_set_mem_table_exec(dev, vmsg);
    case VHOST_USER_SET_LOG_BASE:
        return vubr_set_log_base_exec(dev, vmsg);
    case VHOST_USER_SET_LOG_FD:
        return vubr_set_log_fd_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_NUM:
        return vubr_set_vring_num_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_ADDR:
        return vubr_set_vring_addr_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_BASE:
        return vubr_set_vring_base_exec(dev, vmsg);
    case VHOST_USER_GET_VRING_BASE:
        return vubr_get_vring_base_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_KICK:
        return vubr_set_vring_kick_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_CALL:
        return vubr_set_vring_call_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_ERR:
        return vubr_set_vring_err_exec(dev, vmsg);
    case VHOST_USER_GET_PROTOCOL_FEATURES:
        return vubr_get_protocol_features_exec(dev, vmsg);
    case VHOST_USER_SET_PROTOCOL_FEATURES:
        return vubr_set_protocol_features_exec(dev, vmsg);
    case VHOST_USER_GET_QUEUE_NUM:
        return vubr_get_queue_num_exec(dev, vmsg);
    case VHOST_USER_SET_VRING_ENABLE:
        return vubr_set_vring_enable_exec(dev, vmsg);
    case VHOST_USER_SEND_RARP:
        return vubr_send_rarp_exec(dev, vmsg);

    case VHOST_USER_MAX:
        assert(vmsg->request != VHOST_USER_MAX);
    }
    return 0;
}

static void
vubr_receive_cb(int sock, void *ctx)
{
    VubrDev *dev = (VubrDev *) ctx;
    VhostUserMsg vmsg;
    int reply_requested;

    vubr_message_read(sock, &vmsg);
    reply_requested = vubr_execute_request(dev, &vmsg);
    if (reply_requested) {
        /* Set the version in the flags when sending the reply */
        vmsg.flags &= ~VHOST_USER_VERSION_MASK;
        vmsg.flags |= VHOST_USER_VERSION;
        vmsg.flags |= VHOST_USER_REPLY_MASK;
        vubr_message_write(sock, &vmsg);
    }
}

static void
vubr_accept_cb(int sock, void *ctx)
{
    VubrDev *dev = (VubrDev *)ctx;
    int conn_fd;
    struct sockaddr_un un;
    socklen_t len = sizeof(un);

    conn_fd = accept(sock, (struct sockaddr *) &un, &len);
    if (conn_fd == -1) {
        vubr_die("accept()");
    }
    DPRINT("Got connection from remote peer on sock %d\n", conn_fd);
    dispatcher_add(&dev->dispatcher, conn_fd, ctx, vubr_receive_cb);
}

static VubrDev *
vubr_new(const char *path)
{
    /* calloc() zero-initializes the device, nregions included. */
    VubrDev *dev = (VubrDev *) calloc(1, sizeof(VubrDev));
    int i;
    struct sockaddr_un un;
    size_t len;

    if (!dev) {
        return NULL;
    }

    for (i = 0; i < MAX_NR_VIRTQUEUE; i++) {
        dev->vq[i] = (VubrVirtq) {
            .call_fd = -1, .kick_fd = -1,
            .size = 0,
            .last_avail_index = 0, .last_used_index = 0,
            .desc = 0, .avail = 0, .used = 0,
            .enable = 0,
        };
    }

    /* Init log */
    dev->log_call_fd = -1;
    dev->log_size = 0;
    dev->log_table = 0;
    dev->ready = 0;
    dev->features = 0;

    /* Get a UNIX socket. */
    dev->sock = socket(AF_UNIX, SOCK_STREAM, 0);
    if (dev->sock == -1) {
        vubr_die("socket");
    }

    un.sun_family = AF_UNIX;
    /* Bounded copy: sun_path is a fixed-size buffer. */
    strncpy(un.sun_path, path, sizeof(un.sun_path) - 1);
    un.sun_path[sizeof(un.sun_path) - 1] = '\0';
    len = sizeof(un.sun_family) + strlen(un.sun_path);
    unlink(path);

    if (bind(dev->sock, (struct sockaddr *) &un, len) == -1) {
        vubr_die("bind");
    }

    if (listen(dev->sock, 1) == -1) {
        vubr_die("listen");
    }

    dispatcher_init(&dev->dispatcher);
    dispatcher_add(&dev->dispatcher, dev->sock, (void *)dev,
                   vubr_accept_cb);

    DPRINT("Waiting for connections on UNIX socket %s ...\n", path);
    return dev;
}

static void
vubr_backend_udp_setup(VubrDev *dev,
                       const char *local_host,
                       uint16_t local_port,
                       const char *dest_host,
                       uint16_t dest_port)
{
    int sock;
    struct sockaddr_in si_local = {
        .sin_family = AF_INET,
        .sin_port = htons(local_port),
    };

    if (inet_aton(local_host, &si_local.sin_addr) == 0) {
        fprintf(stderr, "inet_aton() failed.\n");
        exit(1);
    }

    /* setup destination for sends */
    dev->backend_udp_dest = (struct sockaddr_in) {
        .sin_family = AF_INET,
        .sin_port = htons(dest_port),
    };
    if (inet_aton(dest_host, &dev->backend_udp_dest.sin_addr) == 0) {
        fprintf(stderr, "inet_aton() failed.\n");
        exit(1);
    }

    sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    if (sock == -1) {
        vubr_die("socket");
    }

    if (bind(sock, (struct sockaddr *)&si_local, sizeof(si_local)) == -1) {
        vubr_die("bind");
    }

    dev->backend_udp_sock = sock;
    dispatcher_add(&dev->dispatcher, sock, dev, vubr_backend_recv_cb);
    DPRINT("Waiting for data from udp backend on %s:%d...\n",
           local_host, local_port);
}

static void
vubr_run(VubrDev *dev)
{
    while (1) {
        /* timeout 200ms */
        dispatcher_wait(&dev->dispatcher, 200000);
        /* Here one can try polling strategy. */
    }
}
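
/*
 * Example invocation (a sketch; exact QEMU flags can vary by version,
 * and vhost-user needs guest memory that QEMU can share, e.g. a
 * file-backed memory backend):
 *
 *     ./vhost-user-bridge
 *
 *     qemu-system-x86_64 ... \
 *         -object memory-backend-file,id=mem,size=512M,share=on,mem-path=/dev/hugepages \
 *         -numa node,memdev=mem \
 *         -chardev socket,id=char0,path=/tmp/vubr.sock \
 *         -netdev type=vhost-user,id=net0,chardev=char0 \
 *         -device virtio-net-pci,netdev=net0
 *
 * Guest traffic then shows up as UDP on 127.0.0.1:5555, and UDP
 * packets sent to 127.0.0.1:4444 are delivered to the guest (see the
 * defaults in main() below).
 */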

int
main(int argc, char *argv[])
{
    VubrDev *dev;

    dev = vubr_new("/tmp/vubr.sock");
    if (!dev) {
        return 1;
    }

    vubr_backend_udp_setup(dev,
                           "127.0.0.1", 4444,
                           "127.0.0.1", 5555);
    vubr_run(dev);
    return 0;
}