/*
 * VMware vSockets Driver
 *
 * Copyright (C) 2009-2013 VMware, Inc. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the Free
 * Software Foundation version 2 and no later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 */

#include <linux/types.h>
#include <linux/socket.h>
#include <linux/stddef.h>
#include <net/sock.h>

#include "vmci_transport_notify.h"

#define PKT_FIELD(vsk, field_name) \
	(vmci_trans(vsk)->notify.pkt_q_state.field_name)
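
/* For example, PKT_FIELD(vsk, peer_waiting_write) expands to
 * vmci_trans(vsk)->notify.pkt_q_state.peer_waiting_write.
 */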

static bool vmci_transport_notify_waiting_write(struct vsock_sock *vsk)
{
	bool retval;
	u64 notify_limit;

	if (!PKT_FIELD(vsk, peer_waiting_write))
		return false;

	/* When the sender blocks, we take that as a sign that the sender is
	 * faster than the receiver. To reduce the transmit rate of the sender,
	 * we delay the sending of the read notification by decreasing the
	 * write_notify_window. The notification is delayed until the number of
	 * bytes used in the queue drops below the write_notify_window.
	 */

	if (!PKT_FIELD(vsk, peer_waiting_write_detected)) {
		PKT_FIELD(vsk, peer_waiting_write_detected) = true;
		if (PKT_FIELD(vsk, write_notify_window) < PAGE_SIZE) {
			PKT_FIELD(vsk, write_notify_window) =
			    PKT_FIELD(vsk, write_notify_min_window);
		} else {
			PKT_FIELD(vsk, write_notify_window) -= PAGE_SIZE;
			if (PKT_FIELD(vsk, write_notify_window) <
			    PKT_FIELD(vsk, write_notify_min_window))
				PKT_FIELD(vsk, write_notify_window) =
				    PKT_FIELD(vsk, write_notify_min_window);
		}
	}
	notify_limit = vmci_trans(vsk)->consume_size -
		PKT_FIELD(vsk, write_notify_window);

	/* The notify_limit is used to delay notifications in the case where
	 * flow control is enabled. Below, the test is expressed in terms of
	 * free space in the queue: if free_space > ConsumeSize -
	 * write_notify_window, then notify. An alternate way of expressing
	 * this is to rewrite the expression to use the data ready in the
	 * receive queue: if write_notify_window > bufferReady, then notify,
	 * as free_space == ConsumeSize - bufferReady.
	 */
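	/* For example, with consume_size == 65536 and write_notify_window ==
	 * 8192, notify_limit is 57344: the read notification is sent only
	 * once more than 57344 bytes of the queue are free, i.e. fewer than
	 * 8192 bytes remain unread.  (Sizes here are illustrative only.)
	 */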

	retval = vmci_qpair_consume_free_space(vmci_trans(vsk)->qpair) >
		notify_limit;

	if (retval) {
		/* Once we notify the peer, we reset the detected flag so the
		 * next wait will again cause a decrease in the window size.
		 */

		PKT_FIELD(vsk, peer_waiting_write_detected) = false;
	}
	return retval;
}

static void
vmci_transport_handle_read(struct sock *sk,
			   struct vmci_transport_packet *pkt,
			   bool bottom_half,
			   struct sockaddr_vm *dst, struct sockaddr_vm *src)
{
	sk->sk_write_space(sk);
}

static void
vmci_transport_handle_wrote(struct sock *sk,
			    struct vmci_transport_packet *pkt,
			    bool bottom_half,
			    struct sockaddr_vm *dst, struct sockaddr_vm *src)
{
	sk->sk_data_ready(sk);
}

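/* Each time the reader is about to block, grow the write notify window by a
 * page, capped at the consume queue size.  A larger window lowers the
 * notify_limit computed in vmci_transport_notify_waiting_write(), so read
 * notifications go out sooner and the peer can refill the queue earlier.
 */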
static void vsock_block_update_write_window(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	if (PKT_FIELD(vsk, write_notify_window) < vmci_trans(vsk)->consume_size)
		PKT_FIELD(vsk, write_notify_window) =
		    min(PKT_FIELD(vsk, write_notify_window) + PAGE_SIZE,
			vmci_trans(vsk)->consume_size);
}

static int vmci_transport_send_read_notification(struct sock *sk)
{
	struct vsock_sock *vsk;
	bool sent_read;
	unsigned int retries;
	int err;

	vsk = vsock_sk(sk);
	sent_read = false;
	retries = 0;
	err = 0;

	if (vmci_transport_notify_waiting_write(vsk)) {
		/* Notify the peer that we have read, retrying the send on
		 * failure up to our maximum value.  XXX For now we just log
		 * the failure, but later we should schedule a work item to
		 * handle the resend until it succeeds.  That would require
		 * keeping track of work items in the vsk and cleaning them up
		 * upon socket close.
		 */
		while (!(vsk->peer_shutdown & RCV_SHUTDOWN) &&
		       !sent_read &&
		       retries < VMCI_TRANSPORT_MAX_DGRAM_RESENDS) {
			err = vmci_transport_send_read(sk);
			if (err >= 0)
				sent_read = true;

			retries++;
		}

		if (retries >= VMCI_TRANSPORT_MAX_DGRAM_RESENDS && !sent_read)
			pr_err("%p unable to send read notification to peer\n",
			       sk);
		else
			PKT_FIELD(vsk, peer_waiting_write) = false;
	}
	return err;
}

static void vmci_transport_notify_pkt_socket_init(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = PAGE_SIZE;
	PKT_FIELD(vsk, write_notify_min_window) = PAGE_SIZE;
	PKT_FIELD(vsk, peer_waiting_write) = false;
	PKT_FIELD(vsk, peer_waiting_write_detected) = false;
}

static void vmci_transport_notify_pkt_socket_destruct(struct vsock_sock *vsk)
{
	PKT_FIELD(vsk, write_notify_window) = PAGE_SIZE;
	PKT_FIELD(vsk, write_notify_min_window) = PAGE_SIZE;
	PKT_FIELD(vsk, peer_waiting_write) = false;
	PKT_FIELD(vsk, peer_waiting_write_detected) = false;
}

static int
vmci_transport_notify_pkt_poll_in(struct sock *sk,
				  size_t target, bool *data_ready_now)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	if (vsock_stream_has_data(vsk)) {
		*data_ready_now = true;
	} else {
		/* We can't read right now because there is nothing in the
		 * queue. Ask for notifications when there is something to
		 * read.
		 */
		if (sk->sk_state == SS_CONNECTED)
			vsock_block_update_write_window(sk);
		*data_ready_now = false;
	}

	return 0;
}

static int
vmci_transport_notify_pkt_poll_out(struct sock *sk,
				   size_t target, bool *space_avail_now)
{
	s64 produce_q_free_space;
	struct vsock_sock *vsk = vsock_sk(sk);

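	/* vsock_stream_has_space() reports the free space in the produce
	 * queue, so a positive value means we can write.  Note that a
	 * negative (error) return leaves *space_avail_now unmodified here.
	 */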
	produce_q_free_space = vsock_stream_has_space(vsk);
	if (produce_q_free_space > 0) {
		*space_avail_now = true;
		return 0;
	} else if (produce_q_free_space == 0) {
		/* This is a connected socket but we can't currently send data.
		 * Nothing else to do.
		 */
		*space_avail_now = false;
	}

	return 0;
}

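/* Called at the start of a receive.  Raising write_notify_min_window to
 * cover the receive target ensures the sender is notified before the reader
 * blocks waiting for target bytes.
 */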
static int
vmci_transport_notify_pkt_recv_init(
				struct sock *sk,
				size_t target,
				struct vmci_transport_recv_notify_data *data)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	data->consume_head = 0;
	data->produce_tail = 0;
	data->notify_on_block = false;

	if (PKT_FIELD(vsk, write_notify_min_window) < target + 1) {
		PKT_FIELD(vsk, write_notify_min_window) = target + 1;
		if (PKT_FIELD(vsk, write_notify_window) <
		    PKT_FIELD(vsk, write_notify_min_window)) {
			/* If the current window is smaller than the new
			 * minimal window size, we need to reevaluate whether
			 * we need to notify the sender. If the number of ready
			 * bytes is smaller than the new window, we need to
			 * send a notification to the sender before we block.
			 */

			PKT_FIELD(vsk, write_notify_window) =
			    PKT_FIELD(vsk, write_notify_min_window);
			data->notify_on_block = true;
		}
	}

	return 0;
}

static int
vmci_transport_notify_pkt_recv_pre_block(
				struct sock *sk,
				size_t target,
				struct vmci_transport_recv_notify_data *data)
{
	int err = 0;

	vsock_block_update_write_window(sk);

	if (data->notify_on_block) {
		err = vmci_transport_send_read_notification(sk);
		if (err < 0)
			return err;
		data->notify_on_block = false;
	}

	return err;
}

static int
vmci_transport_notify_pkt_recv_post_dequeue(
				struct sock *sk,
				size_t target,
				ssize_t copied,
				bool data_read,
				struct vmci_transport_recv_notify_data *data)
{
	struct vsock_sock *vsk;
	int err;
	bool was_full = false;
	u64 free_space;

	vsk = vsock_sk(sk);
	err = 0;

	if (data_read) {
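		/* Make sure the dequeue above is visible before we sample
		 * free space.  If the bytes just copied equal the current
		 * free space, the queue was completely full before this
		 * read, so the peer writer may be blocked on us.
		 */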
		smp_mb();

		free_space =
			vmci_qpair_consume_free_space(vmci_trans(vsk)->qpair);
		was_full = free_space == copied;

		if (was_full)
			PKT_FIELD(vsk, peer_waiting_write) = true;

		err = vmci_transport_send_read_notification(sk);
		if (err < 0)
			return err;

		/* See the comment in
		 * vmci_transport_notify_pkt_send_post_enqueue().
		 */
		sk->sk_data_ready(sk);
	}

	return err;
}

static int
vmci_transport_notify_pkt_send_init(
				struct sock *sk,
				struct vmci_transport_send_notify_data *data)
{
	data->consume_head = 0;
	data->produce_tail = 0;

	return 0;
}

static int
vmci_transport_notify_pkt_send_post_enqueue(
				struct sock *sk,
				ssize_t written,
				struct vmci_transport_send_notify_data *data)
{
	int err = 0;
	struct vsock_sock *vsk;
	bool sent_wrote = false;
	bool was_empty;
	int retries = 0;

	vsk = vsock_sk(sk);

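	/* Ensure the enqueue above is visible before sampling buf_ready.  If
	 * the bytes now ready to consume equal what we just wrote, the queue
	 * was empty beforehand and the peer reader may be waiting, so send a
	 * wrote notification (with bounded datagram retries).
	 */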
	smp_mb();

	was_empty =
		vmci_qpair_produce_buf_ready(vmci_trans(vsk)->qpair) == written;
	if (was_empty) {
		while (!(vsk->peer_shutdown & RCV_SHUTDOWN) &&
		       !sent_wrote &&
		       retries < VMCI_TRANSPORT_MAX_DGRAM_RESENDS) {
			err = vmci_transport_send_wrote(sk);
			if (err >= 0)
				sent_wrote = true;

			retries++;
		}
	}

	if (retries >= VMCI_TRANSPORT_MAX_DGRAM_RESENDS && !sent_wrote) {
		pr_err("%p unable to send wrote notification to peer\n",
		       sk);
		return err;
	}

	return err;
}

static void
vmci_transport_notify_pkt_handle_pkt(
				struct sock *sk,
				struct vmci_transport_packet *pkt,
				bool bottom_half,
				struct sockaddr_vm *dst,
				struct sockaddr_vm *src, bool *pkt_processed)
{
	bool processed = false;

	switch (pkt->type) {
	case VMCI_TRANSPORT_PACKET_TYPE_WROTE:
		vmci_transport_handle_wrote(sk, pkt, bottom_half, dst, src);
		processed = true;
		break;
	case VMCI_TRANSPORT_PACKET_TYPE_READ:
		vmci_transport_handle_read(sk, pkt, bottom_half, dst, src);
		processed = true;
		break;
	}

	if (pkt_processed)
		*pkt_processed = processed;
}

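/* Once the queue pair size is known (request/negotiate), open the write
 * notify window to the full consume queue and clamp the minimum window to
 * fit; the request and negotiate paths below apply identical sizing.
 */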
static void vmci_transport_notify_pkt_process_request(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = vmci_trans(vsk)->consume_size;
	if (vmci_trans(vsk)->consume_size <
		PKT_FIELD(vsk, write_notify_min_window))
		PKT_FIELD(vsk, write_notify_min_window) =
			vmci_trans(vsk)->consume_size;
}

static void vmci_transport_notify_pkt_process_negotiate(struct sock *sk)
{
	struct vsock_sock *vsk = vsock_sk(sk);

	PKT_FIELD(vsk, write_notify_window) = vmci_trans(vsk)->consume_size;
	if (vmci_trans(vsk)->consume_size <
		PKT_FIELD(vsk, write_notify_min_window))
		PKT_FIELD(vsk, write_notify_min_window) =
			vmci_trans(vsk)->consume_size;
}

static int
vmci_transport_notify_pkt_recv_pre_dequeue(
				struct sock *sk,
				size_t target,
				struct vmci_transport_recv_notify_data *data)
{
	return 0; /* NOP for QState. */
}

static int
vmci_transport_notify_pkt_send_pre_block(
				struct sock *sk,
				struct vmci_transport_send_notify_data *data)
{
	return 0; /* NOP for QState. */
}

static int
vmci_transport_notify_pkt_send_pre_enqueue(
				struct sock *sk,
				struct vmci_transport_send_notify_data *data)
{
	return 0; /* NOP for QState. */
}

/* Socket always-on control packet based operations. */
const struct vmci_transport_notify_ops vmci_transport_notify_pkt_q_state_ops = {
	.socket_init = vmci_transport_notify_pkt_socket_init,
	.socket_destruct = vmci_transport_notify_pkt_socket_destruct,
	.poll_in = vmci_transport_notify_pkt_poll_in,
	.poll_out = vmci_transport_notify_pkt_poll_out,
	.handle_notify_pkt = vmci_transport_notify_pkt_handle_pkt,
	.recv_init = vmci_transport_notify_pkt_recv_init,
	.recv_pre_block = vmci_transport_notify_pkt_recv_pre_block,
	.recv_pre_dequeue = vmci_transport_notify_pkt_recv_pre_dequeue,
	.recv_post_dequeue = vmci_transport_notify_pkt_recv_post_dequeue,
	.send_init = vmci_transport_notify_pkt_send_init,
	.send_pre_block = vmci_transport_notify_pkt_send_pre_block,
	.send_pre_enqueue = vmci_transport_notify_pkt_send_pre_enqueue,
	.send_post_enqueue = vmci_transport_notify_pkt_send_post_enqueue,
	.process_request = vmci_transport_notify_pkt_process_request,
	.process_negotiate = vmci_transport_notify_pkt_process_negotiate,
};