xref: /openbmc/qemu/net/stream.c (revision 58d49b5895f2e0b5cfe4b2901bf24f3320b74f29)
15166fe0aSLaurent Vivier /*
25166fe0aSLaurent Vivier  * QEMU System Emulator
35166fe0aSLaurent Vivier  *
45166fe0aSLaurent Vivier  * Copyright (c) 2003-2008 Fabrice Bellard
55166fe0aSLaurent Vivier  * Copyright (c) 2022 Red Hat, Inc.
65166fe0aSLaurent Vivier  *
75166fe0aSLaurent Vivier  * Permission is hereby granted, free of charge, to any person obtaining a copy
85166fe0aSLaurent Vivier  * of this software and associated documentation files (the "Software"), to deal
95166fe0aSLaurent Vivier  * in the Software without restriction, including without limitation the rights
105166fe0aSLaurent Vivier  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
115166fe0aSLaurent Vivier  * copies of the Software, and to permit persons to whom the Software is
125166fe0aSLaurent Vivier  * furnished to do so, subject to the following conditions:
135166fe0aSLaurent Vivier  *
145166fe0aSLaurent Vivier  * The above copyright notice and this permission notice shall be included in
155166fe0aSLaurent Vivier  * all copies or substantial portions of the Software.
165166fe0aSLaurent Vivier  *
175166fe0aSLaurent Vivier  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
185166fe0aSLaurent Vivier  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
195166fe0aSLaurent Vivier  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
205166fe0aSLaurent Vivier  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
215166fe0aSLaurent Vivier  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
225166fe0aSLaurent Vivier  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
235166fe0aSLaurent Vivier  * THE SOFTWARE.
245166fe0aSLaurent Vivier  */
255166fe0aSLaurent Vivier 
265166fe0aSLaurent Vivier #include "qemu/osdep.h"
275166fe0aSLaurent Vivier 
285166fe0aSLaurent Vivier #include "net/net.h"
295166fe0aSLaurent Vivier #include "clients.h"
305166fe0aSLaurent Vivier #include "monitor/monitor.h"
315166fe0aSLaurent Vivier #include "qapi/error.h"
325166fe0aSLaurent Vivier #include "qemu/error-report.h"
335166fe0aSLaurent Vivier #include "qemu/option.h"
345166fe0aSLaurent Vivier #include "qemu/sockets.h"
355166fe0aSLaurent Vivier #include "qemu/iov.h"
365166fe0aSLaurent Vivier #include "qemu/main-loop.h"
375166fe0aSLaurent Vivier #include "qemu/cutils.h"
381f9c890fSLaurent Vivier #include "io/channel.h"
391f9c890fSLaurent Vivier #include "io/channel-socket.h"
401f9c890fSLaurent Vivier #include "io/net-listener.h"
41e506fee8SLaurent Vivier #include "qapi/qapi-events-net.h"
42148fbf0dSLaurent Vivier #include "qapi/qapi-visit-sockets.h"
43148fbf0dSLaurent Vivier #include "qapi/clone-visitor.h"
445166fe0aSLaurent Vivier 
455166fe0aSLaurent Vivier typedef struct NetStreamState {
465166fe0aSLaurent Vivier     NetClientState nc;
471f9c890fSLaurent Vivier     QIOChannel *listen_ioc;
481f9c890fSLaurent Vivier     QIONetListener *listener;
491f9c890fSLaurent Vivier     QIOChannel *ioc;
501f9c890fSLaurent Vivier     guint ioc_read_tag;
511f9c890fSLaurent Vivier     guint ioc_write_tag;
525166fe0aSLaurent Vivier     SocketReadState rs;
535166fe0aSLaurent Vivier     unsigned int send_index;      /* number of bytes sent*/
54*c40e962dSDaniil Tatianin     uint32_t reconnect_ms;
55148fbf0dSLaurent Vivier     guint timer_tag;
56148fbf0dSLaurent Vivier     SocketAddress *addr;
575166fe0aSLaurent Vivier } NetStreamState;
585166fe0aSLaurent Vivier 
591f9c890fSLaurent Vivier static void net_stream_listen(QIONetListener *listener,
601f9c890fSLaurent Vivier                               QIOChannelSocket *cioc,
611f9c890fSLaurent Vivier                               void *opaque);
62148fbf0dSLaurent Vivier static void net_stream_arm_reconnect(NetStreamState *s);
635166fe0aSLaurent Vivier 
net_stream_writable(QIOChannel * ioc,GIOCondition condition,gpointer data)641f9c890fSLaurent Vivier static gboolean net_stream_writable(QIOChannel *ioc,
651f9c890fSLaurent Vivier                                     GIOCondition condition,
661f9c890fSLaurent Vivier                                     gpointer data)
675166fe0aSLaurent Vivier {
681f9c890fSLaurent Vivier     NetStreamState *s = data;
695166fe0aSLaurent Vivier 
701f9c890fSLaurent Vivier     s->ioc_write_tag = 0;
715166fe0aSLaurent Vivier 
725166fe0aSLaurent Vivier     qemu_flush_queued_packets(&s->nc);
731f9c890fSLaurent Vivier 
741f9c890fSLaurent Vivier     return G_SOURCE_REMOVE;
755166fe0aSLaurent Vivier }
765166fe0aSLaurent Vivier 
net_stream_receive(NetClientState * nc,const uint8_t * buf,size_t size)775166fe0aSLaurent Vivier static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
785166fe0aSLaurent Vivier                                   size_t size)
795166fe0aSLaurent Vivier {
805166fe0aSLaurent Vivier     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
815166fe0aSLaurent Vivier     uint32_t len = htonl(size);
825166fe0aSLaurent Vivier     struct iovec iov[] = {
835166fe0aSLaurent Vivier         {
845166fe0aSLaurent Vivier             .iov_base = &len,
855166fe0aSLaurent Vivier             .iov_len  = sizeof(len),
865166fe0aSLaurent Vivier         }, {
875166fe0aSLaurent Vivier             .iov_base = (void *)buf,
885166fe0aSLaurent Vivier             .iov_len  = size,
895166fe0aSLaurent Vivier         },
905166fe0aSLaurent Vivier     };
911f9c890fSLaurent Vivier     struct iovec local_iov[2];
921f9c890fSLaurent Vivier     unsigned int nlocal_iov;
935166fe0aSLaurent Vivier     size_t remaining;
945166fe0aSLaurent Vivier     ssize_t ret;
955166fe0aSLaurent Vivier 
965166fe0aSLaurent Vivier     remaining = iov_size(iov, 2) - s->send_index;
971f9c890fSLaurent Vivier     nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
981f9c890fSLaurent Vivier     ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
991f9c890fSLaurent Vivier     if (ret == QIO_CHANNEL_ERR_BLOCK) {
1005166fe0aSLaurent Vivier         ret = 0; /* handled further down */
1015166fe0aSLaurent Vivier     }
1025166fe0aSLaurent Vivier     if (ret == -1) {
1035166fe0aSLaurent Vivier         s->send_index = 0;
1045166fe0aSLaurent Vivier         return -errno;
1055166fe0aSLaurent Vivier     }
1065166fe0aSLaurent Vivier     if (ret < (ssize_t)remaining) {
1075166fe0aSLaurent Vivier         s->send_index += ret;
1081f9c890fSLaurent Vivier         s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
1091f9c890fSLaurent Vivier                                                  net_stream_writable, s, NULL);
1105166fe0aSLaurent Vivier         return 0;
1115166fe0aSLaurent Vivier     }
1125166fe0aSLaurent Vivier     s->send_index = 0;
1135166fe0aSLaurent Vivier     return size;
1145166fe0aSLaurent Vivier }
1155166fe0aSLaurent Vivier 
1161f9c890fSLaurent Vivier static gboolean net_stream_send(QIOChannel *ioc,
1171f9c890fSLaurent Vivier                                 GIOCondition condition,
1181f9c890fSLaurent Vivier                                 gpointer data);
1191f9c890fSLaurent Vivier 
/*
 * Completion callback passed to qemu_send_packet_async() by
 * net_stream_rs_finalize(): re-arm the G_IO_IN watch if it is not armed.
 * @len is unused.
 */
static void net_stream_send_completed(NetClientState *nc, ssize_t len)
{
    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);

    if (s->ioc_read_tag) {
        /* Already watching for input */
        return;
    }

    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
                                            net_stream_send, s, NULL);
}
1295166fe0aSLaurent Vivier 
/*
 * SocketReadState finalize hook: hand a fully reassembled packet to the peer.
 *
 * When qemu_send_packet_async() returns 0 the G_IO_IN watch is removed;
 * net_stream_send_completed() installs it again.
 */
static void net_stream_rs_finalize(SocketReadState *rs)
{
    NetStreamState *s = container_of(rs, NetStreamState, rs);
    ssize_t sent;

    sent = qemu_send_packet_async(&s->nc, rs->buf, rs->packet_len,
                                  net_stream_send_completed);
    if (sent != 0 || !s->ioc_read_tag) {
        return;
    }

    g_source_remove(s->ioc_read_tag);
    s->ioc_read_tag = 0;
}
1435166fe0aSLaurent Vivier 
net_stream_send(QIOChannel * ioc,GIOCondition condition,gpointer data)1441f9c890fSLaurent Vivier static gboolean net_stream_send(QIOChannel *ioc,
1451f9c890fSLaurent Vivier                                 GIOCondition condition,
1461f9c890fSLaurent Vivier                                 gpointer data)
1475166fe0aSLaurent Vivier {
1481f9c890fSLaurent Vivier     NetStreamState *s = data;
1495166fe0aSLaurent Vivier     int size;
1505166fe0aSLaurent Vivier     int ret;
1511f9c890fSLaurent Vivier     char buf1[NET_BUFSIZE];
1521f9c890fSLaurent Vivier     const char *buf;
1535166fe0aSLaurent Vivier 
1541f9c890fSLaurent Vivier     size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
1555166fe0aSLaurent Vivier     if (size < 0) {
1565166fe0aSLaurent Vivier         if (errno != EWOULDBLOCK) {
1575166fe0aSLaurent Vivier             goto eoc;
1585166fe0aSLaurent Vivier         }
1595166fe0aSLaurent Vivier     } else if (size == 0) {
1605166fe0aSLaurent Vivier         /* end of connection */
1615166fe0aSLaurent Vivier     eoc:
1621f9c890fSLaurent Vivier         s->ioc_read_tag = 0;
1631f9c890fSLaurent Vivier         if (s->ioc_write_tag) {
1641f9c890fSLaurent Vivier             g_source_remove(s->ioc_write_tag);
1651f9c890fSLaurent Vivier             s->ioc_write_tag = 0;
1665166fe0aSLaurent Vivier         }
1671f9c890fSLaurent Vivier         if (s->listener) {
1689cd67f0cSDaniel P. Berrangé             qemu_set_info_str(&s->nc, "listening");
1691f9c890fSLaurent Vivier             qio_net_listener_set_client_func(s->listener, net_stream_listen,
1701f9c890fSLaurent Vivier                                              s, NULL);
1711f9c890fSLaurent Vivier         }
1721f9c890fSLaurent Vivier         object_unref(OBJECT(s->ioc));
1731f9c890fSLaurent Vivier         s->ioc = NULL;
1745166fe0aSLaurent Vivier 
1755166fe0aSLaurent Vivier         net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
1765166fe0aSLaurent Vivier         s->nc.link_down = true;
1775166fe0aSLaurent Vivier 
178e506fee8SLaurent Vivier         qapi_event_send_netdev_stream_disconnected(s->nc.name);
179148fbf0dSLaurent Vivier         net_stream_arm_reconnect(s);
180e506fee8SLaurent Vivier 
1811f9c890fSLaurent Vivier         return G_SOURCE_REMOVE;
1825166fe0aSLaurent Vivier     }
1835166fe0aSLaurent Vivier     buf = buf1;
1845166fe0aSLaurent Vivier 
1851f9c890fSLaurent Vivier     ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
1865166fe0aSLaurent Vivier 
1875166fe0aSLaurent Vivier     if (ret == -1) {
1885166fe0aSLaurent Vivier         goto eoc;
1895166fe0aSLaurent Vivier     }
1901f9c890fSLaurent Vivier 
1911f9c890fSLaurent Vivier     return G_SOURCE_CONTINUE;
1925166fe0aSLaurent Vivier }
1935166fe0aSLaurent Vivier 
/*
 * NetClientInfo.cleanup hook: release everything owned by the backend —
 * the reconnect timer and address, the data channel with its watches, and
 * the listening channel with its listener.
 */
static void net_stream_cleanup(NetClientState *nc)
{
    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);

    /* Reconnect machinery */
    if (s->timer_tag) {
        g_source_remove(s->timer_tag);
        s->timer_tag = 0;
    }
    if (s->addr) {
        qapi_free_SocketAddress(s->addr);
        s->addr = NULL;
    }

    /* Data channel; watches are only removed while the socket fd is valid */
    if (s->ioc) {
        bool fd_valid = QIO_CHANNEL_SOCKET(s->ioc)->fd != -1;

        if (fd_valid && s->ioc_read_tag) {
            g_source_remove(s->ioc_read_tag);
            s->ioc_read_tag = 0;
        }
        if (fd_valid && s->ioc_write_tag) {
            g_source_remove(s->ioc_write_tag);
            s->ioc_write_tag = 0;
        }
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    /* Listening side (server mode) */
    if (!s->listen_ioc) {
        return;
    }
    if (s->listener) {
        qio_net_listener_disconnect(s->listener);
        object_unref(OBJECT(s->listener));
        s->listener = NULL;
    }
    object_unref(OBJECT(s->listen_ioc));
    s->listen_ioc = NULL;
}
2295166fe0aSLaurent Vivier 
2305166fe0aSLaurent Vivier static NetClientInfo net_stream_info = {
2315166fe0aSLaurent Vivier     .type = NET_CLIENT_DRIVER_STREAM,
2325166fe0aSLaurent Vivier     .size = sizeof(NetStreamState),
2335166fe0aSLaurent Vivier     .receive = net_stream_receive,
2345166fe0aSLaurent Vivier     .cleanup = net_stream_cleanup,
2355166fe0aSLaurent Vivier };
2365166fe0aSLaurent Vivier 
/*
 * Listener callback (server mode): adopt @cioc as the data channel.
 *
 * @opaque is the NetStreamState.  The listener's client callback is cleared
 * while this client is connected; it is reinstalled when the connection is
 * torn down.  Brings the link up, arms the G_IO_IN watch and emits the
 * NETDEV_STREAM_CONNECTED event.
 */
static void net_stream_listen(QIONetListener *listener,
                              QIOChannelSocket *cioc,
                              void *opaque)
{
    NetStreamState *s = opaque;
    SocketAddress *peer;
    char *peer_uri;

    /* Keep our own reference on the new channel */
    object_ref(OBJECT(cioc));

    qio_net_listener_set_client_func(s->listener, NULL, s, NULL);

    s->ioc = QIO_CHANNEL(cioc);
    qio_channel_set_name(s->ioc, "stream-server");
    s->nc.link_down = false;

    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
                                            s, NULL);

    /* For AF_UNIX report the local address, otherwise the remote one */
    peer = cioc->localAddr.ss_family == AF_UNIX
         ? qio_channel_socket_get_local_address(cioc, NULL)
         : qio_channel_socket_get_remote_address(cioc, NULL);
    g_assert(peer != NULL);

    peer_uri = socket_uri(peer);
    qemu_set_info_str(&s->nc, "%s", peer_uri);
    g_free(peer_uri);

    qapi_event_send_netdev_stream_connected(s->nc.name, peer);
    qapi_free_SocketAddress(peer);
}
2681f9c890fSLaurent Vivier 
/*
 * Completion callback of qio_channel_socket_listen_async() (server mode).
 *
 * @opaque is the NetStreamState.  On failure the error is recorded in the
 * netdev info string and nothing else is set up.  On success the listening
 * socket is made non-blocking and a QIONetListener is created that hands
 * incoming clients to net_stream_listen().
 */
static void net_stream_server_listening(QIOTask *task, gpointer opaque)
{
    NetStreamState *s = opaque;
    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
    SocketAddress *addr;
    int ret;
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(err));
        error_free(err);
        return;
    }

    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
    g_assert(addr != NULL);
    ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
                          addr->u.fd.str, -ret);
        /* addr is owned by us; release it on this early exit as well */
        qapi_free_SocketAddress(addr);
        return;
    }
    g_assert(ret == 0);
    qapi_free_SocketAddress(addr);

    /* No client yet: the link stays down until net_stream_listen() runs */
    s->nc.link_down = true;
    s->listener = qio_net_listener_new();

    qemu_set_info_str(&s->nc, "listening");
    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
    qio_net_listener_add(s->listener, listen_sioc);
}
3025166fe0aSLaurent Vivier 
net_stream_server_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,Error ** errp)3035166fe0aSLaurent Vivier static int net_stream_server_init(NetClientState *peer,
3045166fe0aSLaurent Vivier                                   const char *model,
3055166fe0aSLaurent Vivier                                   const char *name,
3065166fe0aSLaurent Vivier                                   SocketAddress *addr,
3075166fe0aSLaurent Vivier                                   Error **errp)
3085166fe0aSLaurent Vivier {
3095166fe0aSLaurent Vivier     NetClientState *nc;
3105166fe0aSLaurent Vivier     NetStreamState *s;
3111f9c890fSLaurent Vivier     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
3125166fe0aSLaurent Vivier 
3135166fe0aSLaurent Vivier     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
3145166fe0aSLaurent Vivier     s = DO_UPCAST(NetStreamState, nc, nc);
315cc91ca64SDaniel P. Berrangé     qemu_set_info_str(&s->nc, "initializing");
3161f9c890fSLaurent Vivier 
3171f9c890fSLaurent Vivier     s->listen_ioc = QIO_CHANNEL(listen_sioc);
3181f9c890fSLaurent Vivier     qio_channel_socket_listen_async(listen_sioc, addr, 0,
3191f9c890fSLaurent Vivier                                     net_stream_server_listening, s,
3201f9c890fSLaurent Vivier                                     NULL, NULL);
3211f9c890fSLaurent Vivier 
3221f9c890fSLaurent Vivier     return 0;
3231f9c890fSLaurent Vivier }
3241f9c890fSLaurent Vivier 
/*
 * Completion callback of qio_channel_socket_connect_async() (client mode).
 *
 * @opaque is the NetStreamState.  On success the socket is made
 * non-blocking, the G_IO_IN watch is armed, the link is brought up and the
 * NETDEV_STREAM_CONNECTED event is emitted.  On failure the channel is
 * dropped and a reconnect is scheduled if one is configured.
 */
static void net_stream_client_connected(QIOTask *task, gpointer opaque)
{
    NetStreamState *s = opaque;
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(s->ioc);
    SocketAddress *remote;
    gchar *remote_uri;
    int rc;
    Error *local_err = NULL;

    if (qio_task_propagate_error(task, &local_err)) {
        qemu_set_info_str(&s->nc, "error: %s", error_get_pretty(local_err));
        error_free(local_err);
        goto fail;
    }

    remote = qio_channel_socket_get_remote_address(sock, NULL);
    g_assert(remote != NULL);
    remote_uri = socket_uri(remote);
    qemu_set_info_str(&s->nc, "%s", remote_uri);
    g_free(remote_uri);

    rc = qemu_socket_try_set_nonblock(sock->fd);
    if (remote->type == SOCKET_ADDRESS_TYPE_FD && rc < 0) {
        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
                          remote->u.fd.str, -rc);
        qapi_free_SocketAddress(remote);
        goto fail;
    }
    g_assert(rc == 0);

    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);

    /* Disable Nagle algorithm on TCP sockets to reduce latency */
    qio_channel_set_delay(s->ioc, false);

    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
                                            s, NULL);
    s->nc.link_down = false;
    qapi_event_send_netdev_stream_connected(s->nc.name, remote);
    qapi_free_SocketAddress(remote);
    return;

fail:
    object_unref(OBJECT(s->ioc));
    s->ioc = NULL;
    net_stream_arm_reconnect(s);
}
372148fbf0dSLaurent Vivier 
net_stream_reconnect(gpointer data)373148fbf0dSLaurent Vivier static gboolean net_stream_reconnect(gpointer data)
374148fbf0dSLaurent Vivier {
375148fbf0dSLaurent Vivier     NetStreamState *s = data;
376148fbf0dSLaurent Vivier     QIOChannelSocket *sioc;
377148fbf0dSLaurent Vivier 
378148fbf0dSLaurent Vivier     s->timer_tag = 0;
379148fbf0dSLaurent Vivier 
380148fbf0dSLaurent Vivier     sioc = qio_channel_socket_new();
381148fbf0dSLaurent Vivier     s->ioc = QIO_CHANNEL(sioc);
382148fbf0dSLaurent Vivier     qio_channel_socket_connect_async(sioc, s->addr,
383148fbf0dSLaurent Vivier                                      net_stream_client_connected, s,
384148fbf0dSLaurent Vivier                                      NULL, NULL);
385148fbf0dSLaurent Vivier     return G_SOURCE_REMOVE;
386148fbf0dSLaurent Vivier }
387148fbf0dSLaurent Vivier 
/*
 * Schedule net_stream_reconnect() after s->reconnect_ms milliseconds.
 * Does nothing when reconnecting is disabled (delay of 0) or when a
 * reconnect timeout is already pending.
 */
static void net_stream_arm_reconnect(NetStreamState *s)
{
    if (!s->reconnect_ms || s->timer_tag != 0) {
        return;
    }

    qemu_set_info_str(&s->nc, "connecting");
    s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
}
3955166fe0aSLaurent Vivier 
net_stream_client_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,uint32_t reconnect_ms,Error ** errp)3965166fe0aSLaurent Vivier static int net_stream_client_init(NetClientState *peer,
3975166fe0aSLaurent Vivier                                   const char *model,
3985166fe0aSLaurent Vivier                                   const char *name,
3995166fe0aSLaurent Vivier                                   SocketAddress *addr,
400*c40e962dSDaniil Tatianin                                   uint32_t reconnect_ms,
4015166fe0aSLaurent Vivier                                   Error **errp)
4025166fe0aSLaurent Vivier {
4035166fe0aSLaurent Vivier     NetStreamState *s;
4041f9c890fSLaurent Vivier     NetClientState *nc;
4051f9c890fSLaurent Vivier     QIOChannelSocket *sioc = qio_channel_socket_new();
4065166fe0aSLaurent Vivier 
4071f9c890fSLaurent Vivier     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
4081f9c890fSLaurent Vivier     s = DO_UPCAST(NetStreamState, nc, nc);
409cc91ca64SDaniel P. Berrangé     qemu_set_info_str(&s->nc, "connecting");
4105166fe0aSLaurent Vivier 
4111f9c890fSLaurent Vivier     s->ioc = QIO_CHANNEL(sioc);
4121f9c890fSLaurent Vivier     s->nc.link_down = true;
4135166fe0aSLaurent Vivier 
414*c40e962dSDaniil Tatianin     s->reconnect_ms = reconnect_ms;
415*c40e962dSDaniil Tatianin     if (reconnect_ms) {
416148fbf0dSLaurent Vivier         s->addr = QAPI_CLONE(SocketAddress, addr);
417148fbf0dSLaurent Vivier     }
4181f9c890fSLaurent Vivier     qio_channel_socket_connect_async(sioc, addr,
4191f9c890fSLaurent Vivier                                      net_stream_client_connected, s,
4201f9c890fSLaurent Vivier                                      NULL, NULL);
42113c6be96SLaurent Vivier 
4225166fe0aSLaurent Vivier     return 0;
4235166fe0aSLaurent Vivier }
4245166fe0aSLaurent Vivier 
net_init_stream(const Netdev * netdev,const char * name,NetClientState * peer,Error ** errp)4255166fe0aSLaurent Vivier int net_init_stream(const Netdev *netdev, const char *name,
4265166fe0aSLaurent Vivier                     NetClientState *peer, Error **errp)
4275166fe0aSLaurent Vivier {
4285166fe0aSLaurent Vivier     const NetdevStreamOptions *sock;
4295166fe0aSLaurent Vivier 
4305166fe0aSLaurent Vivier     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
4315166fe0aSLaurent Vivier     sock = &netdev->u.stream;
4325166fe0aSLaurent Vivier 
4335166fe0aSLaurent Vivier     if (!sock->has_server || !sock->server) {
434*c40e962dSDaniil Tatianin         uint32_t reconnect_ms = 0;
435*c40e962dSDaniil Tatianin 
436*c40e962dSDaniil Tatianin         if (sock->has_reconnect && sock->has_reconnect_ms) {
437*c40e962dSDaniil Tatianin             error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually "
438*c40e962dSDaniil Tatianin                              "exclusive");
439*c40e962dSDaniil Tatianin             return -1;
440*c40e962dSDaniil Tatianin         } else if (sock->has_reconnect_ms) {
441*c40e962dSDaniil Tatianin             reconnect_ms = sock->reconnect_ms;
442*c40e962dSDaniil Tatianin         } else if (sock->has_reconnect) {
443*c40e962dSDaniil Tatianin             reconnect_ms = sock->reconnect * 1000u;
444148fbf0dSLaurent Vivier         }
445*c40e962dSDaniil Tatianin 
446*c40e962dSDaniil Tatianin         return net_stream_client_init(peer, "stream", name, sock->addr,
447*c40e962dSDaniil Tatianin                                       reconnect_ms, errp);
448*c40e962dSDaniil Tatianin     }
449*c40e962dSDaniil Tatianin     if (sock->has_reconnect || sock->has_reconnect_ms) {
450*c40e962dSDaniil Tatianin         error_setg(errp, "'reconnect' and 'reconnect-ms' options are "
451*c40e962dSDaniil Tatianin                          "incompatible with socket in server mode");
452148fbf0dSLaurent Vivier         return -1;
4535166fe0aSLaurent Vivier     }
4545166fe0aSLaurent Vivier     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
4555166fe0aSLaurent Vivier }
456