// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(req);
}

static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst;
	unsigned int i;

	for (i = 0; i < (count << 1); i++) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_req *req;
		size_t size;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req))
			return PTR_ERR(req);
		rqst = &req->rl_slot;

		rqst->rq_xprt = xprt;
		INIT_LIST_HEAD(&rqst->rq_bc_list);
		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
		spin_lock(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		size = r_xprt->rx_data.inline_rsize;
		rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
		if (IS_ERR(rb))
			goto out_fail;
		req->rl_sendbuf = rb;
		xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
			     min_t(size_t, size, PAGE_SIZE));
	}
	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
	if (rc)
		goto out_free;

	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
	request_module("svcrdma");
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
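 * (Added note: the "rdma-bc" class used below is expected to be
 * registered by the svcrdma module, which xprt_rdma_bc_setup loads
 * via request_module().)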
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
	int ret;

	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
	if (ret < 0)
		return ret;
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	size_t maxmsg;

	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}

static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch))
		return -EIO;

	trace_xprtrdma_cb_reply(rqst);
	return 0;
}

/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
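 *	%-EBADSLT if a congestion control credit was not available and
 *		the reply was not sent (added note: this is the
 *		xprt_request_get_cong() failure path below)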
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(rqst->rq_xprt))
		goto drop_connection;

	if (!xprt_request_get_cong(rqst->rq_xprt, rqst))
		return -EBADSLT;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	rpcrdma_post_recvs(r_xprt, true);
	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_disconnect_done(rqst->rq_xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock(&xprt->bc_pa_lock);
	}
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpc_xprt *xprt = rqst->rq_xprt;

	dprintk("RPC: %s: freeing rqst %p (req %p)\n",
		__func__, rqst, req);

	rpcrdma_recv_buffer_put(req->rl_reply);
	req->rl_reply = NULL;

	spin_lock(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *	o Backchannel credits are ignored, just as the NFS server
 *	  forechannel currently does
 *	o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *	  No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC: %s: callback XID %08x, length=%u\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif

	/* Grab a free bc rqst */
	spin_lock(&xprt->bc_pa_lock);
	if (list_empty(&xprt->bc_pa_list)) {
		spin_unlock(&xprt->bc_pa_lock);
		goto out_overflow;
	}
	rqst = list_first_entry(&xprt->bc_pa_list,
				struct rpc_rqst, rq_bc_pa_list);
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);

	/* Prepare rqst */
	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_bytes_sent = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
	spin_unlock(&bc_serv->sv_cb_lock);

	wake_up(&bc_serv->sv_cb_waitq);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_disconnect_done(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
	return;
}