1f531a5dbSChuck Lever /* 2f531a5dbSChuck Lever * Copyright (c) 2015 Oracle. All rights reserved. 3f531a5dbSChuck Lever * 4f531a5dbSChuck Lever * Support for backward direction RPCs on RPC/RDMA. 5f531a5dbSChuck Lever */ 6f531a5dbSChuck Lever 7f531a5dbSChuck Lever #include <linux/module.h> 8f531a5dbSChuck Lever 9f531a5dbSChuck Lever #include "xprt_rdma.h" 10f531a5dbSChuck Lever 11f531a5dbSChuck Lever #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) 12f531a5dbSChuck Lever # define RPCDBG_FACILITY RPCDBG_TRANS 13f531a5dbSChuck Lever #endif 14f531a5dbSChuck Lever 15f531a5dbSChuck Lever static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, 16f531a5dbSChuck Lever struct rpc_rqst *rqst) 17f531a5dbSChuck Lever { 18f531a5dbSChuck Lever struct rpcrdma_buffer *buf = &r_xprt->rx_buf; 19f531a5dbSChuck Lever struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 20f531a5dbSChuck Lever 21f531a5dbSChuck Lever spin_lock(&buf->rb_reqslock); 22f531a5dbSChuck Lever list_del(&req->rl_all); 23f531a5dbSChuck Lever spin_unlock(&buf->rb_reqslock); 24f531a5dbSChuck Lever 25f531a5dbSChuck Lever rpcrdma_destroy_req(&r_xprt->rx_ia, req); 26f531a5dbSChuck Lever 27f531a5dbSChuck Lever kfree(rqst); 28f531a5dbSChuck Lever } 29f531a5dbSChuck Lever 30f531a5dbSChuck Lever static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, 31f531a5dbSChuck Lever struct rpc_rqst *rqst) 32f531a5dbSChuck Lever { 33f531a5dbSChuck Lever struct rpcrdma_ia *ia = &r_xprt->rx_ia; 34f531a5dbSChuck Lever struct rpcrdma_regbuf *rb; 35f531a5dbSChuck Lever struct rpcrdma_req *req; 36f531a5dbSChuck Lever struct xdr_buf *buf; 37f531a5dbSChuck Lever size_t size; 38f531a5dbSChuck Lever 39f531a5dbSChuck Lever req = rpcrdma_create_req(r_xprt); 40f531a5dbSChuck Lever if (!req) 41f531a5dbSChuck Lever return -ENOMEM; 42f531a5dbSChuck Lever req->rl_backchannel = true; 43f531a5dbSChuck Lever 44f531a5dbSChuck Lever size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); 45f531a5dbSChuck Lever rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); 46f531a5dbSChuck Lever if (IS_ERR(rb)) 47f531a5dbSChuck Lever goto out_fail; 48f531a5dbSChuck Lever req->rl_rdmabuf = rb; 49f531a5dbSChuck Lever 50f531a5dbSChuck Lever size += RPCRDMA_INLINE_READ_THRESHOLD(rqst); 51f531a5dbSChuck Lever rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL); 52f531a5dbSChuck Lever if (IS_ERR(rb)) 53f531a5dbSChuck Lever goto out_fail; 54f531a5dbSChuck Lever rb->rg_owner = req; 55f531a5dbSChuck Lever req->rl_sendbuf = rb; 56f531a5dbSChuck Lever /* so that rpcr_to_rdmar works when receiving a request */ 57f531a5dbSChuck Lever rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base; 58f531a5dbSChuck Lever 59f531a5dbSChuck Lever buf = &rqst->rq_snd_buf; 60f531a5dbSChuck Lever buf->head[0].iov_base = rqst->rq_buffer; 61f531a5dbSChuck Lever buf->head[0].iov_len = 0; 62f531a5dbSChuck Lever buf->tail[0].iov_base = NULL; 63f531a5dbSChuck Lever buf->tail[0].iov_len = 0; 64f531a5dbSChuck Lever buf->page_len = 0; 65f531a5dbSChuck Lever buf->len = 0; 66f531a5dbSChuck Lever buf->buflen = size; 67f531a5dbSChuck Lever 68f531a5dbSChuck Lever return 0; 69f531a5dbSChuck Lever 70f531a5dbSChuck Lever out_fail: 71f531a5dbSChuck Lever rpcrdma_bc_free_rqst(r_xprt, rqst); 72f531a5dbSChuck Lever return -ENOMEM; 73f531a5dbSChuck Lever } 74f531a5dbSChuck Lever 75f531a5dbSChuck Lever /* Allocate and add receive buffers to the rpcrdma_buffer's 76f531a5dbSChuck Lever * existing list of rep's. These are released when the 77f531a5dbSChuck Lever * transport is destroyed. 78f531a5dbSChuck Lever */ 79f531a5dbSChuck Lever static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, 80f531a5dbSChuck Lever unsigned int count) 81f531a5dbSChuck Lever { 82f531a5dbSChuck Lever struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; 83f531a5dbSChuck Lever struct rpcrdma_rep *rep; 84f531a5dbSChuck Lever unsigned long flags; 85f531a5dbSChuck Lever int rc = 0; 86f531a5dbSChuck Lever 87f531a5dbSChuck Lever while (count--) { 88f531a5dbSChuck Lever rep = rpcrdma_create_rep(r_xprt); 89f531a5dbSChuck Lever if (IS_ERR(rep)) { 90f531a5dbSChuck Lever pr_err("RPC: %s: reply buffer alloc failed\n", 91f531a5dbSChuck Lever __func__); 92f531a5dbSChuck Lever rc = PTR_ERR(rep); 93f531a5dbSChuck Lever break; 94f531a5dbSChuck Lever } 95f531a5dbSChuck Lever 96f531a5dbSChuck Lever spin_lock_irqsave(&buffers->rb_lock, flags); 97f531a5dbSChuck Lever list_add(&rep->rr_list, &buffers->rb_recv_bufs); 98f531a5dbSChuck Lever spin_unlock_irqrestore(&buffers->rb_lock, flags); 99f531a5dbSChuck Lever } 100f531a5dbSChuck Lever 101f531a5dbSChuck Lever return rc; 102f531a5dbSChuck Lever } 103f531a5dbSChuck Lever 104f531a5dbSChuck Lever /** 105f531a5dbSChuck Lever * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests 106f531a5dbSChuck Lever * @xprt: transport associated with these backchannel resources 107f531a5dbSChuck Lever * @reqs: number of concurrent incoming requests to expect 108f531a5dbSChuck Lever * 109f531a5dbSChuck Lever * Returns 0 on success; otherwise a negative errno 110f531a5dbSChuck Lever */ 111f531a5dbSChuck Lever int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) 112f531a5dbSChuck Lever { 113f531a5dbSChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 114f531a5dbSChuck Lever struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; 115f531a5dbSChuck Lever struct rpc_rqst *rqst; 116f531a5dbSChuck Lever unsigned int i; 117f531a5dbSChuck Lever int rc; 118f531a5dbSChuck Lever 119f531a5dbSChuck Lever /* The backchannel reply path returns each rpc_rqst to the 120f531a5dbSChuck Lever * bc_pa_list _after_ the reply is sent. If the server is 121f531a5dbSChuck Lever * faster than the client, it can send another backward 122f531a5dbSChuck Lever * direction request before the rpc_rqst is returned to the 123f531a5dbSChuck Lever * list. The client rejects the request in this case. 124f531a5dbSChuck Lever * 125f531a5dbSChuck Lever * Twice as many rpc_rqsts are prepared to ensure there is 126f531a5dbSChuck Lever * always an rpc_rqst available as soon as a reply is sent. 127f531a5dbSChuck Lever */ 128124fa17dSChuck Lever if (reqs > RPCRDMA_BACKWARD_WRS >> 1) 129124fa17dSChuck Lever goto out_err; 130124fa17dSChuck Lever 131f531a5dbSChuck Lever for (i = 0; i < (reqs << 1); i++) { 132f531a5dbSChuck Lever rqst = kzalloc(sizeof(*rqst), GFP_KERNEL); 133f531a5dbSChuck Lever if (!rqst) { 134f531a5dbSChuck Lever pr_err("RPC: %s: Failed to create bc rpc_rqst\n", 135f531a5dbSChuck Lever __func__); 136f531a5dbSChuck Lever goto out_free; 137f531a5dbSChuck Lever } 138f531a5dbSChuck Lever 139f531a5dbSChuck Lever rqst->rq_xprt = &r_xprt->rx_xprt; 140f531a5dbSChuck Lever INIT_LIST_HEAD(&rqst->rq_list); 141f531a5dbSChuck Lever INIT_LIST_HEAD(&rqst->rq_bc_list); 142f531a5dbSChuck Lever 143f531a5dbSChuck Lever if (rpcrdma_bc_setup_rqst(r_xprt, rqst)) 144f531a5dbSChuck Lever goto out_free; 145f531a5dbSChuck Lever 146f531a5dbSChuck Lever spin_lock_bh(&xprt->bc_pa_lock); 147f531a5dbSChuck Lever list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); 148f531a5dbSChuck Lever spin_unlock_bh(&xprt->bc_pa_lock); 149f531a5dbSChuck Lever } 150f531a5dbSChuck Lever 151f531a5dbSChuck Lever rc = rpcrdma_bc_setup_reps(r_xprt, reqs); 152f531a5dbSChuck Lever if (rc) 153f531a5dbSChuck Lever goto out_free; 154f531a5dbSChuck Lever 155f531a5dbSChuck Lever rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs); 156f531a5dbSChuck Lever if (rc) 157f531a5dbSChuck Lever goto out_free; 158f531a5dbSChuck Lever 159f531a5dbSChuck Lever buffer->rb_bc_srv_max_requests = reqs; 160f531a5dbSChuck Lever request_module("svcrdma"); 161f531a5dbSChuck Lever 162f531a5dbSChuck Lever return 0; 163f531a5dbSChuck Lever 164f531a5dbSChuck Lever out_free: 165f531a5dbSChuck Lever xprt_rdma_bc_destroy(xprt, reqs); 166f531a5dbSChuck Lever 167124fa17dSChuck Lever out_err: 168f531a5dbSChuck Lever pr_err("RPC: %s: setup backchannel transport failed\n", __func__); 169f531a5dbSChuck Lever return -ENOMEM; 170f531a5dbSChuck Lever } 171f531a5dbSChuck Lever 172f531a5dbSChuck Lever /** 173*83128a60SChuck Lever * rpcrdma_bc_marshal_reply - Send backwards direction reply 174*83128a60SChuck Lever * @rqst: buffer containing RPC reply data 175*83128a60SChuck Lever * 176*83128a60SChuck Lever * Returns zero on success. 177*83128a60SChuck Lever */ 178*83128a60SChuck Lever int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) 179*83128a60SChuck Lever { 180*83128a60SChuck Lever struct rpc_xprt *xprt = rqst->rq_xprt; 181*83128a60SChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 182*83128a60SChuck Lever struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 183*83128a60SChuck Lever struct rpcrdma_msg *headerp; 184*83128a60SChuck Lever size_t rpclen; 185*83128a60SChuck Lever 186*83128a60SChuck Lever headerp = rdmab_to_msg(req->rl_rdmabuf); 187*83128a60SChuck Lever headerp->rm_xid = rqst->rq_xid; 188*83128a60SChuck Lever headerp->rm_vers = rpcrdma_version; 189*83128a60SChuck Lever headerp->rm_credit = 190*83128a60SChuck Lever cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests); 191*83128a60SChuck Lever headerp->rm_type = rdma_msg; 192*83128a60SChuck Lever headerp->rm_body.rm_chunks[0] = xdr_zero; 193*83128a60SChuck Lever headerp->rm_body.rm_chunks[1] = xdr_zero; 194*83128a60SChuck Lever headerp->rm_body.rm_chunks[2] = xdr_zero; 195*83128a60SChuck Lever 196*83128a60SChuck Lever rpclen = rqst->rq_svec[0].iov_len; 197*83128a60SChuck Lever 198*83128a60SChuck Lever pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n", 199*83128a60SChuck Lever __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf)); 200*83128a60SChuck Lever pr_info("RPC: %s: RPC/RDMA: %*ph\n", 201*83128a60SChuck Lever __func__, (int)RPCRDMA_HDRLEN_MIN, headerp); 202*83128a60SChuck Lever pr_info("RPC: %s: RPC: %*ph\n", 203*83128a60SChuck Lever __func__, (int)rpclen, rqst->rq_svec[0].iov_base); 204*83128a60SChuck Lever 205*83128a60SChuck Lever req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); 206*83128a60SChuck Lever req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN; 207*83128a60SChuck Lever req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); 208*83128a60SChuck Lever 209*83128a60SChuck Lever req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf); 210*83128a60SChuck Lever req->rl_send_iov[1].length = rpclen; 211*83128a60SChuck Lever req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf); 212*83128a60SChuck Lever 213*83128a60SChuck Lever req->rl_niovs = 2; 214*83128a60SChuck Lever return 0; 215*83128a60SChuck Lever } 216*83128a60SChuck Lever 217*83128a60SChuck Lever /** 218f531a5dbSChuck Lever * xprt_rdma_bc_destroy - Release resources for handling backchannel requests 219f531a5dbSChuck Lever * @xprt: transport associated with these backchannel resources 220f531a5dbSChuck Lever * @reqs: number of incoming requests to destroy; ignored 221f531a5dbSChuck Lever */ 222f531a5dbSChuck Lever void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs) 223f531a5dbSChuck Lever { 224f531a5dbSChuck Lever struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 225f531a5dbSChuck Lever struct rpc_rqst *rqst, *tmp; 226f531a5dbSChuck Lever 227f531a5dbSChuck Lever spin_lock_bh(&xprt->bc_pa_lock); 228f531a5dbSChuck Lever list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { 229f531a5dbSChuck Lever list_del(&rqst->rq_bc_pa_list); 230f531a5dbSChuck Lever spin_unlock_bh(&xprt->bc_pa_lock); 231f531a5dbSChuck Lever 232f531a5dbSChuck Lever rpcrdma_bc_free_rqst(r_xprt, rqst); 233f531a5dbSChuck Lever 234f531a5dbSChuck Lever spin_lock_bh(&xprt->bc_pa_lock); 235f531a5dbSChuck Lever } 236f531a5dbSChuck Lever spin_unlock_bh(&xprt->bc_pa_lock); 237f531a5dbSChuck Lever } 238f531a5dbSChuck Lever 239f531a5dbSChuck Lever /** 240f531a5dbSChuck Lever * xprt_rdma_bc_free_rqst - Release a backchannel rqst 241f531a5dbSChuck Lever * @rqst: request to release 242f531a5dbSChuck Lever */ 243f531a5dbSChuck Lever void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) 244f531a5dbSChuck Lever { 245f531a5dbSChuck Lever struct rpc_xprt *xprt = rqst->rq_xprt; 246f531a5dbSChuck Lever 247f531a5dbSChuck Lever smp_mb__before_atomic(); 248f531a5dbSChuck Lever WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); 249f531a5dbSChuck Lever clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); 250f531a5dbSChuck Lever smp_mb__after_atomic(); 251f531a5dbSChuck Lever 252f531a5dbSChuck Lever spin_lock_bh(&xprt->bc_pa_lock); 253f531a5dbSChuck Lever list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); 254f531a5dbSChuck Lever spin_unlock_bh(&xprt->bc_pa_lock); 255f531a5dbSChuck Lever } 256