xref: /openbmc/qemu/net/stream.c (revision 6fae7ce1488e3f5bdcc1747564ea68e7f6f0e931)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2022 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 
28 #include "net/net.h"
29 #include "clients.h"
30 #include "qapi/error.h"
31 #include "io/net-listener.h"
32 #include "qapi/qapi-events-net.h"
33 #include "qapi/qapi-visit-sockets.h"
34 #include "qapi/clone-visitor.h"
35 
36 #include "stream_data.h"
37 
38 typedef struct NetStreamState {
39     NetStreamData data;
40     uint32_t reconnect_ms;
41     guint timer_tag;
42     SocketAddress *addr;
43 } NetStreamState;
44 
45 static void net_stream_arm_reconnect(NetStreamState *s);
46 
net_stream_receive(NetClientState * nc,const uint8_t * buf,size_t size)47 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
48                                   size_t size)
49 {
50     NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
51 
52     return net_stream_data_receive(d, buf, size);
53 }
54 
net_stream_send(QIOChannel * ioc,GIOCondition condition,gpointer data)55 static gboolean net_stream_send(QIOChannel *ioc,
56                                 GIOCondition condition,
57                                 gpointer data)
58 {
59     if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
60         NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
61 
62         qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
63         net_stream_arm_reconnect(s);
64 
65         return G_SOURCE_REMOVE;
66     }
67 
68     return G_SOURCE_CONTINUE;
69 }
70 
net_stream_cleanup(NetClientState * nc)71 static void net_stream_cleanup(NetClientState *nc)
72 {
73     NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
74     if (s->timer_tag) {
75         g_source_remove(s->timer_tag);
76         s->timer_tag = 0;
77     }
78     if (s->addr) {
79         qapi_free_SocketAddress(s->addr);
80         s->addr = NULL;
81     }
82     if (s->data.ioc) {
83         if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
84             if (s->data.ioc_read_tag) {
85                 g_source_remove(s->data.ioc_read_tag);
86                 s->data.ioc_read_tag = 0;
87             }
88             if (s->data.ioc_write_tag) {
89                 g_source_remove(s->data.ioc_write_tag);
90                 s->data.ioc_write_tag = 0;
91             }
92         }
93         object_unref(OBJECT(s->data.ioc));
94         s->data.ioc = NULL;
95     }
96     if (s->data.listen_ioc) {
97         if (s->data.listener) {
98             qio_net_listener_disconnect(s->data.listener);
99             object_unref(OBJECT(s->data.listener));
100             s->data.listener = NULL;
101         }
102         object_unref(OBJECT(s->data.listen_ioc));
103         s->data.listen_ioc = NULL;
104     }
105 }
106 
107 static NetClientInfo net_stream_info = {
108     .type = NET_CLIENT_DRIVER_STREAM,
109     .size = sizeof(NetStreamState),
110     .receive = net_stream_receive,
111     .cleanup = net_stream_cleanup,
112 };
113 
net_stream_listen(QIONetListener * listener,QIOChannelSocket * cioc,gpointer data)114 static void net_stream_listen(QIONetListener *listener,
115                                   QIOChannelSocket *cioc, gpointer data)
116 {
117     NetStreamData *d = data;
118     SocketAddress *addr;
119     char *uri;
120 
121     net_stream_data_listen(listener, cioc, data);
122 
123     if (cioc->localAddr.ss_family == AF_UNIX) {
124         addr = qio_channel_socket_get_local_address(cioc, NULL);
125     } else {
126         addr = qio_channel_socket_get_remote_address(cioc, NULL);
127     }
128     g_assert(addr != NULL);
129     uri = socket_uri(addr);
130     qemu_set_info_str(&d->nc, "%s", uri);
131     g_free(uri);
132     qapi_event_send_netdev_stream_connected(d->nc.name, addr);
133     qapi_free_SocketAddress(addr);
134 }
135 
net_stream_server_listening(QIOTask * task,gpointer opaque)136 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
137 {
138     NetStreamData *d = opaque;
139     QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
140     SocketAddress *addr;
141     int ret;
142     Error *err = NULL;
143 
144     if (qio_task_propagate_error(task, &err)) {
145         qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
146         error_free(err);
147         return;
148     }
149 
150     addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
151     g_assert(addr != NULL);
152     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
153     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
154         qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
155                           addr->u.fd.str, -ret);
156         return;
157     }
158     g_assert(ret == 0);
159     qapi_free_SocketAddress(addr);
160 
161     d->nc.link_down = true;
162     d->listener = qio_net_listener_new();
163 
164     qemu_set_info_str(&d->nc, "listening");
165     net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
166     qio_net_listener_set_client_func(d->listener, d->listen, d,
167                                      NULL);
168     qio_net_listener_add(d->listener, listen_sioc);
169 }
170 
net_stream_server_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,Error ** errp)171 static int net_stream_server_init(NetClientState *peer,
172                                   const char *model,
173                                   const char *name,
174                                   SocketAddress *addr,
175                                   Error **errp)
176 {
177     NetClientState *nc;
178     NetStreamData *d;
179     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
180 
181     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
182     d = DO_UPCAST(NetStreamData, nc, nc);
183     d->send = net_stream_send;
184     d->listen = net_stream_listen;
185     qemu_set_info_str(&d->nc, "initializing");
186 
187     d->listen_ioc = QIO_CHANNEL(listen_sioc);
188     qio_channel_socket_listen_async(listen_sioc, addr, 0,
189                                     net_stream_server_listening, d,
190                                     NULL, NULL);
191 
192     return 0;
193 }
194 
net_stream_client_connected(QIOTask * task,gpointer opaque)195 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
196 {
197     NetStreamState *s = opaque;
198     NetStreamData *d = &s->data;
199     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
200     SocketAddress *addr;
201     gchar *uri;
202 
203     if (net_stream_data_client_connected(task, d) == -1) {
204         net_stream_arm_reconnect(s);
205         return;
206     }
207 
208     addr = qio_channel_socket_get_remote_address(sioc, NULL);
209     g_assert(addr != NULL);
210     uri = socket_uri(addr);
211     qemu_set_info_str(&d->nc, "%s", uri);
212     g_free(uri);
213     qapi_event_send_netdev_stream_connected(d->nc.name, addr);
214     qapi_free_SocketAddress(addr);
215 }
216 
net_stream_reconnect(gpointer data)217 static gboolean net_stream_reconnect(gpointer data)
218 {
219     NetStreamState *s = data;
220     QIOChannelSocket *sioc;
221 
222     s->timer_tag = 0;
223 
224     sioc = qio_channel_socket_new();
225     s->data.ioc = QIO_CHANNEL(sioc);
226     qio_channel_socket_connect_async(sioc, s->addr,
227                                      net_stream_client_connected, s,
228                                      NULL, NULL);
229     return G_SOURCE_REMOVE;
230 }
231 
net_stream_arm_reconnect(NetStreamState * s)232 static void net_stream_arm_reconnect(NetStreamState *s)
233 {
234     if (s->reconnect_ms && s->timer_tag == 0) {
235         qemu_set_info_str(&s->data.nc, "connecting");
236         s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
237     }
238 }
239 
net_stream_client_init(NetClientState * peer,const char * model,const char * name,SocketAddress * addr,uint32_t reconnect_ms,Error ** errp)240 static int net_stream_client_init(NetClientState *peer,
241                                   const char *model,
242                                   const char *name,
243                                   SocketAddress *addr,
244                                   uint32_t reconnect_ms,
245                                   Error **errp)
246 {
247     NetStreamState *s;
248     NetClientState *nc;
249     QIOChannelSocket *sioc = qio_channel_socket_new();
250 
251     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
252     s = DO_UPCAST(NetStreamState, data.nc, nc);
253     qemu_set_info_str(&s->data.nc, "connecting");
254 
255     s->data.ioc = QIO_CHANNEL(sioc);
256     s->data.nc.link_down = true;
257     s->data.send = net_stream_send;
258     s->data.listen = net_stream_listen;
259 
260     s->reconnect_ms = reconnect_ms;
261     if (reconnect_ms) {
262         s->addr = QAPI_CLONE(SocketAddress, addr);
263     }
264     qio_channel_socket_connect_async(sioc, addr,
265                                      net_stream_client_connected, s,
266                                      NULL, NULL);
267 
268     return 0;
269 }
270 
net_init_stream(const Netdev * netdev,const char * name,NetClientState * peer,Error ** errp)271 int net_init_stream(const Netdev *netdev, const char *name,
272                     NetClientState *peer, Error **errp)
273 {
274     const NetdevStreamOptions *sock;
275 
276     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
277     sock = &netdev->u.stream;
278 
279     if (!sock->has_server || !sock->server) {
280         uint32_t reconnect_ms = 0;
281 
282         if (sock->has_reconnect && sock->has_reconnect_ms) {
283             error_setg(errp, "'reconnect' and 'reconnect-ms' are mutually "
284                              "exclusive");
285             return -1;
286         } else if (sock->has_reconnect_ms) {
287             reconnect_ms = sock->reconnect_ms;
288         } else if (sock->has_reconnect) {
289             reconnect_ms = sock->reconnect * 1000u;
290         }
291 
292         return net_stream_client_init(peer, "stream", name, sock->addr,
293                                       reconnect_ms, errp);
294     }
295     if (sock->has_reconnect || sock->has_reconnect_ms) {
296         error_setg(errp, "'reconnect' and 'reconnect-ms' options are "
297                          "incompatible with socket in server mode");
298         return -1;
299     }
300     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
301 }
302