/*
 * Copyright (c) 2015 Oracle. All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

	spin_lock(&buf->rb_reqslock);
	list_del(&req->rl_all);
	spin_unlock(&buf->rb_reqslock);

	rpcrdma_destroy_req(&r_xprt->rx_ia, req);

	kfree(rqst);
}

static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
				 struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
	struct rpcrdma_regbuf *rb;
	struct rpcrdma_req *req;
	struct xdr_buf *buf;
	size_t size;

	req = rpcrdma_create_req(r_xprt);
	if (!req)
		return -ENOMEM;
	req->rl_backchannel = true;

	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	req->rl_rdmabuf = rb;

	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
	if (IS_ERR(rb))
		goto out_fail;
	rb->rg_owner = req;
	req->rl_sendbuf = rb;
	/* so that rpcr_to_rdmar works when receiving a request */
	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;

	buf = &rqst->rq_snd_buf;
	buf->head[0].iov_base = rqst->rq_buffer;
	buf->head[0].iov_len = 0;
	buf->tail[0].iov_base = NULL;
	buf->tail[0].iov_len = 0;
	buf->page_len = 0;
	buf->len = 0;
	buf->buflen = size;

	return 0;

out_fail:
	rpcrdma_bc_free_rqst(r_xprt, rqst);
	return -ENOMEM;
}
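/* A note on rpcrdma_bc_setup_rqst(): each backchannel rqst is backed
 * by two regbufs. rl_rdmabuf, sized by RPCRDMA_INLINE_WRITE_THRESHOLD(),
 * is presumably for the RPC-over-RDMA transport header; rl_sendbuf
 * backs rq_buffer and rq_snd_buf for the backward direction reply.
 */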
/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of rep's. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
				 unsigned int count)
{
	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
	struct rpcrdma_rep *rep;
	unsigned long flags;
	int rc = 0;

	while (count--) {
		rep = rpcrdma_create_rep(r_xprt);
		if (IS_ERR(rep)) {
			pr_err("RPC:       %s: reply buffer alloc failed\n",
			       __func__);
			rc = PTR_ERR(rep);
			break;
		}

		spin_lock_irqsave(&buffers->rb_lock, flags);
		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
		spin_unlock_irqrestore(&buffers->rb_lock, flags);
	}

	return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
	struct rpc_rqst *rqst;
	unsigned int i;
	int rc;

	/* The backchannel reply path returns each rpc_rqst to the
	 * bc_pa_list _after_ the reply is sent. If the server is
	 * faster than the client, it can send another backward
	 * direction request before the rpc_rqst is returned to the
	 * list. The client rejects the request in this case.
	 *
	 * Twice as many rpc_rqsts are prepared to ensure there is
	 * always an rpc_rqst available as soon as a reply is sent.
	 */
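	/* Presumably each of the reqs << 1 rqsts prepared below
	 * consumes a Send WR from the RPCRDMA_BACKWARD_WRS pool
	 * reserved at QP creation, hence the cap of half that pool.
	 */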
	if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
		goto out_err;

	for (i = 0; i < (reqs << 1); i++) {
		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
		if (!rqst) {
			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
			       __func__);
			goto out_free;
		}

		rqst->rq_xprt = &r_xprt->rx_xprt;
		INIT_LIST_HEAD(&rqst->rq_list);
		INIT_LIST_HEAD(&rqst->rq_bc_list);

		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
			goto out_free;

		spin_lock_bh(&xprt->bc_pa_lock);
		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);
	}

	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
	if (rc)
		goto out_free;

	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
	if (rc)
		goto out_free;

	buffer->rb_bc_srv_max_requests = reqs;
	request_module("svcrdma");

	return 0;

out_free:
	xprt_rdma_bc_destroy(xprt, reqs);

out_err:
	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
	return -ENOMEM;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpc_rqst *rqst, *tmp;

	spin_lock_bh(&xprt->bc_pa_lock);
	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		list_del(&rqst->rq_bc_pa_list);
		spin_unlock_bh(&xprt->bc_pa_lock);

		rpcrdma_bc_free_rqst(r_xprt, rqst);

		spin_lock_bh(&xprt->bc_pa_lock);
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
}
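/* For context, a rough sketch (not code from this file) of how the
 * generic backchannel code is expected to take one of these rqsts
 * off the paused list before xprt_rdma_bc_free_rqst() returns it:
 *
 *	spin_lock_bh(&xprt->bc_pa_lock);
 *	rqst = list_first_entry(&xprt->bc_pa_list,
 *				struct rpc_rqst, rq_bc_pa_list);
 *	list_del(&rqst->rq_bc_pa_list);
 *	spin_unlock_bh(&xprt->bc_pa_lock);
 *	set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
 */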
/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;

	smp_mb__before_atomic();
	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
	/* clear_bit() itself implies no memory barrier; the
	 * smp_mb__before_atomic()/smp_mb__after_atomic() pair orders
	 * it with the surrounding accesses.
	 */
	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
	smp_mb__after_atomic();

	spin_lock_bh(&xprt->bc_pa_lock);
	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
	spin_unlock_bh(&xprt->bc_pa_lock);
}
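/* Wiring note, a hedged sketch rather than code from this file: these
 * entry points are expected to be plugged into the transport's
 * rpc_xprt_ops, along the lines of:
 *
 *	.bc_setup	= xprt_rdma_bc_setup,
 *	.bc_free_rqst	= xprt_rdma_bc_free_rqst,
 *	.bc_destroy	= xprt_rdma_bc_destroy,
 */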