xref: /openbmc/qemu/net/stream.c (revision e090e0312dc9030d94e38e3d98a88718d3561e4e)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2022 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25 
26 #include "qemu/osdep.h"
27 
28 #include "net/net.h"
29 #include "clients.h"
30 #include "qapi/error.h"
31 #include "io/net-listener.h"
32 #include "qapi/qapi-events-net.h"
33 #include "qapi/qapi-visit-sockets.h"
34 #include "qapi/clone-visitor.h"
35 
36 #include "stream_data.h"
37 
38 typedef struct NetStreamState {
39     NetStreamData data;
40     uint32_t reconnect_ms;
41     guint timer_tag;
42     SocketAddress *addr;
43 } NetStreamState;
44 
45 static void net_stream_arm_reconnect(NetStreamState *s);
46 
47 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
48                                   size_t size)
49 {
50     NetStreamData *d = DO_UPCAST(NetStreamData, nc, nc);
51 
52     return net_stream_data_receive(d, buf, size);
53 }
54 
55 static gboolean net_stream_send(QIOChannel *ioc,
56                                 GIOCondition condition,
57                                 gpointer data)
58 {
59     if (net_stream_data_send(ioc, condition, data) == G_SOURCE_REMOVE) {
60         NetStreamState *s = DO_UPCAST(NetStreamState, data, data);
61 
62         qapi_event_send_netdev_stream_disconnected(s->data.nc.name);
63         net_stream_arm_reconnect(s);
64 
65         return G_SOURCE_REMOVE;
66     }
67 
68     return G_SOURCE_CONTINUE;
69 }
70 
71 static void net_stream_cleanup(NetClientState *nc)
72 {
73     NetStreamState *s = DO_UPCAST(NetStreamState, data.nc, nc);
74     if (s->timer_tag) {
75         g_source_remove(s->timer_tag);
76         s->timer_tag = 0;
77     }
78     if (s->addr) {
79         qapi_free_SocketAddress(s->addr);
80         s->addr = NULL;
81     }
82     if (s->data.ioc) {
83         if (QIO_CHANNEL_SOCKET(s->data.ioc)->fd != -1) {
84             if (s->data.ioc_read_tag) {
85                 g_source_remove(s->data.ioc_read_tag);
86                 s->data.ioc_read_tag = 0;
87             }
88             if (s->data.ioc_write_tag) {
89                 g_source_remove(s->data.ioc_write_tag);
90                 s->data.ioc_write_tag = 0;
91             }
92         }
93         object_unref(OBJECT(s->data.ioc));
94         s->data.ioc = NULL;
95     }
96     if (s->data.listen_ioc) {
97         if (s->data.listener) {
98             qio_net_listener_disconnect(s->data.listener);
99             object_unref(OBJECT(s->data.listener));
100             s->data.listener = NULL;
101         }
102         object_unref(OBJECT(s->data.listen_ioc));
103         s->data.listen_ioc = NULL;
104     }
105 }
106 
107 static NetClientInfo net_stream_info = {
108     .type = NET_CLIENT_DRIVER_STREAM,
109     .size = sizeof(NetStreamState),
110     .receive = net_stream_receive,
111     .cleanup = net_stream_cleanup,
112 };
113 
114 static void net_stream_listen(QIONetListener *listener,
115                                   QIOChannelSocket *cioc, gpointer data)
116 {
117     NetStreamData *d = data;
118     SocketAddress *addr;
119     char *uri;
120 
121     net_stream_data_listen(listener, cioc, data);
122 
123     if (cioc->localAddr.ss_family == AF_UNIX) {
124         addr = qio_channel_socket_get_local_address(cioc, NULL);
125     } else {
126         addr = qio_channel_socket_get_remote_address(cioc, NULL);
127     }
128     g_assert(addr != NULL);
129     uri = socket_uri(addr);
130     qemu_set_info_str(&d->nc, "%s", uri);
131     g_free(uri);
132     qapi_event_send_netdev_stream_connected(d->nc.name, addr);
133     qapi_free_SocketAddress(addr);
134 }
135 
136 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
137 {
138     NetStreamData *d = opaque;
139     QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(d->listen_ioc);
140     SocketAddress *addr;
141     Error *err = NULL;
142 
143     if (qio_task_propagate_error(task, &err)) {
144         qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
145         error_free(err);
146         return;
147     }
148 
149     addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
150     g_assert(addr != NULL);
151     if (!qemu_set_blocking(listen_sioc->fd, false, &err)) {
152         qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(err));
153         error_free(err);
154         return;
155     }
156     qapi_free_SocketAddress(addr);
157 
158     d->nc.link_down = true;
159     d->listener = qio_net_listener_new();
160 
161     qemu_set_info_str(&d->nc, "listening");
162     net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
163     qio_net_listener_set_client_func(d->listener, d->listen, d,
164                                      NULL);
165     qio_net_listener_add(d->listener, listen_sioc);
166 }
167 
168 static int net_stream_server_init(NetClientState *peer,
169                                   const char *model,
170                                   const char *name,
171                                   SocketAddress *addr,
172                                   Error **errp)
173 {
174     NetClientState *nc;
175     NetStreamData *d;
176     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
177 
178     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
179     d = DO_UPCAST(NetStreamData, nc, nc);
180     d->send = net_stream_send;
181     d->listen = net_stream_listen;
182     qemu_set_info_str(&d->nc, "initializing");
183 
184     d->listen_ioc = QIO_CHANNEL(listen_sioc);
185     qio_channel_socket_listen_async(listen_sioc, addr, 0,
186                                     net_stream_server_listening, d,
187                                     NULL, NULL);
188 
189     return 0;
190 }
191 
192 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
193 {
194     NetStreamState *s = opaque;
195     NetStreamData *d = &s->data;
196     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(d->ioc);
197     SocketAddress *addr;
198     gchar *uri;
199 
200     if (net_stream_data_client_connected(task, d) == -1) {
201         net_stream_arm_reconnect(s);
202         return;
203     }
204 
205     addr = qio_channel_socket_get_remote_address(sioc, NULL);
206     g_assert(addr != NULL);
207     uri = socket_uri(addr);
208     qemu_set_info_str(&d->nc, "%s", uri);
209     g_free(uri);
210     qapi_event_send_netdev_stream_connected(d->nc.name, addr);
211     qapi_free_SocketAddress(addr);
212 }
213 
214 static gboolean net_stream_reconnect(gpointer data)
215 {
216     NetStreamState *s = data;
217     QIOChannelSocket *sioc;
218 
219     s->timer_tag = 0;
220 
221     sioc = qio_channel_socket_new();
222     s->data.ioc = QIO_CHANNEL(sioc);
223     qio_channel_socket_connect_async(sioc, s->addr,
224                                      net_stream_client_connected, s,
225                                      NULL, NULL);
226     return G_SOURCE_REMOVE;
227 }
228 
229 static void net_stream_arm_reconnect(NetStreamState *s)
230 {
231     if (s->reconnect_ms && s->timer_tag == 0) {
232         qemu_set_info_str(&s->data.nc, "connecting");
233         s->timer_tag = g_timeout_add(s->reconnect_ms, net_stream_reconnect, s);
234     }
235 }
236 
237 static int net_stream_client_init(NetClientState *peer,
238                                   const char *model,
239                                   const char *name,
240                                   SocketAddress *addr,
241                                   uint32_t reconnect_ms,
242                                   Error **errp)
243 {
244     NetStreamState *s;
245     NetClientState *nc;
246     QIOChannelSocket *sioc = qio_channel_socket_new();
247 
248     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
249     s = DO_UPCAST(NetStreamState, data.nc, nc);
250     qemu_set_info_str(&s->data.nc, "connecting");
251 
252     s->data.ioc = QIO_CHANNEL(sioc);
253     s->data.nc.link_down = true;
254     s->data.send = net_stream_send;
255     s->data.listen = net_stream_listen;
256 
257     s->reconnect_ms = reconnect_ms;
258     if (reconnect_ms) {
259         s->addr = QAPI_CLONE(SocketAddress, addr);
260     }
261     qio_channel_socket_connect_async(sioc, addr,
262                                      net_stream_client_connected, s,
263                                      NULL, NULL);
264 
265     return 0;
266 }
267 
268 int net_init_stream(const Netdev *netdev, const char *name,
269                     NetClientState *peer, Error **errp)
270 {
271     const NetdevStreamOptions *sock;
272 
273     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
274     sock = &netdev->u.stream;
275 
276     if (!sock->has_server || !sock->server) {
277         return net_stream_client_init(peer, "stream", name, sock->addr,
278                                       sock->has_reconnect_ms ?
279                                           sock->reconnect_ms : 0,
280                                       errp);
281     }
282     if (sock->has_reconnect_ms) {
283         error_setg(errp, "'reconnect-ms' option is "
284                          "incompatible with socket in server mode");
285         return -1;
286     }
287     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
288 }
289