xref: /openbmc/qemu/net/stream_data.c (revision 6fae7ce1488e3f5bdcc1747564ea68e7f6f0e931)
1 /*
2  * net stream generic functions
3  *
4  * Copyright Red Hat
5  *
6  * SPDX-License-Identifier: GPL-2.0-or-later
7  */
8 
9 #include "qemu/osdep.h"
10 #include "qemu/iov.h"
11 #include "qapi/error.h"
12 #include "net/net.h"
13 #include "io/channel.h"
14 #include "io/net-listener.h"
15 
16 #include "stream_data.h"
17 
net_stream_data_writable(QIOChannel * ioc,GIOCondition condition,gpointer data)18 static gboolean net_stream_data_writable(QIOChannel *ioc,
19                                          GIOCondition condition, gpointer data)
20 {
21     NetStreamData *d = data;
22 
23     d->ioc_write_tag = 0;
24 
25     qemu_flush_queued_packets(&d->nc);
26 
27     return G_SOURCE_REMOVE;
28 }
29 
/*
 * Write one packet to the stream channel.
 *
 * The packet goes out as a frame: a 32-bit length in network byte order
 * followed by @size bytes from @buf.  d->send_index records how many bytes
 * of the current frame were already written by an earlier call, so a call
 * after a partial write resumes where the previous one stopped.
 *
 * Returns:
 *   @size   the whole frame has now been written
 *   0       the channel would block or accepted only part of the frame;
 *           a G_IO_OUT watch is installed (net_stream_data_writable)
 *   -errno  qio_channel_writev() returned -1; the resume offset is reset
 */
ssize_t net_stream_data_receive(NetStreamData *d, const uint8_t *buf,
                                size_t size)
{
    uint32_t be_len = htonl(size);
    struct iovec frame[2] = {
        { .iov_base = &be_len,     .iov_len = sizeof(be_len) },
        { .iov_base = (void *)buf, .iov_len = size },
    };
    struct iovec pending[2];
    unsigned int npending;
    size_t left;
    ssize_t written;

    /* Build an iovec that covers only the not-yet-written tail. */
    left = iov_size(frame, 2) - d->send_index;
    npending = iov_copy(pending, 2, frame, 2, d->send_index, left);

    written = qio_channel_writev(d->ioc, pending, npending, NULL);
    if (written == QIO_CHANNEL_ERR_BLOCK) {
        /* Nothing written; treated as a zero-length partial write below. */
        written = 0;
    }
    if (written == -1) {
        d->send_index = 0;
        return -errno;
    }

    if (written >= (ssize_t)left) {
        /* Frame complete: the next call starts a fresh frame. */
        d->send_index = 0;
        return size;
    }

    /* Partial write: remember the offset and wait for writability. */
    d->send_index += written;
    d->ioc_write_tag = qio_channel_add_watch(d->ioc, G_IO_OUT,
                                             net_stream_data_writable, d,
                                             NULL);
    return 0;
}
68 
/*
 * Completion callback passed to qemu_send_packet_async() by
 * net_stream_data_rs_finalize().
 *
 * Installs the G_IO_IN watch (handler d->send) on the channel again,
 * unless a read watch is already registered.  @len is not used.
 */
static void net_stream_data_send_completed(NetClientState *nc, ssize_t len)
{
    NetStreamData *stream = DO_UPCAST(NetStreamData, nc, nc);

    if (stream->ioc_read_tag) {
        /* A read watch is already active; nothing to do. */
        return;
    }

    stream->ioc_read_tag = qio_channel_add_watch(stream->ioc, G_IO_IN,
                                                 stream->send, stream, NULL);
}
78 
/*
 * Called with a read state that holds a packet of rs->packet_len bytes
 * in rs->buf; registered as the finalize hook via net_socket_rs_init().
 *
 * The packet is handed to qemu_send_packet_async().  When that call
 * returns 0, the channel's read watch is removed;
 * net_stream_data_send_completed() installs it again later.
 */
void net_stream_data_rs_finalize(SocketReadState *rs)
{
    NetStreamData *stream = container_of(rs, NetStreamData, rs);
    ssize_t sent;

    sent = qemu_send_packet_async(&stream->nc, rs->buf, rs->packet_len,
                                  net_stream_data_send_completed);
    if (sent != 0 || !stream->ioc_read_tag) {
        return;
    }

    /* Stop reading from the channel until the completion callback runs. */
    g_source_remove(stream->ioc_read_tag);
    stream->ioc_read_tag = 0;
}
92 
net_stream_data_send(QIOChannel * ioc,GIOCondition condition,NetStreamData * d)93 gboolean net_stream_data_send(QIOChannel *ioc, GIOCondition condition,
94                               NetStreamData *d)
95 {
96     int size;
97     int ret;
98     QEMU_UNINITIALIZED char buf1[NET_BUFSIZE];
99     const char *buf;
100 
101     size = qio_channel_read(d->ioc, buf1, sizeof(buf1), NULL);
102     if (size < 0) {
103         if (errno != EWOULDBLOCK) {
104             goto eoc;
105         }
106     } else if (size == 0) {
107         /* end of connection */
108     eoc:
109         d->ioc_read_tag = 0;
110         if (d->ioc_write_tag) {
111             g_source_remove(d->ioc_write_tag);
112             d->ioc_write_tag = 0;
113         }
114         if (d->listener) {
115             qemu_set_info_str(&d->nc, "listening");
116             qio_net_listener_set_client_func(d->listener,
117                                              d->listen, d, NULL);
118         }
119         object_unref(OBJECT(d->ioc));
120         d->ioc = NULL;
121 
122         net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);
123         d->nc.link_down = true;
124 
125         return G_SOURCE_REMOVE;
126     }
127     buf = buf1;
128 
129     ret = net_fill_rstate(&d->rs, (const uint8_t *)buf, size);
130 
131     if (ret == -1) {
132         goto eoc;
133     }
134 
135     return G_SOURCE_CONTINUE;
136 }
137 
/*
 * Adopt the client connection @cioc as the stream channel of @d.
 *
 * Takes a reference on @cioc, clears the client callback on d->listener,
 * names the channel "stream-server", marks the link up and installs the
 * G_IO_IN watch with d->send as handler.  The @listener argument itself
 * is not used; d->listener is.
 */
void net_stream_data_listen(QIONetListener *listener,
                            QIOChannelSocket *cioc,
                            NetStreamData *d)
{
    QIOChannel *chan = QIO_CHANNEL(cioc);

    object_ref(OBJECT(cioc));

    /* Unset the listener's client callback while this client is attached. */
    qio_net_listener_set_client_func(d->listener, NULL, d, NULL);

    d->ioc = chan;
    qio_channel_set_name(chan, "stream-server");
    d->nc.link_down = false;

    d->ioc_read_tag = qio_channel_add_watch(chan, G_IO_IN, d->send, d, NULL);
}
152 
/*
 * Finish setting up d->ioc once the connection task @task has completed.
 *
 * On success the socket is switched to non-blocking mode, the read state
 * is initialised, qio_channel_set_delay(false) is applied, the G_IO_IN
 * watch is installed, the link is marked up and 0 is returned.
 *
 * On failure (the task carries an error, or a file-descriptor address
 * cannot be made non-blocking) the reason is stored in the client's info
 * string, d->ioc is released and set to NULL, and -1 is returned.
 */
int net_stream_data_client_connected(QIOTask *task, NetStreamData *d)
{
    QIOChannelSocket *sock = QIO_CHANNEL_SOCKET(d->ioc);
    SocketAddress *peer;
    int rc;
    Error *local_err = NULL;

    if (qio_task_propagate_error(task, &local_err)) {
        qemu_set_info_str(&d->nc, "error: %s", error_get_pretty(local_err));
        error_free(local_err);
        goto fail;
    }

    peer = qio_channel_socket_get_remote_address(sock, NULL);
    g_assert(peer != NULL);

    rc = qemu_socket_try_set_nonblock(sock->fd);
    if (rc < 0 && peer->type == SOCKET_ADDRESS_TYPE_FD) {
        /* Failure is only reported gracefully for fd-type addresses. */
        qemu_set_info_str(&d->nc, "can't use file descriptor %s (errno %d)",
                          peer->u.fd.str, -rc);
        qapi_free_SocketAddress(peer);
        goto fail;
    }
    /* For every other address type a failure here is fatal. */
    g_assert(rc == 0);
    qapi_free_SocketAddress(peer);

    net_socket_rs_init(&d->rs, net_stream_data_rs_finalize, false);

    /* Disable Nagle algorithm on TCP sockets to reduce latency */
    qio_channel_set_delay(d->ioc, false);

    d->ioc_read_tag = qio_channel_add_watch(d->ioc, G_IO_IN, d->send, d, NULL);
    d->nc.link_down = false;

    return 0;

fail:
    object_unref(OBJECT(d->ioc));
    d->ioc = NULL;

    return -1;
}
194