xref: /openbmc/linux/net/smc/smc_tx.c (revision d2ba09c1)
// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/tcp.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"

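/* SMC_TX_WORK_DELAY re-arms the tx worker when a CDC send slot is temporarily
 * unavailable (see smc_tx_sndbuf_nonempty() and smc_tx_consumer_update());
 * SMC_TX_CORK_DELAY defers the RDMA write for corked/MSG_MORE sockets while
 * more than half of the sndbuf is still free (see smc_tx_sendmsg()).
 */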
#define SMC_TX_WORK_DELAY	HZ
#define SMC_TX_CORK_DELAY	(HZ >> 2)	/* 250 ms */

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wake up sndbuf producers that blocked with smc_tx_wait_memory().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct smc_sock *smc = smc_sk(sk);
	struct socket_wq *wq;

	/* similar to sk_stream_write_space */
	if (atomic_read(&smc->conn.sndbuf_space) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);
		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq);
		if (skwq_has_sleeper(wq))
			wake_up_interruptible_poll(&wq->wait,
						   EPOLLOUT | EPOLLWRNORM |
						   EPOLLWRBAND);
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
		rcu_read_unlock();
	}
}

/* Wake up sndbuf producers that blocked with smc_tx_wait_memory().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
	if (smc->sk.sk_socket &&
	    test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
		smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space is available */
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
{
	DEFINE_WAIT_FUNC(wait, woken_wake_function);
	struct smc_connection *conn = &smc->conn;
	struct sock *sk = &smc->sk;
	bool noblock;
	long timeo;
	int rc = 0;

	/* similar to sk_stream_wait_memory */
	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
	noblock = timeo ? false : true;
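	/* noblock records whether the caller is non-blocking from the start
	 * (zero timeout); only then is SOCK_NOSPACE set before failing with
	 * -EAGAIN, so that poll() can report once space frees up again
	 */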
	add_wait_queue(sk_sleep(sk), &wait);
	while (1) {
		sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (sk->sk_err ||
		    (sk->sk_shutdown & SEND_SHUTDOWN) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
			rc = -EPIPE;
			break;
		}
		if (smc_cdc_rxed_any_close(conn)) {
			rc = -ECONNRESET;
			break;
		}
		if (!timeo) {
			if (noblock)
				set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			rc = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			rc = sock_intr_errno(timeo);
			break;
		}
		sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
		if (atomic_read(&conn->sndbuf_space))
			break; /* at least 1 byte of free space available */
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
		sk_wait_event(sk, &timeo,
			      sk->sk_err ||
			      (sk->sk_shutdown & SEND_SHUTDOWN) ||
			      smc_cdc_rxed_any_close(conn) ||
			      atomic_read(&conn->sndbuf_space),
			      &wait);
	}
	remove_wait_queue(sk_sleep(sk), &wait);
	return rc;
}

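/* Return true if TCP_CORK is set on the internal CLC TCP socket; together
 * with MSG_MORE this lets smc_tx_sendmsg() briefly defer the RDMA write.
 */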
static bool smc_tx_is_corked(struct smc_sock *smc)
{
	struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);

	return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
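/* Data is copied into the ring-shaped sndbuf in at most two chunks (wrapping
 * at the buffer end), the prep cursor is advanced, and the consumer side is
 * kicked either immediately or, when corked, via delayed work.
 */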
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
	size_t copylen, send_done = 0, send_remaining = len;
	size_t chunk_len, chunk_off, chunk_len_sum;
	struct smc_connection *conn = &smc->conn;
	union smc_host_cursor prep;
	struct sock *sk = &smc->sk;
	char *sndbuf_base;
	int tx_cnt_prep;
	int writespace;
	int rc, chunk;

	/* This should be in poll */
	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

	if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
		rc = -EPIPE;
		goto out_err;
	}

	while (msg_data_left(msg)) {
		if (sk->sk_state == SMC_INIT)
			return -ENOTCONN;
		if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
		    (smc->sk.sk_err == ECONNABORTED) ||
		    conn->local_tx_ctrl.conn_state_flags.peer_conn_abort)
			return -EPIPE;
		if (smc_cdc_rxed_any_close(conn))
			return send_done ?: -ECONNRESET;

		if (!atomic_read(&conn->sndbuf_space)) {
			rc = smc_tx_wait_memory(smc, msg->msg_flags);
			if (rc) {
				if (send_done)
					return send_done;
				goto out_err;
			}
			continue;
		}

		/* initialize variables for 1st iteration of subsequent loop */
		/* could be just 1 byte, even after smc_tx_wait_memory above */
		writespace = atomic_read(&conn->sndbuf_space);
		/* not more than what user space asked for */
		copylen = min_t(size_t, send_remaining, writespace);
		/* determine start of sndbuf */
		sndbuf_base = conn->sndbuf_desc->cpu_addr;
		smc_curs_write(&prep,
			       smc_curs_read(&conn->tx_curs_prep, conn),
			       conn);
		tx_cnt_prep = prep.count;
		/* determine chunks where to write into sndbuf */
		/* either unwrapped case, or 1st chunk of wrapped case */
		chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
				  tx_cnt_prep);
		chunk_len_sum = chunk_len;
		chunk_off = tx_cnt_prep;
		smc_sndbuf_sync_sg_for_cpu(conn);
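		/* copy in at most two chunks: e.g. with a 16KB sndbuf,
		 * tx_cnt_prep at 12KB and copylen of 6KB, the 1st chunk
		 * covers the 4KB up to the buffer end and the 2nd chunk
		 * wraps around to place the remaining 2KB at offset 0
		 */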
		for (chunk = 0; chunk < 2; chunk++) {
			rc = memcpy_from_msg(sndbuf_base + chunk_off,
					     msg, chunk_len);
			if (rc) {
				smc_sndbuf_sync_sg_for_device(conn);
				if (send_done)
					return send_done;
				goto out_err;
			}
			send_done += chunk_len;
			send_remaining -= chunk_len;

			if (chunk_len_sum == copylen)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			chunk_len = copylen - chunk_len; /* remainder */
			chunk_len_sum += chunk_len;
			chunk_off = 0; /* modulo offset in send ring buffer */
		}
		smc_sndbuf_sync_sg_for_device(conn);
		/* update cursors */
		smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
		smc_curs_write(&conn->tx_curs_prep,
			       smc_curs_read(&prep, conn),
			       conn);
		/* increased in send tasklet smc_cdc_tx_handler() */
		smp_mb__before_atomic();
		atomic_sub(copylen, &conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
		smp_mb__after_atomic();
		/* since we just produced more new data into sndbuf,
		 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
		 */
		if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
		    (atomic_read(&conn->sndbuf_space) >
						(conn->sndbuf_desc->len >> 1)))
			/* for a corked socket defer the RDMA writes if there
			 * is still sufficient sndbuf_space available
			 */
			schedule_delayed_work(&conn->tx_work,
					      SMC_TX_CORK_DELAY);
		else
			smc_tx_sndbuf_nonempty(conn);
	} /* while (msg_data_left(msg)) */

	return send_done;

out_err:
	rc = sk_stream_error(sk, msg->msg_flags, rc);
	/* make sure we wake any epoll edge trigger waiter */
	if (unlikely(rc == -EAGAIN))
		sk->sk_write_space(sk);
	return rc;
}

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
			     int num_sges, struct ib_sge sges[])
{
	struct smc_link_group *lgr = conn->lgr;
	struct ib_send_wr *failed_wr = NULL;
	struct ib_rdma_wr rdma_wr;
	struct smc_link *link;
	int rc;
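	/* build and post a single RDMA_WRITE work request that places the
	 * data directly into the peer's RMBE at the given offset
	 */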
	memset(&rdma_wr, 0, sizeof(rdma_wr));
	link = &lgr->lnk[SMC_SINGLE_LINK];
	rdma_wr.wr.wr_id = smc_wr_tx_get_next_wr_id(link);
	rdma_wr.wr.sg_list = sges;
	rdma_wr.wr.num_sge = num_sges;
	rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
	rdma_wr.remote_addr =
		lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].dma_addr +
		/* RMBE within RMB */
		conn->tx_off +
		/* offset within RMBE */
		peer_rmbe_offset;
	rdma_wr.rkey = lgr->rtokens[conn->rtoken_idx][SMC_SINGLE_LINK].rkey;
	rc = ib_post_send(link->roce_qp, &rdma_wr.wr, &failed_wr);
	if (rc) {
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
		smc_lgr_terminate(lgr);
	}
	return rc;
}

/* sndbuf consumer: advance the producer cursor into the peer RMBE and the
 * local sent cursor by len bytes once the RDMA writes have been posted
 */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
					  union smc_host_cursor *prod,
					  union smc_host_cursor *sent,
					  size_t len)
{
	smc_curs_add(conn->peer_rmbe_size, prod, len);
	/* increased in recv tasklet smc_cdc_msg_rcv() */
	smp_mb__before_atomic();
	/* data in flight reduces usable snd_wnd */
	atomic_sub(len, &conn->peer_rmbe_space);
	/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
	smp_mb__after_atomic();
	smc_curs_add(conn->sndbuf_desc->len, sent, len);
}

/* sndbuf consumer: prepare all necessary (src & dst) chunks of data transmit;
 * usable snd_wnd is the maximum transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn)
{
	size_t src_off, src_len, dst_off, dst_len; /* current chunk values */
	size_t len, dst_len_sum, src_len_sum, dstchunk, srcchunk;
	union smc_host_cursor sent, prep, prod, cons;
	struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
	struct smc_link_group *lgr = conn->lgr;
	int to_send, rmbespace;
	struct smc_link *link;
	dma_addr_t dma_addr;
	int num_sges;
	int rc;

	/* source: sndbuf */
	smc_curs_write(&sent, smc_curs_read(&conn->tx_curs_sent, conn), conn);
	smc_curs_write(&prep, smc_curs_read(&conn->tx_curs_prep, conn), conn);
	/* cf. wmem_alloc - (snd_max - snd_una) */
	to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
	if (to_send <= 0)
		return 0;

	/* destination: RMBE */
	/* cf. snd_wnd */
	rmbespace = atomic_read(&conn->peer_rmbe_space);
	if (rmbespace <= 0)
		return 0;
	smc_curs_write(&prod,
		       smc_curs_read(&conn->local_tx_ctrl.prod, conn),
		       conn);
	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_rx_ctrl.cons, conn),
		       conn);

	/* if usable snd_wnd closes ask peer to advertise once it opens again */
	conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
	/* cf. usable snd_wnd */
	len = min(to_send, rmbespace);

	/* initialize variables for first iteration of subsequent nested loop */
	link = &lgr->lnk[SMC_SINGLE_LINK];
	dst_off = prod.count;
	if (prod.wrap == cons.wrap) {
		/* the filled destination area is unwrapped,
		 * hence the available free destination space is wrapped
		 * and we need 2 destination chunks of sum len; start with 1st
		 * which is limited by what's available in sndbuf
		 */
		dst_len = min_t(size_t,
				conn->peer_rmbe_size - prod.count, len);
	} else {
		/* the filled destination area is wrapped,
		 * hence the available free destination space is unwrapped
		 * and we need a single destination chunk of entire len
		 */
		dst_len = len;
	}
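	/* e.g. with an 8KB peer RMBE, cons.count at 2KB and prod.count at 6KB
	 * on the same wrap, the free space runs from 6KB to the buffer end
	 * and then from 0 to 2KB, so the 1st dst chunk is capped at 2KB and
	 * the remainder goes into a 2nd chunk starting at offset 0
	 */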
	dst_len_sum = dst_len;
	src_off = sent.count;
	/* dst_len determines the maximum src_len */
	if (sent.count + dst_len <= conn->sndbuf_desc->len) {
		/* unwrapped src case: single chunk of entire dst_len */
		src_len = dst_len;
	} else {
		/* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
		src_len = conn->sndbuf_desc->len - sent.count;
	}
	src_len_sum = src_len;
	dma_addr = sg_dma_address(conn->sndbuf_desc->sgt[SMC_SINGLE_LINK].sgl);
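	/* at most 2 destination chunks, each fed by at most 2 source chunks,
	 * so a wrapped sndbuf and wrapped RMBE free space are both covered
	 * in a single pass
	 */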
	for (dstchunk = 0; dstchunk < 2; dstchunk++) {
		num_sges = 0;
		for (srcchunk = 0; srcchunk < 2; srcchunk++) {
			sges[srcchunk].addr = dma_addr + src_off;
			sges[srcchunk].length = src_len;
			sges[srcchunk].lkey = link->roce_pd->local_dma_lkey;
			num_sges++;
			src_off += src_len;
			if (src_off >= conn->sndbuf_desc->len)
				src_off -= conn->sndbuf_desc->len;
						/* modulo in send ring */
			if (src_len_sum == dst_len)
				break; /* either on 1st or 2nd iteration */
			/* prepare next (== 2nd) iteration */
			src_len = dst_len - src_len; /* remainder */
			src_len_sum += src_len;
		}
		rc = smc_tx_rdma_write(conn, dst_off, num_sges, sges);
		if (rc)
			return rc;
		if (dst_len_sum == len)
			break; /* either on 1st or 2nd iteration */
		/* prepare next (== 2nd) iteration */
		dst_off = 0; /* modulo offset in RMBE ring buffer */
		dst_len = len - dst_len; /* remainder */
		dst_len_sum += dst_len;
		src_len = min_t(int,
				dst_len, conn->sndbuf_desc->len - sent.count);
		src_len_sum = src_len;
	}

	smc_tx_advance_cursors(conn, &prod, &sent, len);
	/* update connection's cursors with advanced local cursors */
	smc_curs_write(&conn->local_tx_ctrl.prod,
		       smc_curs_read(&prod, conn),
		       conn);
						/* dst: peer RMBE */
	smc_curs_write(&conn->tx_curs_sent,
		       smc_curs_read(&sent, conn),
		       conn);
						/* src: local sndbuf */

	return 0;
}

/* Wake up sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	int rc;

	spin_lock_bh(&conn->send_lock);
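	/* get a free CDC send slot; on -EBUSY re-arm the tx worker to retry
	 * later, unless the connection has already been aborted
	 */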
	rc = smc_cdc_get_free_slot(conn, &wr_buf, &pend);
	if (rc < 0) {
		if (rc == -EBUSY) {
			struct smc_sock *smc =
				container_of(conn, struct smc_sock, conn);

			if (smc->sk.sk_err == ECONNABORTED) {
				rc = sock_error(&smc->sk);
				goto out_unlock;
			}
			rc = 0;
			if (conn->alert_token_local) /* connection healthy */
				mod_delayed_work(system_wq, &conn->tx_work,
						 SMC_TX_WORK_DELAY);
		}
		goto out_unlock;
	}

	rc = smc_tx_rdma_writes(conn);
	if (rc) {
		smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
				   (struct smc_wr_tx_pend_priv *)pend);
		goto out_unlock;
	}

	rc = smc_cdc_msg_send(conn, wr_buf, pend);

out_unlock:
	spin_unlock_bh(&conn->send_lock);
	return rc;
}

/* Wake up sndbuf consumers from process context
 * since there is more data to transmit
 */
void smc_tx_work(struct work_struct *work)
{
	struct smc_connection *conn = container_of(to_delayed_work(work),
						   struct smc_connection,
						   tx_work);
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	int rc;

	lock_sock(&smc->sk);
	if (smc->sk.sk_err ||
	    !conn->alert_token_local ||
	    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
		goto out;

	rc = smc_tx_sndbuf_nonempty(conn);
	if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;

out:
	release_sock(&smc->sk);
}

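/* Send a CDC message with the updated consumer cursor to the peer if the peer
 * requested an update or if enough receive buffer space has been freed (see
 * rmbe_update_limit); if the send fails, retry via the tx worker while the
 * connection is still healthy.
 */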
void smc_tx_consumer_update(struct smc_connection *conn)
{
	union smc_host_cursor cfed, cons;
	int to_confirm;

	smc_curs_write(&cons,
		       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
		       conn);
	smc_curs_write(&cfed,
		       smc_curs_read(&conn->rx_curs_confirmed, conn),
		       conn);
	to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);

	if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    ((to_confirm > conn->rmbe_update_limit) &&
	     ((to_confirm > (conn->rmb_desc->len / 2)) ||
	      conn->local_rx_ctrl.prod_flags.write_blocked))) {
		if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
		    conn->alert_token_local) { /* connection healthy */
			schedule_delayed_work(&conn->tx_work,
					      SMC_TX_WORK_DELAY);
			return;
		}
		smc_curs_write(&conn->rx_curs_confirmed,
			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
			       conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	}
	if (conn->local_rx_ctrl.prod_flags.write_blocked &&
	    !atomic_read(&conn->bytes_to_rcv))
		conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
	smc->sk.sk_write_space = smc_tx_write_space;
}