1*78147ca8SChuck Lever // SPDX-License-Identifier: GPL-2.0
2*78147ca8SChuck Lever /*
3*78147ca8SChuck Lever * Copyright (c) 2020 Oracle. All rights reserved.
4*78147ca8SChuck Lever */
5*78147ca8SChuck Lever
6*78147ca8SChuck Lever #include <linux/sunrpc/svc_rdma.h>
7*78147ca8SChuck Lever #include <linux/sunrpc/rpc_rdma.h>
8*78147ca8SChuck Lever
9*78147ca8SChuck Lever #include "xprt_rdma.h"
10*78147ca8SChuck Lever #include <trace/events/rpcrdma.h>
11*78147ca8SChuck Lever
12*78147ca8SChuck Lever /**
13*78147ca8SChuck Lever * pcl_free - Release all memory associated with a parsed chunk list
14*78147ca8SChuck Lever * @pcl: parsed chunk list
15*78147ca8SChuck Lever *
16*78147ca8SChuck Lever */
pcl_free(struct svc_rdma_pcl * pcl)17*78147ca8SChuck Lever void pcl_free(struct svc_rdma_pcl *pcl)
18*78147ca8SChuck Lever {
19*78147ca8SChuck Lever while (!list_empty(&pcl->cl_chunks)) {
20*78147ca8SChuck Lever struct svc_rdma_chunk *chunk;
21*78147ca8SChuck Lever
22*78147ca8SChuck Lever chunk = pcl_first_chunk(pcl);
23*78147ca8SChuck Lever list_del(&chunk->ch_list);
24*78147ca8SChuck Lever kfree(chunk);
25*78147ca8SChuck Lever }
26*78147ca8SChuck Lever }
27*78147ca8SChuck Lever
/* Allocate a chunk with room for @segcount segments. The segment
 * array is sized up front, but ch_segcount counts only segments
 * that have actually been filled in, so it starts at zero.
 */
static struct svc_rdma_chunk *pcl_alloc_chunk(u32 segcount, u32 position)
{
	struct svc_rdma_chunk *chunk;

	chunk = kmalloc(struct_size(chunk, ch_segments, segcount), GFP_KERNEL);
	if (!chunk)
		return NULL;

	chunk->ch_segcount = 0;
	chunk->ch_length = 0;
	chunk->ch_payload_length = 0;
	chunk->ch_position = position;
	return chunk;
}
42*78147ca8SChuck Lever
43*78147ca8SChuck Lever static struct svc_rdma_chunk *
pcl_lookup_position(struct svc_rdma_pcl * pcl,u32 position)44*78147ca8SChuck Lever pcl_lookup_position(struct svc_rdma_pcl *pcl, u32 position)
45*78147ca8SChuck Lever {
46*78147ca8SChuck Lever struct svc_rdma_chunk *pos;
47*78147ca8SChuck Lever
48*78147ca8SChuck Lever pcl_for_each_chunk(pos, pcl) {
49*78147ca8SChuck Lever if (pos->ch_position == position)
50*78147ca8SChuck Lever return pos;
51*78147ca8SChuck Lever }
52*78147ca8SChuck Lever return NULL;
53*78147ca8SChuck Lever }
54*78147ca8SChuck Lever
/* Insert @chunk into @pcl so that the chunk list remains sorted
 * in ascending ch_position order.
 */
static void pcl_insert_position(struct svc_rdma_pcl *pcl,
				struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_chunk *pos;

	/* Walk until the first chunk with a larger position. If the
	 * loop runs to completion, @pos ends up pointing at the
	 * head-embedded sentinel, so the insertion below becomes an
	 * append at the tail.
	 */
	pcl_for_each_chunk(pos, pcl) {
		if (pos->ch_position > chunk->ch_position)
			break;
	}
	/* Link @chunk immediately before @pos. */
	__list_add(&chunk->ch_list, pos->ch_list.prev, &pos->ch_list);
	pcl->cl_count++;
}
67*78147ca8SChuck Lever
/* Record one decoded Read segment in the next free slot of
 * @chunk's segment array, then update the chunk's accounting.
 * The tracepoint fires after the segment is filled in but before
 * the chunk counters are bumped.
 */
static void pcl_set_read_segment(const struct svc_rdma_recv_ctxt *rctxt,
				 struct svc_rdma_chunk *chunk,
				 u32 handle, u32 length, u64 offset)
{
	struct svc_rdma_segment *seg = &chunk->ch_segments[chunk->ch_segcount];

	seg->rs_handle = handle;
	seg->rs_length = length;
	seg->rs_offset = offset;

	trace_svcrdma_decode_rseg(&rctxt->rc_cid, chunk, seg);

	chunk->ch_length += length;
	chunk->ch_segcount++;
}
84*78147ca8SChuck Lever
85*78147ca8SChuck Lever /**
86*78147ca8SChuck Lever * pcl_alloc_call - Construct a parsed chunk list for the Call body
87*78147ca8SChuck Lever * @rctxt: Ingress receive context
88*78147ca8SChuck Lever * @p: Start of an un-decoded Read list
89*78147ca8SChuck Lever *
90*78147ca8SChuck Lever * Assumptions:
91*78147ca8SChuck Lever * - The incoming Read list has already been sanity checked.
92*78147ca8SChuck Lever * - cl_count is already set to the number of segments in
93*78147ca8SChuck Lever * the un-decoded list.
94*78147ca8SChuck Lever * - The list might not be in order by position.
95*78147ca8SChuck Lever *
96*78147ca8SChuck Lever * Return values:
97*78147ca8SChuck Lever * %true: Parsed chunk list was successfully constructed, and
98*78147ca8SChuck Lever * cl_count is updated to be the number of chunks (ie.
99*78147ca8SChuck Lever * unique positions) in the Read list.
100*78147ca8SChuck Lever * %false: Memory allocation failed.
101*78147ca8SChuck Lever */
pcl_alloc_call(struct svc_rdma_recv_ctxt * rctxt,__be32 * p)102*78147ca8SChuck Lever bool pcl_alloc_call(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
103*78147ca8SChuck Lever {
104*78147ca8SChuck Lever struct svc_rdma_pcl *pcl = &rctxt->rc_call_pcl;
105*78147ca8SChuck Lever unsigned int i, segcount = pcl->cl_count;
106*78147ca8SChuck Lever
107*78147ca8SChuck Lever pcl->cl_count = 0;
108*78147ca8SChuck Lever for (i = 0; i < segcount; i++) {
109*78147ca8SChuck Lever struct svc_rdma_chunk *chunk;
110*78147ca8SChuck Lever u32 position, handle, length;
111*78147ca8SChuck Lever u64 offset;
112*78147ca8SChuck Lever
113*78147ca8SChuck Lever p++; /* skip the list discriminator */
114*78147ca8SChuck Lever p = xdr_decode_read_segment(p, &position, &handle,
115*78147ca8SChuck Lever &length, &offset);
116*78147ca8SChuck Lever if (position != 0)
117*78147ca8SChuck Lever continue;
118*78147ca8SChuck Lever
119*78147ca8SChuck Lever if (pcl_is_empty(pcl)) {
120*78147ca8SChuck Lever chunk = pcl_alloc_chunk(segcount, position);
121*78147ca8SChuck Lever if (!chunk)
122*78147ca8SChuck Lever return false;
123*78147ca8SChuck Lever pcl_insert_position(pcl, chunk);
124*78147ca8SChuck Lever } else {
125*78147ca8SChuck Lever chunk = list_first_entry(&pcl->cl_chunks,
126*78147ca8SChuck Lever struct svc_rdma_chunk,
127*78147ca8SChuck Lever ch_list);
128*78147ca8SChuck Lever }
129*78147ca8SChuck Lever
130*78147ca8SChuck Lever pcl_set_read_segment(rctxt, chunk, handle, length, offset);
131*78147ca8SChuck Lever }
132*78147ca8SChuck Lever
133*78147ca8SChuck Lever return true;
134*78147ca8SChuck Lever }
135*78147ca8SChuck Lever
136*78147ca8SChuck Lever /**
137*78147ca8SChuck Lever * pcl_alloc_read - Construct a parsed chunk list for normal Read chunks
138*78147ca8SChuck Lever * @rctxt: Ingress receive context
139*78147ca8SChuck Lever * @p: Start of an un-decoded Read list
140*78147ca8SChuck Lever *
141*78147ca8SChuck Lever * Assumptions:
142*78147ca8SChuck Lever * - The incoming Read list has already been sanity checked.
143*78147ca8SChuck Lever * - cl_count is already set to the number of segments in
144*78147ca8SChuck Lever * the un-decoded list.
145*78147ca8SChuck Lever * - The list might not be in order by position.
146*78147ca8SChuck Lever *
147*78147ca8SChuck Lever * Return values:
148*78147ca8SChuck Lever * %true: Parsed chunk list was successfully constructed, and
149*78147ca8SChuck Lever * cl_count is updated to be the number of chunks (ie.
150*78147ca8SChuck Lever * unique position values) in the Read list.
151*78147ca8SChuck Lever * %false: Memory allocation failed.
152*78147ca8SChuck Lever *
153*78147ca8SChuck Lever * TODO:
154*78147ca8SChuck Lever * - Check for chunk range overlaps
155*78147ca8SChuck Lever */
pcl_alloc_read(struct svc_rdma_recv_ctxt * rctxt,__be32 * p)156*78147ca8SChuck Lever bool pcl_alloc_read(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
157*78147ca8SChuck Lever {
158*78147ca8SChuck Lever struct svc_rdma_pcl *pcl = &rctxt->rc_read_pcl;
159*78147ca8SChuck Lever unsigned int i, segcount = pcl->cl_count;
160*78147ca8SChuck Lever
161*78147ca8SChuck Lever pcl->cl_count = 0;
162*78147ca8SChuck Lever for (i = 0; i < segcount; i++) {
163*78147ca8SChuck Lever struct svc_rdma_chunk *chunk;
164*78147ca8SChuck Lever u32 position, handle, length;
165*78147ca8SChuck Lever u64 offset;
166*78147ca8SChuck Lever
167*78147ca8SChuck Lever p++; /* skip the list discriminator */
168*78147ca8SChuck Lever p = xdr_decode_read_segment(p, &position, &handle,
169*78147ca8SChuck Lever &length, &offset);
170*78147ca8SChuck Lever if (position == 0)
171*78147ca8SChuck Lever continue;
172*78147ca8SChuck Lever
173*78147ca8SChuck Lever chunk = pcl_lookup_position(pcl, position);
174*78147ca8SChuck Lever if (!chunk) {
175*78147ca8SChuck Lever chunk = pcl_alloc_chunk(segcount, position);
176*78147ca8SChuck Lever if (!chunk)
177*78147ca8SChuck Lever return false;
178*78147ca8SChuck Lever pcl_insert_position(pcl, chunk);
179*78147ca8SChuck Lever }
180*78147ca8SChuck Lever
181*78147ca8SChuck Lever pcl_set_read_segment(rctxt, chunk, handle, length, offset);
182*78147ca8SChuck Lever }
183*78147ca8SChuck Lever
184*78147ca8SChuck Lever return true;
185*78147ca8SChuck Lever }
186*78147ca8SChuck Lever
187*78147ca8SChuck Lever /**
188*78147ca8SChuck Lever * pcl_alloc_write - Construct a parsed chunk list from a Write list
189*78147ca8SChuck Lever * @rctxt: Ingress receive context
190*78147ca8SChuck Lever * @pcl: Parsed chunk list to populate
191*78147ca8SChuck Lever * @p: Start of an un-decoded Write list
192*78147ca8SChuck Lever *
193*78147ca8SChuck Lever * Assumptions:
194*78147ca8SChuck Lever * - The incoming Write list has already been sanity checked, and
195*78147ca8SChuck Lever * - cl_count is set to the number of chunks in the un-decoded list.
196*78147ca8SChuck Lever *
197*78147ca8SChuck Lever * Return values:
198*78147ca8SChuck Lever * %true: Parsed chunk list was successfully constructed.
199*78147ca8SChuck Lever * %false: Memory allocation failed.
200*78147ca8SChuck Lever */
pcl_alloc_write(struct svc_rdma_recv_ctxt * rctxt,struct svc_rdma_pcl * pcl,__be32 * p)201*78147ca8SChuck Lever bool pcl_alloc_write(struct svc_rdma_recv_ctxt *rctxt,
202*78147ca8SChuck Lever struct svc_rdma_pcl *pcl, __be32 *p)
203*78147ca8SChuck Lever {
204*78147ca8SChuck Lever struct svc_rdma_segment *segment;
205*78147ca8SChuck Lever struct svc_rdma_chunk *chunk;
206*78147ca8SChuck Lever unsigned int i, j;
207*78147ca8SChuck Lever u32 segcount;
208*78147ca8SChuck Lever
209*78147ca8SChuck Lever for (i = 0; i < pcl->cl_count; i++) {
210*78147ca8SChuck Lever p++; /* skip the list discriminator */
211*78147ca8SChuck Lever segcount = be32_to_cpup(p++);
212*78147ca8SChuck Lever
213*78147ca8SChuck Lever chunk = pcl_alloc_chunk(segcount, 0);
214*78147ca8SChuck Lever if (!chunk)
215*78147ca8SChuck Lever return false;
216*78147ca8SChuck Lever list_add_tail(&chunk->ch_list, &pcl->cl_chunks);
217*78147ca8SChuck Lever
218*78147ca8SChuck Lever for (j = 0; j < segcount; j++) {
219*78147ca8SChuck Lever segment = &chunk->ch_segments[j];
220*78147ca8SChuck Lever p = xdr_decode_rdma_segment(p, &segment->rs_handle,
221*78147ca8SChuck Lever &segment->rs_length,
222*78147ca8SChuck Lever &segment->rs_offset);
223*78147ca8SChuck Lever trace_svcrdma_decode_wseg(&rctxt->rc_cid, chunk, j);
224*78147ca8SChuck Lever
225*78147ca8SChuck Lever chunk->ch_length += segment->rs_length;
226*78147ca8SChuck Lever chunk->ch_segcount++;
227*78147ca8SChuck Lever }
228*78147ca8SChuck Lever }
229*78147ca8SChuck Lever return true;
230*78147ca8SChuck Lever }
231*78147ca8SChuck Lever
pcl_process_region(const struct xdr_buf * xdr,unsigned int offset,unsigned int length,int (* actor)(const struct xdr_buf *,void *),void * data)232*78147ca8SChuck Lever static int pcl_process_region(const struct xdr_buf *xdr,
233*78147ca8SChuck Lever unsigned int offset, unsigned int length,
234*78147ca8SChuck Lever int (*actor)(const struct xdr_buf *, void *),
235*78147ca8SChuck Lever void *data)
236*78147ca8SChuck Lever {
237*78147ca8SChuck Lever struct xdr_buf subbuf;
238*78147ca8SChuck Lever
239*78147ca8SChuck Lever if (!length)
240*78147ca8SChuck Lever return 0;
241*78147ca8SChuck Lever if (xdr_buf_subsegment(xdr, &subbuf, offset, length))
242*78147ca8SChuck Lever return -EMSGSIZE;
243*78147ca8SChuck Lever return actor(&subbuf, data);
244*78147ca8SChuck Lever }
245*78147ca8SChuck Lever
/**
 * pcl_process_nonpayloads - Process non-payload regions inside @xdr
 * @pcl: Chunk list to process
 * @xdr: xdr_buf to process
 * @actor: Function to invoke on each non-payload region
 * @data: Arguments for @actor
 *
 * This mechanism must ignore not only result payloads that were already
 * sent via RDMA Write, but also XDR padding for those payloads that
 * the upper layer has added.
 *
 * Assumptions:
 *  The xdr->len and ch_position fields are aligned to 4-byte multiples.
 *
 * Returns:
 *   On success, zero,
 *   %-EMSGSIZE on XDR buffer overflow, or
 *   The return value of @actor
 */
int pcl_process_nonpayloads(const struct svc_rdma_pcl *pcl,
			    const struct xdr_buf *xdr,
			    int (*actor)(const struct xdr_buf *, void *),
			    void *data)
{
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start;
	int ret;

	chunk = pcl_first_chunk(pcl);

	/* No result payloads were generated: the entire buffer is
	 * one non-payload region.
	 */
	if (!chunk || !chunk->ch_payload_length)
		return actor(xdr, data);

	/* Process the region before the first result payload */
	ret = pcl_process_region(xdr, 0, chunk->ch_position, actor, data);
	if (ret < 0)
		return ret;

	/* Process the regions between each middle result payload.
	 * A chunk with a zero ch_payload_length marks the end of the
	 * payload-bearing chunks, so stop advancing there.
	 */
	while ((next = pcl_next_chunk(pcl, chunk))) {
		if (!next->ch_payload_length)
			break;

		/* The gap runs from the end of @chunk's payload to the
		 * start of @next's.
		 */
		start = pcl_chunk_end_offset(chunk);
		ret = pcl_process_region(xdr, start, next->ch_position - start,
					 actor, data);
		if (ret < 0)
			return ret;

		chunk = next;
	}

	/* Process the region after the last result payload */
	start = pcl_chunk_end_offset(chunk);
	ret = pcl_process_region(xdr, start, xdr->len - start, actor, data);
	if (ret < 0)
		return ret;

	return 0;
}
307