// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(req);
}

static int rpcrdma_bc_setup_reqs(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct rpc_rqst *rqst;
	unsigned int i;

	for (i = 0; i < (count << 1); i++) {
		struct rpcrdma_regbuf *rb;
		struct rpcrdma_req *req;
		size_t size;

		req = rpcrdma_create_req(r_xprt);
		if (IS_ERR(req))
			return PTR_ERR(req);
		rqst = &req->rl_slot;

		rqst->rq_xprt = xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);
		__set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		size = r_xprt->rx_data.inline_rsize;
		rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
		if (IS_ERR(rb))
			goto out_fail;
		req->rl_sendbuf = rb;
		xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base,
			     min_t(size_t, size, PAGE_SIZE));
	}
	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	rc = rpcrdma_bc_setup_reqs(r_xprt, reqs);
	if (rc)
		goto out_free;

	r_xprt->rx_buf.rb_bc_srv_max_requests = reqs;
	request_module("svcrdma");
	trace_xprtrdma_cb_setup(r_xprt, reqs);
	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

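/* Worked sizing example for the check above (illustrative only; the
 * constant's value is assumed here for the arithmetic, not taken from
 * xprt_rdma.h): if RPCRDMA_BACKWARD_WRS were 8, xprt_rdma_bc_setup()
 * would accept at most reqs = 8 >> 1 = 4 concurrent backchannel calls,
 * and rpcrdma_bc_setup_reqs() would then pre-allocate 4 << 1 = 8
 * rpc_rqsts, so a spare rpc_rqst sits on bc_pa_list even while a
 * just-sent reply's rpc_rqst has not yet been returned by
 * xprt_rdma_bc_free_rqst().
 */
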
/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
	int ret;

	ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
	if (ret < 0)
		return ret;
	return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
	size_t maxmsg;

	maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
	maxmsg = min_t(unsigned int, maxmsg, PAGE_SIZE);
	return maxmsg - RPCRDMA_HDRLEN_MIN;
}

static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	__be32 *p;

	rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
	xdr_init_encode(&req->rl_stream, &req->rl_hdrbuf,
			req->rl_rdmabuf->rg_base);

	p = xdr_reserve_space(&req->rl_stream, 28);
	if (unlikely(!p))
		return -EIO;
	*p++ = rqst->rq_xid;
	*p++ = rpcrdma_version;
	*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
	*p++ = rdma_msg;
	*p++ = xdr_zero;
	*p++ = xdr_zero;
	*p = xdr_zero;

	if (rpcrdma_prepare_send_sges(r_xprt, req, RPCRDMA_HDRLEN_MIN,
				      &rqst->rq_snd_buf, rpcrdma_noch))
		return -EIO;

	trace_xprtrdma_cb_reply(rqst);
	return 0;
}

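/* For orientation: the header that rpcrdma_bc_marshal_reply() builds
 * above is the fixed-size RPC-over-RDMA version 1 header, seven XDR
 * words (RPCRDMA_HDRLEN_MIN, 28 octets), sent inline ahead of
 * rq_snd_buf:
 *
 *	word 0: rdma_xid	- copied from rqst->rq_xid
 *	word 1: rdma_vers	- rpcrdma_version
 *	word 2: rdma_credit	- rb_bc_srv_max_requests
 *	word 3: rdma_proc	- rdma_msg
 *	words 4-6: xdr_zero x 3	- empty Read list, Write list, and
 *				  Reply chunk
 *
 * The rdma_* field names follow the protocol's XDR definition and are
 * shown here only to document the seven stores above.
 */
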
/**
 * xprt_rdma_bc_send_reply - marshal and send a backchannel reply
 * @rqst: RPC rqst with a backchannel RPC reply in rq_snd_buf
 *
 * Caller holds the transport's write lock.
 *
 * Returns:
 *	%0 if the RPC message has been sent
 *	%-ENOTCONN if the caller should reconnect and call again
 *	%-EIO if a permanent error occurred and the request was not
 *		sent. Do not try to send this message again.
 */
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	int rc;

	if (!xprt_connected(rqst->rq_xprt))
		goto drop_connection;

	rc = rpcrdma_bc_marshal_reply(rqst);
	if (rc < 0)
		goto failed_marshal;

	rpcrdma_post_recvs(r_xprt, true);
	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;
	return 0;

failed_marshal:
	if (rc != -ENOTCONN)
		return rc;
drop_connection:
	xprt_disconnect_done(rqst->rq_xprt);
	return -ENOTCONN;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpc_xprt *xprt = rqst->rq_xprt;

	dprintk("RPC: %s: freeing rqst %p (req %p)\n",
		__func__, rqst, req);

	rpcrdma_recv_buffer_put(req->rl_reply);
	req->rl_reply = NULL;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}

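/* Summary of the backchannel rpc_rqst life cycle implemented in this
 * file (documentation only):
 *
 *	xprt_rdma_bc_setup()	   - pre-allocates rpc_rqsts onto
 *				     xprt->bc_pa_list
 *	rpcrdma_bc_receive_call()  - takes an rpc_rqst off bc_pa_list
 *				     and queues it for the ULP's
 *				     callback service
 *	xprt_rdma_bc_send_reply()  - marshals and posts the ULP's reply
 *	xprt_rdma_bc_free_rqst()   - returns the rpc_rqst to bc_pa_list
 *				     after the reply has been sent
 *	xprt_rdma_bc_destroy()	   - releases the pre-allocated rqsts
 *				     at transport shutdown
 */
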
/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Operational assumptions:
 *	o Backchannel credits are ignored, just as the NFS server
 *	  forechannel currently does
 *	o The ULP manages a replay cache (eg, NFSv4.1 sessions).
 *	  No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
			     struct rpcrdma_rep *rep)
{
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	struct svc_serv *bc_serv;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct xdr_buf *buf;
	size_t size;
	__be32 *p;

	p = xdr_inline_decode(&rep->rr_stream, 0);
	size = xdr_stream_remaining(&rep->rr_stream);

#ifdef RPCRDMA_BACKCHANNEL_DEBUG
	pr_info("RPC: %s: callback XID %08x, length=%u\n",
		__func__, be32_to_cpup(p), size);
	pr_info("RPC: %s: %*ph\n", __func__, size, p);
#endif

	/* Grab a free bc rqst */
	spin_lock(&xprt->bc_pa_lock);
	if (list_empty(&xprt->bc_pa_list)) {
		spin_unlock(&xprt->bc_pa_lock);
		goto out_overflow;
	}
	rqst = list_first_entry(&xprt->bc_pa_list,
				struct rpc_rqst, rq_bc_pa_list);
	list_del(&rqst->rq_bc_pa_list);
	spin_unlock(&xprt->bc_pa_lock);

	/* Prepare rqst */
	rqst->rq_reply_bytes_recvd = 0;
	rqst->rq_bytes_sent = 0;
	rqst->rq_xid = *p;

	rqst->rq_private_buf.len = size;

	buf = &rqst->rq_rcv_buf;
	memset(buf, 0, sizeof(*buf));
	buf->head[0].iov_base = p;
	buf->head[0].iov_len = size;
	buf->len = size;

	/* The receive buffer has to be hooked to the rpcrdma_req
	 * so that it is not released while the req is pointing
	 * to its buffer, and so that it can be reposted after
	 * the Upper Layer is done decoding it.
	 */
	req = rpcr_to_rdmar(rqst);
	req->rl_reply = rep;
	trace_xprtrdma_cb_call(rqst);

	/* Queue rqst for ULP's callback service */
	bc_serv = xprt->bc_serv;
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
	spin_unlock(&bc_serv->sv_cb_lock);

	wake_up(&bc_serv->sv_cb_waitq);

	r_xprt->rx_stats.bcall_count++;
	return;

out_overflow:
	pr_warn("RPC/RDMA backchannel overflow\n");
	xprt_disconnect_done(xprt);
	/* This receive buffer gets reposted automatically
	 * when the connection is re-established.
	 */
	return;
}
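
/* These entry points are not called from within this file; they are
 * expected to be wired into the transport's rpc_xprt_ops callout table
 * (see transport.c). A minimal sketch, assuming the conventional
 * backchannel callout names in struct rpc_xprt_ops:
 *
 *	#if defined(CONFIG_SUNRPC_BACKCHANNEL)
 *		.bc_setup	= xprt_rdma_bc_setup,
 *		.bc_up		= xprt_rdma_bc_up,
 *		.bc_maxpayload	= xprt_rdma_bc_maxpayload,
 *		.bc_free_rqst	= xprt_rdma_bc_free_rqst,
 *		.bc_destroy	= xprt_rdma_bc_destroy,
 *	#endif
 */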