xref: /openbmc/linux/net/ceph/osd_client.c (revision ed4543328f7108e1047b83b96ca7f7208747d930)
1  // SPDX-License-Identifier: GPL-2.0
2  
3  #include <linux/ceph/ceph_debug.h>
4  
5  #include <linux/module.h>
6  #include <linux/err.h>
7  #include <linux/highmem.h>
8  #include <linux/mm.h>
9  #include <linux/pagemap.h>
10  #include <linux/slab.h>
11  #include <linux/uaccess.h>
12  #ifdef CONFIG_BLOCK
13  #include <linux/bio.h>
14  #endif
15  
16  #include <linux/ceph/ceph_features.h>
17  #include <linux/ceph/libceph.h>
18  #include <linux/ceph/osd_client.h>
19  #include <linux/ceph/messenger.h>
20  #include <linux/ceph/decode.h>
21  #include <linux/ceph/auth.h>
22  #include <linux/ceph/pagelist.h>
23  #include <linux/ceph/striper.h>
24  
25  #define OSD_OPREPLY_FRONT_LEN	512
26  
27  static struct kmem_cache	*ceph_osd_request_cache;
28  
29  static const struct ceph_connection_operations osd_con_ops;
30  
31  /*
32   * Implement client access to distributed object storage cluster.
33   *
34   * All data objects are stored within a cluster/cloud of OSDs, or
35   * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
36   * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
37   * remote daemons serving up and coordinating consistent and safe
38   * access to storage.
39   *
40   * Cluster membership and the mapping of data objects onto storage devices
41   * are described by the osd map.
42   *
43   * We keep track of pending OSD requests (read, write), resubmit
44   * requests to different OSDs when the cluster topology/data layout
45   * change, or retry the affected requests when the communications
46   * channel with an OSD is reset.
47   */
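
/*
 * Editorial note, not part of the original source: a minimal sketch of the
 * request lifecycle implemented below, assuming a single read op and
 * ignoring error handling ("some-object", pool_id, pages and len are
 * placeholders supplied by the caller):
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some-object");
 *	req->r_base_oloc.pool = pool_id;
 *	req->r_flags = CEPH_OSD_FLAG_READ;
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, len, 0, 0);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *	ceph_osdc_alloc_messages(req, GFP_NOFS);
 *	... submit with ceph_osdc_start_request() and wait with
 *	    ceph_osdc_wait_request(), both defined later in this file ...
 *	ceph_osdc_put_request(req);
 */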
48  
49  static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
50  static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
51  static void link_linger(struct ceph_osd *osd,
52  			struct ceph_osd_linger_request *lreq);
53  static void unlink_linger(struct ceph_osd *osd,
54  			  struct ceph_osd_linger_request *lreq);
55  static void clear_backoffs(struct ceph_osd *osd);
56  
57  #if 1
58  static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
59  {
60  	bool wrlocked = true;
61  
62  	if (unlikely(down_read_trylock(sem))) {
63  		wrlocked = false;
64  		up_read(sem);
65  	}
66  
67  	return wrlocked;
68  }
69  static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
70  {
71  	WARN_ON(!rwsem_is_locked(&osdc->lock));
72  }
73  static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
74  {
75  	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
76  }
77  static inline void verify_osd_locked(struct ceph_osd *osd)
78  {
79  	struct ceph_osd_client *osdc = osd->o_osdc;
80  
81  	WARN_ON(!(mutex_is_locked(&osd->lock) &&
82  		  rwsem_is_locked(&osdc->lock)) &&
83  		!rwsem_is_wrlocked(&osdc->lock));
84  }
85  static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
86  {
87  	WARN_ON(!mutex_is_locked(&lreq->lock));
88  }
89  #else
90  static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
91  static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
92  static inline void verify_osd_locked(struct ceph_osd *osd) { }
93  static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
94  #endif
95  
96  /*
97   * calculate the mapping of a file extent onto an object, and fill out the
98   * request accordingly.  shorten extent as necessary if it crosses an
99   * object boundary.
100   *
101   * fill osd op in request message.
102   */
103  static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
104  			u64 *objnum, u64 *objoff, u64 *objlen)
105  {
106  	u64 orig_len = *plen;
107  	u32 xlen;
108  
109  	/* object extent? */
110  	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
111  					  objoff, &xlen);
112  	*objlen = xlen;
113  	if (*objlen < orig_len) {
114  		*plen = *objlen;
115  		dout(" skipping last %llu, final file extent %llu~%llu\n",
116  		     orig_len - *plen, off, *plen);
117  	}
118  
119  	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
120  	return 0;
121  }
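
/*
 * Editorial example, not from the original source: with the default layout
 * (4M objects, stripe_count 1), a request for off=6M and *plen=4M maps to
 * objnum=1, objoff=2M, objlen=2M; *plen is shortened to 2M and the caller
 * is expected to issue the remainder as a separate request against the
 * next object.
 */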
122  
123  static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
124  {
125  	memset(osd_data, 0, sizeof (*osd_data));
126  	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127  }
128  
129  /*
130   * Consumes @pages if @own_pages is true.
131   */
132  static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
133  			struct page **pages, u64 length, u32 alignment,
134  			bool pages_from_pool, bool own_pages)
135  {
136  	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
137  	osd_data->pages = pages;
138  	osd_data->length = length;
139  	osd_data->alignment = alignment;
140  	osd_data->pages_from_pool = pages_from_pool;
141  	osd_data->own_pages = own_pages;
142  }
143  
144  /*
145   * Consumes a ref on @pagelist.
146   */
147  static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
148  			struct ceph_pagelist *pagelist)
149  {
150  	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
151  	osd_data->pagelist = pagelist;
152  }
153  
154  #ifdef CONFIG_BLOCK
155  static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
156  				   struct ceph_bio_iter *bio_pos,
157  				   u32 bio_length)
158  {
159  	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
160  	osd_data->bio_pos = *bio_pos;
161  	osd_data->bio_length = bio_length;
162  }
163  #endif /* CONFIG_BLOCK */
164  
165  static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
166  				     struct ceph_bvec_iter *bvec_pos,
167  				     u32 num_bvecs)
168  {
169  	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
170  	osd_data->bvec_pos = *bvec_pos;
171  	osd_data->num_bvecs = num_bvecs;
172  }
173  
174  static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
175  			       struct iov_iter *iter)
176  {
177  	osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
178  	osd_data->iter = *iter;
179  }
180  
181  static struct ceph_osd_data *
182  osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
183  {
184  	BUG_ON(which >= osd_req->r_num_ops);
185  
186  	return &osd_req->r_ops[which].raw_data_in;
187  }
188  
189  struct ceph_osd_data *
190  osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
191  			unsigned int which)
192  {
193  	return osd_req_op_data(osd_req, which, extent, osd_data);
194  }
195  EXPORT_SYMBOL(osd_req_op_extent_osd_data);
196  
197  void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
198  			unsigned int which, struct page **pages,
199  			u64 length, u32 alignment,
200  			bool pages_from_pool, bool own_pages)
201  {
202  	struct ceph_osd_data *osd_data;
203  
204  	osd_data = osd_req_op_raw_data_in(osd_req, which);
205  	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
206  				pages_from_pool, own_pages);
207  }
208  EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
209  
210  void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
211  			unsigned int which, struct page **pages,
212  			u64 length, u32 alignment,
213  			bool pages_from_pool, bool own_pages)
214  {
215  	struct ceph_osd_data *osd_data;
216  
217  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
218  	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
219  				pages_from_pool, own_pages);
220  }
221  EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
222  
223  void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
224  			unsigned int which, struct ceph_pagelist *pagelist)
225  {
226  	struct ceph_osd_data *osd_data;
227  
228  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
229  	ceph_osd_data_pagelist_init(osd_data, pagelist);
230  }
231  EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
232  
233  #ifdef CONFIG_BLOCK
234  void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
235  				    unsigned int which,
236  				    struct ceph_bio_iter *bio_pos,
237  				    u32 bio_length)
238  {
239  	struct ceph_osd_data *osd_data;
240  
241  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
242  	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
243  }
244  EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
245  #endif /* CONFIG_BLOCK */
246  
247  void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
248  				      unsigned int which,
249  				      struct bio_vec *bvecs, u32 num_bvecs,
250  				      u32 bytes)
251  {
252  	struct ceph_osd_data *osd_data;
253  	struct ceph_bvec_iter it = {
254  		.bvecs = bvecs,
255  		.iter = { .bi_size = bytes },
256  	};
257  
258  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
259  	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
260  }
261  EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);
262  
263  void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
264  					 unsigned int which,
265  					 struct ceph_bvec_iter *bvec_pos)
266  {
267  	struct ceph_osd_data *osd_data;
268  
269  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
270  	ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
271  }
272  EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
273  
274  /**
275   * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
276   * @osd_req: The request to set up
277   * @which: Index of the operation in which to set the iter
278   * @iter: The buffer iterator
279   */
280  void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
281  				unsigned int which, struct iov_iter *iter)
282  {
283  	struct ceph_osd_data *osd_data;
284  
285  	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
286  	ceph_osd_iter_init(osd_data, iter);
287  }
288  EXPORT_SYMBOL(osd_req_op_extent_osd_iter);
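
/*
 * Editorial example, not from the original source: a caller that already
 * has a struct iov_iter describing its buffer (e.g. one built with
 * iov_iter_bvec()) can attach it directly instead of a page vector:
 *
 *	osd_req_op_extent_osd_iter(req, 0, &iter);
 *
 * The iterator is copied into the op by value (see ceph_osd_iter_init()
 * above), so it should describe the full data length of the extent op.
 */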
289  
290  static void osd_req_op_cls_request_info_pagelist(
291  			struct ceph_osd_request *osd_req,
292  			unsigned int which, struct ceph_pagelist *pagelist)
293  {
294  	struct ceph_osd_data *osd_data;
295  
296  	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
297  	ceph_osd_data_pagelist_init(osd_data, pagelist);
298  }
299  
300  void osd_req_op_cls_request_data_pagelist(
301  			struct ceph_osd_request *osd_req,
302  			unsigned int which, struct ceph_pagelist *pagelist)
303  {
304  	struct ceph_osd_data *osd_data;
305  
306  	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
307  	ceph_osd_data_pagelist_init(osd_data, pagelist);
308  	osd_req->r_ops[which].cls.indata_len += pagelist->length;
309  	osd_req->r_ops[which].indata_len += pagelist->length;
310  }
311  EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
312  
313  void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
314  			unsigned int which, struct page **pages, u64 length,
315  			u32 alignment, bool pages_from_pool, bool own_pages)
316  {
317  	struct ceph_osd_data *osd_data;
318  
319  	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
320  	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
321  				pages_from_pool, own_pages);
322  	osd_req->r_ops[which].cls.indata_len += length;
323  	osd_req->r_ops[which].indata_len += length;
324  }
325  EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
326  
327  void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
328  				       unsigned int which,
329  				       struct bio_vec *bvecs, u32 num_bvecs,
330  				       u32 bytes)
331  {
332  	struct ceph_osd_data *osd_data;
333  	struct ceph_bvec_iter it = {
334  		.bvecs = bvecs,
335  		.iter = { .bi_size = bytes },
336  	};
337  
338  	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
339  	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
340  	osd_req->r_ops[which].cls.indata_len += bytes;
341  	osd_req->r_ops[which].indata_len += bytes;
342  }
343  EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
344  
345  void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
346  			unsigned int which, struct page **pages, u64 length,
347  			u32 alignment, bool pages_from_pool, bool own_pages)
348  {
349  	struct ceph_osd_data *osd_data;
350  
351  	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
352  	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
353  				pages_from_pool, own_pages);
354  }
355  EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
356  
357  static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
358  {
359  	switch (osd_data->type) {
360  	case CEPH_OSD_DATA_TYPE_NONE:
361  		return 0;
362  	case CEPH_OSD_DATA_TYPE_PAGES:
363  		return osd_data->length;
364  	case CEPH_OSD_DATA_TYPE_PAGELIST:
365  		return (u64)osd_data->pagelist->length;
366  #ifdef CONFIG_BLOCK
367  	case CEPH_OSD_DATA_TYPE_BIO:
368  		return (u64)osd_data->bio_length;
369  #endif /* CONFIG_BLOCK */
370  	case CEPH_OSD_DATA_TYPE_BVECS:
371  		return osd_data->bvec_pos.iter.bi_size;
372  	case CEPH_OSD_DATA_TYPE_ITER:
373  		return iov_iter_count(&osd_data->iter);
374  	default:
375  		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
376  		return 0;
377  	}
378  }
379  
380  static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
381  {
382  	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
383  		int num_pages;
384  
385  		num_pages = calc_pages_for((u64)osd_data->alignment,
386  						(u64)osd_data->length);
387  		ceph_release_page_vector(osd_data->pages, num_pages);
388  	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
389  		ceph_pagelist_release(osd_data->pagelist);
390  	}
391  	ceph_osd_data_init(osd_data);
392  }
393  
394  static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
395  			unsigned int which)
396  {
397  	struct ceph_osd_req_op *op;
398  
399  	BUG_ON(which >= osd_req->r_num_ops);
400  	op = &osd_req->r_ops[which];
401  
402  	switch (op->op) {
403  	case CEPH_OSD_OP_READ:
404  	case CEPH_OSD_OP_SPARSE_READ:
405  	case CEPH_OSD_OP_WRITE:
406  	case CEPH_OSD_OP_WRITEFULL:
407  		kfree(op->extent.sparse_ext);
408  		ceph_osd_data_release(&op->extent.osd_data);
409  		break;
410  	case CEPH_OSD_OP_CALL:
411  		ceph_osd_data_release(&op->cls.request_info);
412  		ceph_osd_data_release(&op->cls.request_data);
413  		ceph_osd_data_release(&op->cls.response_data);
414  		break;
415  	case CEPH_OSD_OP_SETXATTR:
416  	case CEPH_OSD_OP_CMPXATTR:
417  		ceph_osd_data_release(&op->xattr.osd_data);
418  		break;
419  	case CEPH_OSD_OP_STAT:
420  		ceph_osd_data_release(&op->raw_data_in);
421  		break;
422  	case CEPH_OSD_OP_NOTIFY_ACK:
423  		ceph_osd_data_release(&op->notify_ack.request_data);
424  		break;
425  	case CEPH_OSD_OP_NOTIFY:
426  		ceph_osd_data_release(&op->notify.request_data);
427  		ceph_osd_data_release(&op->notify.response_data);
428  		break;
429  	case CEPH_OSD_OP_LIST_WATCHERS:
430  		ceph_osd_data_release(&op->list_watchers.response_data);
431  		break;
432  	case CEPH_OSD_OP_COPY_FROM2:
433  		ceph_osd_data_release(&op->copy_from.osd_data);
434  		break;
435  	default:
436  		break;
437  	}
438  }
439  
440  /*
441   * Assumes @t is zero-initialized.
442   */
443  static void target_init(struct ceph_osd_request_target *t)
444  {
445  	ceph_oid_init(&t->base_oid);
446  	ceph_oloc_init(&t->base_oloc);
447  	ceph_oid_init(&t->target_oid);
448  	ceph_oloc_init(&t->target_oloc);
449  
450  	ceph_osds_init(&t->acting);
451  	ceph_osds_init(&t->up);
452  	t->size = -1;
453  	t->min_size = -1;
454  
455  	t->osd = CEPH_HOMELESS_OSD;
456  }
457  
458  static void target_copy(struct ceph_osd_request_target *dest,
459  			const struct ceph_osd_request_target *src)
460  {
461  	ceph_oid_copy(&dest->base_oid, &src->base_oid);
462  	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
463  	ceph_oid_copy(&dest->target_oid, &src->target_oid);
464  	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
465  
466  	dest->pgid = src->pgid; /* struct */
467  	dest->spgid = src->spgid; /* struct */
468  	dest->pg_num = src->pg_num;
469  	dest->pg_num_mask = src->pg_num_mask;
470  	ceph_osds_copy(&dest->acting, &src->acting);
471  	ceph_osds_copy(&dest->up, &src->up);
472  	dest->size = src->size;
473  	dest->min_size = src->min_size;
474  	dest->sort_bitwise = src->sort_bitwise;
475  	dest->recovery_deletes = src->recovery_deletes;
476  
477  	dest->flags = src->flags;
478  	dest->used_replica = src->used_replica;
479  	dest->paused = src->paused;
480  
481  	dest->epoch = src->epoch;
482  	dest->last_force_resend = src->last_force_resend;
483  
484  	dest->osd = src->osd;
485  }
486  
487  static void target_destroy(struct ceph_osd_request_target *t)
488  {
489  	ceph_oid_destroy(&t->base_oid);
490  	ceph_oloc_destroy(&t->base_oloc);
491  	ceph_oid_destroy(&t->target_oid);
492  	ceph_oloc_destroy(&t->target_oloc);
493  }
494  
495  /*
496   * requests
497   */
498  static void request_release_checks(struct ceph_osd_request *req)
499  {
500  	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
501  	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
502  	WARN_ON(!list_empty(&req->r_private_item));
503  	WARN_ON(req->r_osd);
504  }
505  
506  static void ceph_osdc_release_request(struct kref *kref)
507  {
508  	struct ceph_osd_request *req = container_of(kref,
509  					    struct ceph_osd_request, r_kref);
510  	unsigned int which;
511  
512  	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
513  	     req->r_request, req->r_reply);
514  	request_release_checks(req);
515  
516  	if (req->r_request)
517  		ceph_msg_put(req->r_request);
518  	if (req->r_reply)
519  		ceph_msg_put(req->r_reply);
520  
521  	for (which = 0; which < req->r_num_ops; which++)
522  		osd_req_op_data_release(req, which);
523  
524  	target_destroy(&req->r_t);
525  	ceph_put_snap_context(req->r_snapc);
526  
527  	if (req->r_mempool)
528  		mempool_free(req, req->r_osdc->req_mempool);
529  	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
530  		kmem_cache_free(ceph_osd_request_cache, req);
531  	else
532  		kfree(req);
533  }
534  
535  void ceph_osdc_get_request(struct ceph_osd_request *req)
536  {
537  	dout("%s %p (was %d)\n", __func__, req,
538  	     kref_read(&req->r_kref));
539  	kref_get(&req->r_kref);
540  }
541  EXPORT_SYMBOL(ceph_osdc_get_request);
542  
543  void ceph_osdc_put_request(struct ceph_osd_request *req)
544  {
545  	if (req) {
546  		dout("%s %p (was %d)\n", __func__, req,
547  		     kref_read(&req->r_kref));
548  		kref_put(&req->r_kref, ceph_osdc_release_request);
549  	}
550  }
551  EXPORT_SYMBOL(ceph_osdc_put_request);
552  
553  static void request_init(struct ceph_osd_request *req)
554  {
555  	/* req only, each op is zeroed in osd_req_op_init() */
556  	memset(req, 0, sizeof(*req));
557  
558  	kref_init(&req->r_kref);
559  	init_completion(&req->r_completion);
560  	RB_CLEAR_NODE(&req->r_node);
561  	RB_CLEAR_NODE(&req->r_mc_node);
562  	INIT_LIST_HEAD(&req->r_private_item);
563  
564  	target_init(&req->r_t);
565  }
566  
567  struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
568  					       struct ceph_snap_context *snapc,
569  					       unsigned int num_ops,
570  					       bool use_mempool,
571  					       gfp_t gfp_flags)
572  {
573  	struct ceph_osd_request *req;
574  
575  	if (use_mempool) {
576  		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
577  		req = mempool_alloc(osdc->req_mempool, gfp_flags);
578  	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
579  		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
580  	} else {
581  		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
582  		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
583  	}
584  	if (unlikely(!req))
585  		return NULL;
586  
587  	request_init(req);
588  	req->r_osdc = osdc;
589  	req->r_mempool = use_mempool;
590  	req->r_num_ops = num_ops;
591  	req->r_snapid = CEPH_NOSNAP;
592  	req->r_snapc = ceph_get_snap_context(snapc);
593  
594  	dout("%s req %p\n", __func__, req);
595  	return req;
596  }
597  EXPORT_SYMBOL(ceph_osdc_alloc_request);
598  
599  static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
600  {
601  	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
602  }
603  
604  static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
605  				      int num_request_data_items,
606  				      int num_reply_data_items)
607  {
608  	struct ceph_osd_client *osdc = req->r_osdc;
609  	struct ceph_msg *msg;
610  	int msg_size;
611  
612  	WARN_ON(req->r_request || req->r_reply);
613  	WARN_ON(ceph_oid_empty(&req->r_base_oid));
614  	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
615  
616  	/* create request message */
617  	msg_size = CEPH_ENCODING_START_BLK_LEN +
618  			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
619  	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
620  	msg_size += CEPH_ENCODING_START_BLK_LEN +
621  			sizeof(struct ceph_osd_reqid); /* reqid */
622  	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
623  	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
624  	msg_size += CEPH_ENCODING_START_BLK_LEN +
625  			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
626  	msg_size += 4 + req->r_base_oid.name_len; /* oid */
627  	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
628  	msg_size += 8; /* snapid */
629  	msg_size += 8; /* snap_seq */
630  	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
631  	msg_size += 4 + 8; /* retry_attempt, features */
632  
633  	if (req->r_mempool)
634  		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
635  				       num_request_data_items);
636  	else
637  		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
638  				    num_request_data_items, gfp, true);
639  	if (!msg)
640  		return -ENOMEM;
641  
642  	memset(msg->front.iov_base, 0, msg->front.iov_len);
643  	req->r_request = msg;
644  
645  	/* create reply message */
646  	msg_size = OSD_OPREPLY_FRONT_LEN;
647  	msg_size += req->r_base_oid.name_len;
648  	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
649  
650  	if (req->r_mempool)
651  		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
652  				       num_reply_data_items);
653  	else
654  		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
655  				    num_reply_data_items, gfp, true);
656  	if (!msg)
657  		return -ENOMEM;
658  
659  	req->r_reply = msg;
660  
661  	return 0;
662  }
663  
664  static bool osd_req_opcode_valid(u16 opcode)
665  {
666  	switch (opcode) {
667  #define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
668  __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
669  #undef GENERATE_CASE
670  	default:
671  		return false;
672  	}
673  }
674  
675  static void get_num_data_items(struct ceph_osd_request *req,
676  			       int *num_request_data_items,
677  			       int *num_reply_data_items)
678  {
679  	struct ceph_osd_req_op *op;
680  
681  	*num_request_data_items = 0;
682  	*num_reply_data_items = 0;
683  
684  	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
685  		switch (op->op) {
686  		/* request */
687  		case CEPH_OSD_OP_WRITE:
688  		case CEPH_OSD_OP_WRITEFULL:
689  		case CEPH_OSD_OP_SETXATTR:
690  		case CEPH_OSD_OP_CMPXATTR:
691  		case CEPH_OSD_OP_NOTIFY_ACK:
692  		case CEPH_OSD_OP_COPY_FROM2:
693  			*num_request_data_items += 1;
694  			break;
695  
696  		/* reply */
697  		case CEPH_OSD_OP_STAT:
698  		case CEPH_OSD_OP_READ:
699  		case CEPH_OSD_OP_SPARSE_READ:
700  		case CEPH_OSD_OP_LIST_WATCHERS:
701  			*num_reply_data_items += 1;
702  			break;
703  
704  		/* both */
705  		case CEPH_OSD_OP_NOTIFY:
706  			*num_request_data_items += 1;
707  			*num_reply_data_items += 1;
708  			break;
709  		case CEPH_OSD_OP_CALL:
710  			*num_request_data_items += 2;
711  			*num_reply_data_items += 1;
712  			break;
713  
714  		default:
715  			WARN_ON(!osd_req_opcode_valid(op->op));
716  			break;
717  		}
718  	}
719  }
720  
721  /*
722   * oid, oloc and OSD op opcode(s) must be filled in before this function
723   * is called.
724   */
725  int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
726  {
727  	int num_request_data_items, num_reply_data_items;
728  
729  	get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
730  	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
731  					  num_reply_data_items);
732  }
733  EXPORT_SYMBOL(ceph_osdc_alloc_messages);
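
/*
 * Editorial note, not part of the original source: the ordering requirement
 * above exists because get_num_data_items() walks r_ops to count data items
 * and __ceph_osdc_alloc_messages() sizes the request front from
 * r_base_oid/r_base_oloc.  A sketch of the required order for a single
 * write op ("some-object", pool_id, off and len are placeholders):
 *
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some-object");
 *	req->r_base_oloc.pool = pool_id;
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_WRITE, off, len, 0, 0);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 */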
734  
735  /*
736   * This is an osd op init function for opcodes that have no data or
737   * other information associated with them.  It also serves as a
738   * common init routine for all the other init functions, below.
739   */
740  struct ceph_osd_req_op *
741  osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
742  		 u16 opcode, u32 flags)
743  {
744  	struct ceph_osd_req_op *op;
745  
746  	BUG_ON(which >= osd_req->r_num_ops);
747  	BUG_ON(!osd_req_opcode_valid(opcode));
748  
749  	op = &osd_req->r_ops[which];
750  	memset(op, 0, sizeof (*op));
751  	op->op = opcode;
752  	op->flags = flags;
753  
754  	return op;
755  }
756  EXPORT_SYMBOL(osd_req_op_init);
757  
758  void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
759  				unsigned int which, u16 opcode,
760  				u64 offset, u64 length,
761  				u64 truncate_size, u32 truncate_seq)
762  {
763  	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
764  						     opcode, 0);
765  	size_t payload_len = 0;
766  
767  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
768  	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
769  	       opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);
770  
771  	op->extent.offset = offset;
772  	op->extent.length = length;
773  	op->extent.truncate_size = truncate_size;
774  	op->extent.truncate_seq = truncate_seq;
775  	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
776  		payload_len += length;
777  
778  	op->indata_len = payload_len;
779  }
780  EXPORT_SYMBOL(osd_req_op_extent_init);
781  
782  void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
783  				unsigned int which, u64 length)
784  {
785  	struct ceph_osd_req_op *op;
786  	u64 previous;
787  
788  	BUG_ON(which >= osd_req->r_num_ops);
789  	op = &osd_req->r_ops[which];
790  	previous = op->extent.length;
791  
792  	if (length == previous)
793  		return;		/* Nothing to do */
794  	BUG_ON(length > previous);
795  
796  	op->extent.length = length;
797  	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
798  		op->indata_len -= previous - length;
799  }
800  EXPORT_SYMBOL(osd_req_op_extent_update);
801  
802  void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
803  				unsigned int which, u64 offset_inc)
804  {
805  	struct ceph_osd_req_op *op, *prev_op;
806  
807  	BUG_ON(which + 1 >= osd_req->r_num_ops);
808  
809  	prev_op = &osd_req->r_ops[which];
810  	op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
811  	/* dup previous one */
812  	op->indata_len = prev_op->indata_len;
813  	op->outdata_len = prev_op->outdata_len;
814  	op->extent = prev_op->extent;
815  	/* adjust offset */
816  	op->extent.offset += offset_inc;
817  	op->extent.length -= offset_inc;
818  
819  	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
820  		op->indata_len -= offset_inc;
821  }
822  EXPORT_SYMBOL(osd_req_op_extent_dup_last);
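
/*
 * Editorial example, not from the original source: if r_ops[which] is a
 * write of 8192 bytes at object offset 0, osd_req_op_extent_dup_last(req,
 * which, 4096) appends an op at which + 1 covering offset 4096 and length
 * 4096, with the new op's indata_len reduced to 4096; the op at @which is
 * left untouched.
 */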
823  
824  int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
825  			const char *class, const char *method)
826  {
827  	struct ceph_osd_req_op *op;
828  	struct ceph_pagelist *pagelist;
829  	size_t payload_len = 0;
830  	size_t size;
831  	int ret;
832  
833  	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);
834  
835  	pagelist = ceph_pagelist_alloc(GFP_NOFS);
836  	if (!pagelist)
837  		return -ENOMEM;
838  
839  	op->cls.class_name = class;
840  	size = strlen(class);
841  	BUG_ON(size > (size_t) U8_MAX);
842  	op->cls.class_len = size;
843  	ret = ceph_pagelist_append(pagelist, class, size);
844  	if (ret)
845  		goto err_pagelist_free;
846  	payload_len += size;
847  
848  	op->cls.method_name = method;
849  	size = strlen(method);
850  	BUG_ON(size > (size_t) U8_MAX);
851  	op->cls.method_len = size;
852  	ret = ceph_pagelist_append(pagelist, method, size);
853  	if (ret)
854  		goto err_pagelist_free;
855  	payload_len += size;
856  
857  	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
858  	op->indata_len = payload_len;
859  	return 0;
860  
861  err_pagelist_free:
862  	ceph_pagelist_release(pagelist);
863  	return ret;
864  }
865  EXPORT_SYMBOL(osd_req_op_cls_init);
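
/*
 * Editorial example, not from the original source: a class method call
 * (in the style of rbd's "rbd"/"get_size") combines this with the
 * request/response data helpers above; in_pages/out_pages and their
 * lengths are caller-provided placeholders:
 *
 *	ret = osd_req_op_cls_init(req, 0, "rbd", "get_size");
 *	osd_req_op_cls_request_data_pages(req, 0, in_pages, in_len,
 *					  0, false, false);
 *	osd_req_op_cls_response_data_pages(req, 0, out_pages, out_len,
 *					   0, false, false);
 */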
866  
867  int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
868  			  u16 opcode, const char *name, const void *value,
869  			  size_t size, u8 cmp_op, u8 cmp_mode)
870  {
871  	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
872  						     opcode, 0);
873  	struct ceph_pagelist *pagelist;
874  	size_t payload_len;
875  	int ret;
876  
877  	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
878  
879  	pagelist = ceph_pagelist_alloc(GFP_NOFS);
880  	if (!pagelist)
881  		return -ENOMEM;
882  
883  	payload_len = strlen(name);
884  	op->xattr.name_len = payload_len;
885  	ret = ceph_pagelist_append(pagelist, name, payload_len);
886  	if (ret)
887  		goto err_pagelist_free;
888  
889  	op->xattr.value_len = size;
890  	ret = ceph_pagelist_append(pagelist, value, size);
891  	if (ret)
892  		goto err_pagelist_free;
893  	payload_len += size;
894  
895  	op->xattr.cmp_op = cmp_op;
896  	op->xattr.cmp_mode = cmp_mode;
897  
898  	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
899  	op->indata_len = payload_len;
900  	return 0;
901  
902  err_pagelist_free:
903  	ceph_pagelist_release(pagelist);
904  	return ret;
905  }
906  EXPORT_SYMBOL(osd_req_op_xattr_init);
907  
908  /*
909   * @watch_opcode: CEPH_OSD_WATCH_OP_*
910   */
911  static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
912  				  u8 watch_opcode, u64 cookie, u32 gen)
913  {
914  	struct ceph_osd_req_op *op;
915  
916  	op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
917  	op->watch.cookie = cookie;
918  	op->watch.op = watch_opcode;
919  	op->watch.gen = gen;
920  }
921  
922  /*
923   * prot_ver, timeout and notify payload (may be empty) should already be
924   * encoded in @request_pl
925   */
926  static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
927  				   u64 cookie, struct ceph_pagelist *request_pl)
928  {
929  	struct ceph_osd_req_op *op;
930  
931  	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
932  	op->notify.cookie = cookie;
933  
934  	ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
935  	op->indata_len = request_pl->length;
936  }
937  
938  /*
939   * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
940   */
941  void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
942  				unsigned int which,
943  				u64 expected_object_size,
944  				u64 expected_write_size,
945  				u32 flags)
946  {
947  	struct ceph_osd_req_op *op;
948  
949  	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
950  	op->alloc_hint.expected_object_size = expected_object_size;
951  	op->alloc_hint.expected_write_size = expected_write_size;
952  	op->alloc_hint.flags = flags;
953  
954  	/*
955  	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
956  	 * not worth a feature bit.  Set FAILOK per-op flag to make
957  	 * sure older osds don't trip over an unsupported opcode.
958  	 */
959  	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
960  }
961  EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
962  
963  static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
964  				struct ceph_osd_data *osd_data)
965  {
966  	u64 length = ceph_osd_data_length(osd_data);
967  
968  	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
969  		BUG_ON(length > (u64) SIZE_MAX);
970  		if (length)
971  			ceph_msg_data_add_pages(msg, osd_data->pages,
972  					length, osd_data->alignment, false);
973  	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
974  		BUG_ON(!length);
975  		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
976  #ifdef CONFIG_BLOCK
977  	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
978  		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
979  #endif
980  	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
981  		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
982  	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
983  		ceph_msg_data_add_iter(msg, &osd_data->iter);
984  	} else {
985  		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
986  	}
987  }
988  
989  static u32 osd_req_encode_op(struct ceph_osd_op *dst,
990  			     const struct ceph_osd_req_op *src)
991  {
992  	switch (src->op) {
993  	case CEPH_OSD_OP_STAT:
994  		break;
995  	case CEPH_OSD_OP_READ:
996  	case CEPH_OSD_OP_SPARSE_READ:
997  	case CEPH_OSD_OP_WRITE:
998  	case CEPH_OSD_OP_WRITEFULL:
999  	case CEPH_OSD_OP_ZERO:
1000  	case CEPH_OSD_OP_TRUNCATE:
1001  		dst->extent.offset = cpu_to_le64(src->extent.offset);
1002  		dst->extent.length = cpu_to_le64(src->extent.length);
1003  		dst->extent.truncate_size =
1004  			cpu_to_le64(src->extent.truncate_size);
1005  		dst->extent.truncate_seq =
1006  			cpu_to_le32(src->extent.truncate_seq);
1007  		break;
1008  	case CEPH_OSD_OP_CALL:
1009  		dst->cls.class_len = src->cls.class_len;
1010  		dst->cls.method_len = src->cls.method_len;
1011  		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
1012  		break;
1013  	case CEPH_OSD_OP_WATCH:
1014  		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
1015  		dst->watch.ver = cpu_to_le64(0);
1016  		dst->watch.op = src->watch.op;
1017  		dst->watch.gen = cpu_to_le32(src->watch.gen);
1018  		break;
1019  	case CEPH_OSD_OP_NOTIFY_ACK:
1020  		break;
1021  	case CEPH_OSD_OP_NOTIFY:
1022  		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
1023  		break;
1024  	case CEPH_OSD_OP_LIST_WATCHERS:
1025  		break;
1026  	case CEPH_OSD_OP_SETALLOCHINT:
1027  		dst->alloc_hint.expected_object_size =
1028  		    cpu_to_le64(src->alloc_hint.expected_object_size);
1029  		dst->alloc_hint.expected_write_size =
1030  		    cpu_to_le64(src->alloc_hint.expected_write_size);
1031  		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
1032  		break;
1033  	case CEPH_OSD_OP_SETXATTR:
1034  	case CEPH_OSD_OP_CMPXATTR:
1035  		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
1036  		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
1037  		dst->xattr.cmp_op = src->xattr.cmp_op;
1038  		dst->xattr.cmp_mode = src->xattr.cmp_mode;
1039  		break;
1040  	case CEPH_OSD_OP_CREATE:
1041  	case CEPH_OSD_OP_DELETE:
1042  		break;
1043  	case CEPH_OSD_OP_COPY_FROM2:
1044  		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
1045  		dst->copy_from.src_version =
1046  			cpu_to_le64(src->copy_from.src_version);
1047  		dst->copy_from.flags = src->copy_from.flags;
1048  		dst->copy_from.src_fadvise_flags =
1049  			cpu_to_le32(src->copy_from.src_fadvise_flags);
1050  		break;
1051  	case CEPH_OSD_OP_ASSERT_VER:
1052  		dst->assert_ver.unused = cpu_to_le64(0);
1053  		dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
1054  		break;
1055  	default:
1056  		pr_err("unsupported osd opcode %s\n",
1057  			ceph_osd_op_name(src->op));
1058  		WARN_ON(1);
1059  
1060  		return 0;
1061  	}
1062  
1063  	dst->op = cpu_to_le16(src->op);
1064  	dst->flags = cpu_to_le32(src->flags);
1065  	dst->payload_len = cpu_to_le32(src->indata_len);
1066  
1067  	return src->indata_len;
1068  }
1069  
1070  /*
1071   * build new request AND message, calculate layout, and adjust file
1072   * extent as needed.
1073   *
1074   * if the file was recently truncated, we include information about its
1075   * old and new size so that the object can be updated appropriately.  (we
1076   * avoid synchronously deleting truncated objects because it's slow.)
1077   */
1078  struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
1079  					       struct ceph_file_layout *layout,
1080  					       struct ceph_vino vino,
1081  					       u64 off, u64 *plen,
1082  					       unsigned int which, int num_ops,
1083  					       int opcode, int flags,
1084  					       struct ceph_snap_context *snapc,
1085  					       u32 truncate_seq,
1086  					       u64 truncate_size,
1087  					       bool use_mempool)
1088  {
1089  	struct ceph_osd_request *req;
1090  	u64 objnum = 0;
1091  	u64 objoff = 0;
1092  	u64 objlen = 0;
1093  	int r;
1094  
1095  	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
1096  	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
1097  	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
1098  	       opcode != CEPH_OSD_OP_SPARSE_READ);
1099  
1100  	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
1101  					GFP_NOFS);
1102  	if (!req) {
1103  		r = -ENOMEM;
1104  		goto fail;
1105  	}
1106  
1107  	/* calculate max write size */
1108  	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
1109  	if (r)
1110  		goto fail;
1111  
1112  	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
1113  		osd_req_op_init(req, which, opcode, 0);
1114  	} else {
1115  		u32 object_size = layout->object_size;
1116  		u32 object_base = off - objoff;
1117  		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
1118  			if (truncate_size <= object_base) {
1119  				truncate_size = 0;
1120  			} else {
1121  				truncate_size -= object_base;
1122  				if (truncate_size > object_size)
1123  					truncate_size = object_size;
1124  			}
1125  		}
1126  		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1127  				       truncate_size, truncate_seq);
1128  	}
1129  
1130  	req->r_base_oloc.pool = layout->pool_id;
1131  	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1132  	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1133  	req->r_flags = flags | osdc->client->options->read_from_replica;
1134  
1135  	req->r_snapid = vino.snap;
1136  	if (flags & CEPH_OSD_FLAG_WRITE)
1137  		req->r_data_offset = off;
1138  
1139  	if (num_ops > 1) {
1140  		int num_req_ops, num_rep_ops;
1141  
1142  		/*
1143  		 * If this is a multi-op write request, assume that we'll need
1144  		 * request ops. If it's a multi-op read then assume we'll need
1145  		 * reply ops. Anything else and call it -EINVAL.
1146  		 */
1147  		if (flags & CEPH_OSD_FLAG_WRITE) {
1148  			num_req_ops = num_ops;
1149  			num_rep_ops = 0;
1150  		} else if (flags & CEPH_OSD_FLAG_READ) {
1151  			num_req_ops = 0;
1152  			num_rep_ops = num_ops;
1153  		} else {
1154  			r = -EINVAL;
1155  			goto fail;
1156  		}
1157  
1158  		r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
1159  					       num_rep_ops);
1160  	} else {
1161  		r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1162  	}
1163  	if (r)
1164  		goto fail;
1165  
1166  	return req;
1167  
1168  fail:
1169  	ceph_osdc_put_request(req);
1170  	return ERR_PTR(r);
1171  }
1172  EXPORT_SYMBOL(ceph_osdc_new_request);
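
/*
 * Editorial example, not from the original source: a file-extent caller
 * might use this roughly as follows, where layout, vino, truncate_seq and
 * truncate_size stand in for the caller's inode state:
 *
 *	u64 len = count;
 *
 *	req = ceph_osdc_new_request(osdc, &layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, truncate_seq, truncate_size, false);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *
 * On return, len may have been shortened to the object boundary (see
 * calc_layout() above).
 */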
1173  
1174  int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
1175  {
1176  	WARN_ON(op->op != CEPH_OSD_OP_SPARSE_READ);
1177  
1178  	op->extent.sparse_ext_cnt = cnt;
1179  	op->extent.sparse_ext = kmalloc_array(cnt,
1180  					      sizeof(*op->extent.sparse_ext),
1181  					      GFP_NOFS);
1182  	if (!op->extent.sparse_ext)
1183  		return -ENOMEM;
1184  	return 0;
1185  }
1186  EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);
1187  
1188  /*
1189   * We keep osd requests in an rbtree, sorted by ->r_tid.
1190   */
1191  DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
1192  DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1193  
1194  /*
1195   * Call @fn on each OSD request as long as @fn returns 0.
1196   */
1197  static void for_each_request(struct ceph_osd_client *osdc,
1198  			int (*fn)(struct ceph_osd_request *req, void *arg),
1199  			void *arg)
1200  {
1201  	struct rb_node *n, *p;
1202  
1203  	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1204  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1205  
1206  		for (p = rb_first(&osd->o_requests); p; ) {
1207  			struct ceph_osd_request *req =
1208  			    rb_entry(p, struct ceph_osd_request, r_node);
1209  
1210  			p = rb_next(p);
1211  			if (fn(req, arg))
1212  				return;
1213  		}
1214  	}
1215  
1216  	for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
1217  		struct ceph_osd_request *req =
1218  		    rb_entry(p, struct ceph_osd_request, r_node);
1219  
1220  		p = rb_next(p);
1221  		if (fn(req, arg))
1222  			return;
1223  	}
1224  }
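
/*
 * Editorial example, not from the original source: a hypothetical callback
 * that stops the walk at the first request whose tid exceeds a threshold
 * (a non-zero return ends the iteration):
 *
 *	static int first_past_tid(struct ceph_osd_request *req, void *arg)
 *	{
 *		u64 *tid = arg;
 *
 *		return req->r_tid > *tid;
 *	}
 *
 *	for_each_request(osdc, first_past_tid, &tid);
 *
 * Later code in this file uses the same pattern to abort or complete
 * outstanding requests.
 */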
1225  
1226  static bool osd_homeless(struct ceph_osd *osd)
1227  {
1228  	return osd->o_osd == CEPH_HOMELESS_OSD;
1229  }
1230  
1231  static bool osd_registered(struct ceph_osd *osd)
1232  {
1233  	verify_osdc_locked(osd->o_osdc);
1234  
1235  	return !RB_EMPTY_NODE(&osd->o_node);
1236  }
1237  
1238  /*
1239   * Assumes @osd is zero-initialized.
1240   */
1241  static void osd_init(struct ceph_osd *osd)
1242  {
1243  	refcount_set(&osd->o_ref, 1);
1244  	RB_CLEAR_NODE(&osd->o_node);
1245  	spin_lock_init(&osd->o_requests_lock);
1246  	osd->o_requests = RB_ROOT;
1247  	osd->o_linger_requests = RB_ROOT;
1248  	osd->o_backoff_mappings = RB_ROOT;
1249  	osd->o_backoffs_by_id = RB_ROOT;
1250  	INIT_LIST_HEAD(&osd->o_osd_lru);
1251  	INIT_LIST_HEAD(&osd->o_keepalive_item);
1252  	osd->o_incarnation = 1;
1253  	mutex_init(&osd->lock);
1254  }
1255  
1256  static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
1257  {
1258  	kfree(sr->sr_extent);
1259  	memset(sr, '\0', sizeof(*sr));
1260  	sr->sr_state = CEPH_SPARSE_READ_HDR;
1261  }
1262  
1263  static void osd_cleanup(struct ceph_osd *osd)
1264  {
1265  	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1266  	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1267  	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1268  	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1269  	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1270  	WARN_ON(!list_empty(&osd->o_osd_lru));
1271  	WARN_ON(!list_empty(&osd->o_keepalive_item));
1272  
1273  	ceph_init_sparse_read(&osd->o_sparse_read);
1274  
1275  	if (osd->o_auth.authorizer) {
1276  		WARN_ON(osd_homeless(osd));
1277  		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1278  	}
1279  }
1280  
1281  /*
1282   * Track open sessions with osds.
1283   */
1284  static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1285  {
1286  	struct ceph_osd *osd;
1287  
1288  	WARN_ON(onum == CEPH_HOMELESS_OSD);
1289  
1290  	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1291  	osd_init(osd);
1292  	osd->o_osdc = osdc;
1293  	osd->o_osd = onum;
1294  	osd->o_sparse_op_idx = -1;
1295  
1296  	ceph_init_sparse_read(&osd->o_sparse_read);
1297  
1298  	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1299  
1300  	return osd;
1301  }
1302  
1303  static struct ceph_osd *get_osd(struct ceph_osd *osd)
1304  {
1305  	if (refcount_inc_not_zero(&osd->o_ref)) {
1306  		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1307  		     refcount_read(&osd->o_ref));
1308  		return osd;
1309  	} else {
1310  		dout("get_osd %p FAIL\n", osd);
1311  		return NULL;
1312  	}
1313  }
1314  
1315  static void put_osd(struct ceph_osd *osd)
1316  {
1317  	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1318  	     refcount_read(&osd->o_ref) - 1);
1319  	if (refcount_dec_and_test(&osd->o_ref)) {
1320  		osd_cleanup(osd);
1321  		kfree(osd);
1322  	}
1323  }
1324  
1325  DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1326  
1327  static void __move_osd_to_lru(struct ceph_osd *osd)
1328  {
1329  	struct ceph_osd_client *osdc = osd->o_osdc;
1330  
1331  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1332  	BUG_ON(!list_empty(&osd->o_osd_lru));
1333  
1334  	spin_lock(&osdc->osd_lru_lock);
1335  	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1336  	spin_unlock(&osdc->osd_lru_lock);
1337  
1338  	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1339  }
1340  
1341  static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1342  {
1343  	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1344  	    RB_EMPTY_ROOT(&osd->o_linger_requests))
1345  		__move_osd_to_lru(osd);
1346  }
1347  
1348  static void __remove_osd_from_lru(struct ceph_osd *osd)
1349  {
1350  	struct ceph_osd_client *osdc = osd->o_osdc;
1351  
1352  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1353  
1354  	spin_lock(&osdc->osd_lru_lock);
1355  	if (!list_empty(&osd->o_osd_lru))
1356  		list_del_init(&osd->o_osd_lru);
1357  	spin_unlock(&osdc->osd_lru_lock);
1358  }
1359  
1360  /*
1361   * Close the connection and assign any leftover requests to the
1362   * homeless session.
1363   */
1364  static void close_osd(struct ceph_osd *osd)
1365  {
1366  	struct ceph_osd_client *osdc = osd->o_osdc;
1367  	struct rb_node *n;
1368  
1369  	verify_osdc_wrlocked(osdc);
1370  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1371  
1372  	ceph_con_close(&osd->o_con);
1373  
1374  	for (n = rb_first(&osd->o_requests); n; ) {
1375  		struct ceph_osd_request *req =
1376  		    rb_entry(n, struct ceph_osd_request, r_node);
1377  
1378  		n = rb_next(n); /* unlink_request() */
1379  
1380  		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1381  		unlink_request(osd, req);
1382  		link_request(&osdc->homeless_osd, req);
1383  	}
1384  	for (n = rb_first(&osd->o_linger_requests); n; ) {
1385  		struct ceph_osd_linger_request *lreq =
1386  		    rb_entry(n, struct ceph_osd_linger_request, node);
1387  
1388  		n = rb_next(n); /* unlink_linger() */
1389  
1390  		dout(" reassigning lreq %p linger_id %llu\n", lreq,
1391  		     lreq->linger_id);
1392  		unlink_linger(osd, lreq);
1393  		link_linger(&osdc->homeless_osd, lreq);
1394  	}
1395  	clear_backoffs(osd);
1396  
1397  	__remove_osd_from_lru(osd);
1398  	erase_osd(&osdc->osds, osd);
1399  	put_osd(osd);
1400  }
1401  
1402  /*
1403   * reset osd connect
1404   */
1405  static int reopen_osd(struct ceph_osd *osd)
1406  {
1407  	struct ceph_entity_addr *peer_addr;
1408  
1409  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1410  
1411  	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1412  	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1413  		close_osd(osd);
1414  		return -ENODEV;
1415  	}
1416  
1417  	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1418  	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1419  			!ceph_con_opened(&osd->o_con)) {
1420  		struct rb_node *n;
1421  
1422  		dout("osd addr hasn't changed and connection never opened, "
1423  		     "letting msgr retry\n");
1424  		/* touch each r_stamp for handle_timeout()'s benefit */
1425  		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1426  			struct ceph_osd_request *req =
1427  			    rb_entry(n, struct ceph_osd_request, r_node);
1428  			req->r_stamp = jiffies;
1429  		}
1430  
1431  		return -EAGAIN;
1432  	}
1433  
1434  	ceph_con_close(&osd->o_con);
1435  	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1436  	osd->o_incarnation++;
1437  
1438  	return 0;
1439  }
1440  
1441  static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1442  					  bool wrlocked)
1443  {
1444  	struct ceph_osd *osd;
1445  
1446  	if (wrlocked)
1447  		verify_osdc_wrlocked(osdc);
1448  	else
1449  		verify_osdc_locked(osdc);
1450  
1451  	if (o != CEPH_HOMELESS_OSD)
1452  		osd = lookup_osd(&osdc->osds, o);
1453  	else
1454  		osd = &osdc->homeless_osd;
1455  	if (!osd) {
1456  		if (!wrlocked)
1457  			return ERR_PTR(-EAGAIN);
1458  
1459  		osd = create_osd(osdc, o);
1460  		insert_osd(&osdc->osds, osd);
1461  		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1462  			      &osdc->osdmap->osd_addr[osd->o_osd]);
1463  	}
1464  
1465  	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1466  	return osd;
1467  }
1468  
1469  /*
1470   * Create request <-> OSD session relation.
1471   *
1472   * @req has to be assigned a tid, @osd may be homeless.
1473   */
1474  static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1475  {
1476  	verify_osd_locked(osd);
1477  	WARN_ON(!req->r_tid || req->r_osd);
1478  	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1479  	     req, req->r_tid);
1480  
1481  	if (!osd_homeless(osd))
1482  		__remove_osd_from_lru(osd);
1483  	else
1484  		atomic_inc(&osd->o_osdc->num_homeless);
1485  
1486  	get_osd(osd);
1487  	spin_lock(&osd->o_requests_lock);
1488  	insert_request(&osd->o_requests, req);
1489  	spin_unlock(&osd->o_requests_lock);
1490  	req->r_osd = osd;
1491  }
1492  
1493  static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1494  {
1495  	verify_osd_locked(osd);
1496  	WARN_ON(req->r_osd != osd);
1497  	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1498  	     req, req->r_tid);
1499  
1500  	req->r_osd = NULL;
1501  	spin_lock(&osd->o_requests_lock);
1502  	erase_request(&osd->o_requests, req);
1503  	spin_unlock(&osd->o_requests_lock);
1504  	put_osd(osd);
1505  
1506  	if (!osd_homeless(osd))
1507  		maybe_move_osd_to_lru(osd);
1508  	else
1509  		atomic_dec(&osd->o_osdc->num_homeless);
1510  }
1511  
1512  static bool __pool_full(struct ceph_pg_pool_info *pi)
1513  {
1514  	return pi->flags & CEPH_POOL_FLAG_FULL;
1515  }
1516  
1517  static bool have_pool_full(struct ceph_osd_client *osdc)
1518  {
1519  	struct rb_node *n;
1520  
1521  	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1522  		struct ceph_pg_pool_info *pi =
1523  		    rb_entry(n, struct ceph_pg_pool_info, node);
1524  
1525  		if (__pool_full(pi))
1526  			return true;
1527  	}
1528  
1529  	return false;
1530  }
1531  
1532  static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1533  {
1534  	struct ceph_pg_pool_info *pi;
1535  
1536  	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1537  	if (!pi)
1538  		return false;
1539  
1540  	return __pool_full(pi);
1541  }
1542  
1543  /*
1544   * Returns whether a request should be blocked from being sent
1545   * based on the current osdmap and osd_client settings.
1546   */
1547  static bool target_should_be_paused(struct ceph_osd_client *osdc,
1548  				    const struct ceph_osd_request_target *t,
1549  				    struct ceph_pg_pool_info *pi)
1550  {
1551  	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1552  	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1553  		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1554  		       __pool_full(pi);
1555  
1556  	WARN_ON(pi->id != t->target_oloc.pool);
1557  	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1558  	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1559  	       (osdc->osdmap->epoch < osdc->epoch_barrier);
1560  }
1561  
1562  static int pick_random_replica(const struct ceph_osds *acting)
1563  {
1564  	int i = get_random_u32_below(acting->size);
1565  
1566  	dout("%s picked osd%d, primary osd%d\n", __func__,
1567  	     acting->osds[i], acting->primary);
1568  	return i;
1569  }
1570  
1571  /*
1572   * Picks the closest replica based on client's location given by
1573   * crush_location option.  Prefers the primary if the locality is
1574   * the same.
1575   */
1576  static int pick_closest_replica(struct ceph_osd_client *osdc,
1577  				const struct ceph_osds *acting)
1578  {
1579  	struct ceph_options *opt = osdc->client->options;
1580  	int best_i, best_locality;
1581  	int i = 0, locality;
1582  
1583  	do {
1584  		locality = ceph_get_crush_locality(osdc->osdmap,
1585  						   acting->osds[i],
1586  						   &opt->crush_locs);
1587  		if (i == 0 ||
1588  		    (locality >= 0 && best_locality < 0) ||
1589  		    (locality >= 0 && best_locality >= 0 &&
1590  		     locality < best_locality)) {
1591  			best_i = i;
1592  			best_locality = locality;
1593  		}
1594  	} while (++i < acting->size);
1595  
1596  	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
1597  	     acting->osds[best_i], best_locality, acting->primary);
1598  	return best_i;
1599  }
1600  
1601  enum calc_target_result {
1602  	CALC_TARGET_NO_ACTION = 0,
1603  	CALC_TARGET_NEED_RESEND,
1604  	CALC_TARGET_POOL_DNE,
1605  };
1606  
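/*
 * Recalculate the target PG and OSD for @t against the current osdmap:
 * apply cache tiering overrides, map the object to a PG and pick the
 * acting primary (or a replica for balanced/localized reads).  Returns
 * CALC_TARGET_NO_ACTION if nothing relevant changed,
 * CALC_TARGET_NEED_RESEND if the request must be resent to the new
 * target, or CALC_TARGET_POOL_DNE if the pool doesn't exist.
 */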
1607  static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1608  					   struct ceph_osd_request_target *t,
1609  					   bool any_change)
1610  {
1611  	struct ceph_pg_pool_info *pi;
1612  	struct ceph_pg pgid, last_pgid;
1613  	struct ceph_osds up, acting;
1614  	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
1615  	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
1616  	bool force_resend = false;
1617  	bool unpaused = false;
1618  	bool legacy_change = false;
1619  	bool split = false;
1620  	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1621  	bool recovery_deletes = ceph_osdmap_flag(osdc,
1622  						 CEPH_OSDMAP_RECOVERY_DELETES);
1623  	enum calc_target_result ct_res;
1624  
1625  	t->epoch = osdc->osdmap->epoch;
1626  	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1627  	if (!pi) {
1628  		t->osd = CEPH_HOMELESS_OSD;
1629  		ct_res = CALC_TARGET_POOL_DNE;
1630  		goto out;
1631  	}
1632  
1633  	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1634  		if (t->last_force_resend < pi->last_force_request_resend) {
1635  			t->last_force_resend = pi->last_force_request_resend;
1636  			force_resend = true;
1637  		} else if (t->last_force_resend == 0) {
1638  			force_resend = true;
1639  		}
1640  	}
1641  
1642  	/* apply tiering */
1643  	ceph_oid_copy(&t->target_oid, &t->base_oid);
1644  	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1645  	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1646  		if (is_read && pi->read_tier >= 0)
1647  			t->target_oloc.pool = pi->read_tier;
1648  		if (is_write && pi->write_tier >= 0)
1649  			t->target_oloc.pool = pi->write_tier;
1650  
1651  		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1652  		if (!pi) {
1653  			t->osd = CEPH_HOMELESS_OSD;
1654  			ct_res = CALC_TARGET_POOL_DNE;
1655  			goto out;
1656  		}
1657  	}
1658  
1659  	__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
1660  	last_pgid.pool = pgid.pool;
1661  	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1662  
1663  	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1664  	if (any_change &&
1665  	    ceph_is_new_interval(&t->acting,
1666  				 &acting,
1667  				 &t->up,
1668  				 &up,
1669  				 t->size,
1670  				 pi->size,
1671  				 t->min_size,
1672  				 pi->min_size,
1673  				 t->pg_num,
1674  				 pi->pg_num,
1675  				 t->sort_bitwise,
1676  				 sort_bitwise,
1677  				 t->recovery_deletes,
1678  				 recovery_deletes,
1679  				 &last_pgid))
1680  		force_resend = true;
1681  
1682  	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1683  		t->paused = false;
1684  		unpaused = true;
1685  	}
1686  	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1687  			ceph_osds_changed(&t->acting, &acting,
1688  					  t->used_replica || any_change);
1689  	if (t->pg_num)
1690  		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1691  
1692  	if (legacy_change || force_resend || split) {
1693  		t->pgid = pgid; /* struct */
1694  		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1695  		ceph_osds_copy(&t->acting, &acting);
1696  		ceph_osds_copy(&t->up, &up);
1697  		t->size = pi->size;
1698  		t->min_size = pi->min_size;
1699  		t->pg_num = pi->pg_num;
1700  		t->pg_num_mask = pi->pg_num_mask;
1701  		t->sort_bitwise = sort_bitwise;
1702  		t->recovery_deletes = recovery_deletes;
1703  
1704  		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
1705  				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
1706  		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
1707  		    acting.size > 1) {
1708  			int pos;
1709  
1710  			WARN_ON(!is_read || acting.osds[0] != acting.primary);
1711  			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
1712  				pos = pick_random_replica(&acting);
1713  			} else {
1714  				pos = pick_closest_replica(osdc, &acting);
1715  			}
1716  			t->osd = acting.osds[pos];
1717  			t->used_replica = pos > 0;
1718  		} else {
1719  			t->osd = acting.primary;
1720  			t->used_replica = false;
1721  		}
1722  	}
1723  
1724  	if (unpaused || legacy_change || force_resend || split)
1725  		ct_res = CALC_TARGET_NEED_RESEND;
1726  	else
1727  		ct_res = CALC_TARGET_NO_ACTION;
1728  
1729  out:
1730  	dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
1731  	     legacy_change, force_resend, split, ct_res, t->osd);
1732  	return ct_res;
1733  }
1734  
1735  static struct ceph_spg_mapping *alloc_spg_mapping(void)
1736  {
1737  	struct ceph_spg_mapping *spg;
1738  
1739  	spg = kmalloc(sizeof(*spg), GFP_NOIO);
1740  	if (!spg)
1741  		return NULL;
1742  
1743  	RB_CLEAR_NODE(&spg->node);
1744  	spg->backoffs = RB_ROOT;
1745  	return spg;
1746  }
1747  
1748  static void free_spg_mapping(struct ceph_spg_mapping *spg)
1749  {
1750  	WARN_ON(!RB_EMPTY_NODE(&spg->node));
1751  	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1752  
1753  	kfree(spg);
1754  }
1755  
1756  /*
1757   * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1758   * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
1759   * defined only within a specific spgid; it does not pass anything to
1760   * children on split, or to another primary.
1761   */
1762  DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1763  		 RB_BYPTR, const struct ceph_spg *, node)
1764  
1765  static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1766  {
1767  	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1768  }
1769  
1770  static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1771  				   void **pkey, size_t *pkey_len)
1772  {
1773  	if (hoid->key_len) {
1774  		*pkey = hoid->key;
1775  		*pkey_len = hoid->key_len;
1776  	} else {
1777  		*pkey = hoid->oid;
1778  		*pkey_len = hoid->oid_len;
1779  	}
1780  }
1781  
1782  static int compare_names(const void *name1, size_t name1_len,
1783  			 const void *name2, size_t name2_len)
1784  {
1785  	int ret;
1786  
1787  	ret = memcmp(name1, name2, min(name1_len, name2_len));
1788  	if (!ret) {
1789  		if (name1_len < name2_len)
1790  			ret = -1;
1791  		else if (name1_len > name2_len)
1792  			ret = 1;
1793  	}
1794  	return ret;
1795  }
1796  
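/*
 * Total order on hobject_t used for the backoff trees: is_max, then
 * pool, then bitwise hash key, then namespace, then effective key
 * (locator key if set, object name otherwise), then name, then snapid.
 * This should match the bitwise hash ordering the OSDs use when
 * SORTBITWISE is in effect.
 */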
1797  static int hoid_compare(const struct ceph_hobject_id *lhs,
1798  			const struct ceph_hobject_id *rhs)
1799  {
1800  	void *effective_key1, *effective_key2;
1801  	size_t effective_key1_len, effective_key2_len;
1802  	int ret;
1803  
1804  	if (lhs->is_max < rhs->is_max)
1805  		return -1;
1806  	if (lhs->is_max > rhs->is_max)
1807  		return 1;
1808  
1809  	if (lhs->pool < rhs->pool)
1810  		return -1;
1811  	if (lhs->pool > rhs->pool)
1812  		return 1;
1813  
1814  	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1815  		return -1;
1816  	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1817  		return 1;
1818  
1819  	ret = compare_names(lhs->nspace, lhs->nspace_len,
1820  			    rhs->nspace, rhs->nspace_len);
1821  	if (ret)
1822  		return ret;
1823  
1824  	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1825  	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1826  	ret = compare_names(effective_key1, effective_key1_len,
1827  			    effective_key2, effective_key2_len);
1828  	if (ret)
1829  		return ret;
1830  
1831  	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1832  	if (ret)
1833  		return ret;
1834  
1835  	if (lhs->snapid < rhs->snapid)
1836  		return -1;
1837  	if (lhs->snapid > rhs->snapid)
1838  		return 1;
1839  
1840  	return 0;
1841  }
1842  
1843  /*
1844   * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1845   * compat stuff here.
1846   *
1847   * Assumes @hoid is zero-initialized.
1848   */
1849  static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1850  {
1851  	u8 struct_v;
1852  	u32 struct_len;
1853  	int ret;
1854  
1855  	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1856  				  &struct_len);
1857  	if (ret)
1858  		return ret;
1859  
1860  	if (struct_v < 4) {
1861  		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1862  		goto e_inval;
1863  	}
1864  
1865  	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1866  						GFP_NOIO);
1867  	if (IS_ERR(hoid->key)) {
1868  		ret = PTR_ERR(hoid->key);
1869  		hoid->key = NULL;
1870  		return ret;
1871  	}
1872  
1873  	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1874  						GFP_NOIO);
1875  	if (IS_ERR(hoid->oid)) {
1876  		ret = PTR_ERR(hoid->oid);
1877  		hoid->oid = NULL;
1878  		return ret;
1879  	}
1880  
1881  	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1882  	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1883  	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1884  
1885  	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1886  						   GFP_NOIO);
1887  	if (IS_ERR(hoid->nspace)) {
1888  		ret = PTR_ERR(hoid->nspace);
1889  		hoid->nspace = NULL;
1890  		return ret;
1891  	}
1892  
1893  	ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1894  
1895  	ceph_hoid_build_hash_cache(hoid);
1896  	return 0;
1897  
1898  e_inval:
1899  	return -EINVAL;
1900  }
1901  
1902  static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1903  {
1904  	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1905  	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1906  }
1907  
1908  static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1909  {
1910  	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1911  	ceph_encode_string(p, end, hoid->key, hoid->key_len);
1912  	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1913  	ceph_encode_64(p, hoid->snapid);
1914  	ceph_encode_32(p, hoid->hash);
1915  	ceph_encode_8(p, hoid->is_max);
1916  	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1917  	ceph_encode_64(p, hoid->pool);
1918  }
1919  
1920  static void free_hoid(struct ceph_hobject_id *hoid)
1921  {
1922  	if (hoid) {
1923  		kfree(hoid->key);
1924  		kfree(hoid->oid);
1925  		kfree(hoid->nspace);
1926  		kfree(hoid);
1927  	}
1928  }
1929  
1930  static struct ceph_osd_backoff *alloc_backoff(void)
1931  {
1932  	struct ceph_osd_backoff *backoff;
1933  
1934  	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1935  	if (!backoff)
1936  		return NULL;
1937  
1938  	RB_CLEAR_NODE(&backoff->spg_node);
1939  	RB_CLEAR_NODE(&backoff->id_node);
1940  	return backoff;
1941  }
1942  
1943  static void free_backoff(struct ceph_osd_backoff *backoff)
1944  {
1945  	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1946  	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1947  
1948  	free_hoid(backoff->begin);
1949  	free_hoid(backoff->end);
1950  	kfree(backoff);
1951  }
1952  
1953  /*
1954   * Within a specific spgid, backoffs are managed by ->begin hoid.
1955   */
1956  DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1957  			RB_BYVAL, spg_node);
1958  
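/*
 * Find the backoff whose [begin, end) range contains @hoid, if any.
 * The rbtree keyed by ->begin is searched like an interval lookup,
 * assuming backoff ranges within an spgid do not overlap.
 */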
1959  static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1960  					    const struct ceph_hobject_id *hoid)
1961  {
1962  	struct rb_node *n = root->rb_node;
1963  
1964  	while (n) {
1965  		struct ceph_osd_backoff *cur =
1966  		    rb_entry(n, struct ceph_osd_backoff, spg_node);
1967  		int cmp;
1968  
1969  		cmp = hoid_compare(hoid, cur->begin);
1970  		if (cmp < 0) {
1971  			n = n->rb_left;
1972  		} else if (cmp > 0) {
1973  			if (hoid_compare(hoid, cur->end) < 0)
1974  				return cur;
1975  
1976  			n = n->rb_right;
1977  		} else {
1978  			return cur;
1979  		}
1980  	}
1981  
1982  	return NULL;
1983  }
1984  
1985  /*
1986   * Each backoff has a unique id within its OSD session.
1987   */
1988  DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1989  
1990  static void clear_backoffs(struct ceph_osd *osd)
1991  {
1992  	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1993  		struct ceph_spg_mapping *spg =
1994  		    rb_entry(rb_first(&osd->o_backoff_mappings),
1995  			     struct ceph_spg_mapping, node);
1996  
1997  		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1998  			struct ceph_osd_backoff *backoff =
1999  			    rb_entry(rb_first(&spg->backoffs),
2000  				     struct ceph_osd_backoff, spg_node);
2001  
2002  			erase_backoff(&spg->backoffs, backoff);
2003  			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
2004  			free_backoff(backoff);
2005  		}
2006  		erase_spg_mapping(&osd->o_backoff_mappings, spg);
2007  		free_spg_mapping(spg);
2008  	}
2009  }
2010  
2011  /*
2012   * Set up a temporary, non-owning view into @t.
2013   */
2014  static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
2015  				  const struct ceph_osd_request_target *t)
2016  {
2017  	hoid->key = NULL;
2018  	hoid->key_len = 0;
2019  	hoid->oid = t->target_oid.name;
2020  	hoid->oid_len = t->target_oid.name_len;
2021  	hoid->snapid = CEPH_NOSNAP;
2022  	hoid->hash = t->pgid.seed;
2023  	hoid->is_max = false;
2024  	if (t->target_oloc.pool_ns) {
2025  		hoid->nspace = t->target_oloc.pool_ns->str;
2026  		hoid->nspace_len = t->target_oloc.pool_ns->len;
2027  	} else {
2028  		hoid->nspace = NULL;
2029  		hoid->nspace_len = 0;
2030  	}
2031  	hoid->pool = t->target_oloc.pool;
2032  	ceph_hoid_build_hash_cache(hoid);
2033  }
2034  
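/*
 * Check whether @req falls into a backoff range installed for its
 * target spg on the session it is about to be sent on.  If so the
 * request is "plugged": send_request() returns without putting it on
 * the wire, and the request presumably stays queued until the OSD
 * lifts the backoff.
 */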
2035  static bool should_plug_request(struct ceph_osd_request *req)
2036  {
2037  	struct ceph_osd *osd = req->r_osd;
2038  	struct ceph_spg_mapping *spg;
2039  	struct ceph_osd_backoff *backoff;
2040  	struct ceph_hobject_id hoid;
2041  
2042  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
2043  	if (!spg)
2044  		return false;
2045  
2046  	hoid_fill_from_target(&hoid, &req->r_t);
2047  	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
2048  	if (!backoff)
2049  		return false;
2050  
2051  	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
2052  	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
2053  	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
2054  	return true;
2055  }
2056  
2057  /*
2058   * Keep get_num_data_items() in sync with this function.
2059   */
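/*
 * Attach the data items of each op to either the request or the reply
 * message (or both, for CALL and NOTIFY), so the messenger knows what
 * to send and where to land incoming data.  Only done once per
 * request; resends reuse the already populated messages.
 */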
2060  static void setup_request_data(struct ceph_osd_request *req)
2061  {
2062  	struct ceph_msg *request_msg = req->r_request;
2063  	struct ceph_msg *reply_msg = req->r_reply;
2064  	struct ceph_osd_req_op *op;
2065  
2066  	if (req->r_request->num_data_items || req->r_reply->num_data_items)
2067  		return;
2068  
2069  	WARN_ON(request_msg->data_length || reply_msg->data_length);
2070  	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
2071  		switch (op->op) {
2072  		/* request */
2073  		case CEPH_OSD_OP_WRITE:
2074  		case CEPH_OSD_OP_WRITEFULL:
2075  			WARN_ON(op->indata_len != op->extent.length);
2076  			ceph_osdc_msg_data_add(request_msg,
2077  					       &op->extent.osd_data);
2078  			break;
2079  		case CEPH_OSD_OP_SETXATTR:
2080  		case CEPH_OSD_OP_CMPXATTR:
2081  			WARN_ON(op->indata_len != op->xattr.name_len +
2082  						  op->xattr.value_len);
2083  			ceph_osdc_msg_data_add(request_msg,
2084  					       &op->xattr.osd_data);
2085  			break;
2086  		case CEPH_OSD_OP_NOTIFY_ACK:
2087  			ceph_osdc_msg_data_add(request_msg,
2088  					       &op->notify_ack.request_data);
2089  			break;
2090  		case CEPH_OSD_OP_COPY_FROM2:
2091  			ceph_osdc_msg_data_add(request_msg,
2092  					       &op->copy_from.osd_data);
2093  			break;
2094  
2095  		/* reply */
2096  		case CEPH_OSD_OP_STAT:
2097  			ceph_osdc_msg_data_add(reply_msg,
2098  					       &op->raw_data_in);
2099  			break;
2100  		case CEPH_OSD_OP_READ:
2101  		case CEPH_OSD_OP_SPARSE_READ:
2102  			ceph_osdc_msg_data_add(reply_msg,
2103  					       &op->extent.osd_data);
2104  			break;
2105  		case CEPH_OSD_OP_LIST_WATCHERS:
2106  			ceph_osdc_msg_data_add(reply_msg,
2107  					       &op->list_watchers.response_data);
2108  			break;
2109  
2110  		/* both */
2111  		case CEPH_OSD_OP_CALL:
2112  			WARN_ON(op->indata_len != op->cls.class_len +
2113  						  op->cls.method_len +
2114  						  op->cls.indata_len);
2115  			ceph_osdc_msg_data_add(request_msg,
2116  					       &op->cls.request_info);
2117  			/* optional, can be NONE */
2118  			ceph_osdc_msg_data_add(request_msg,
2119  					       &op->cls.request_data);
2120  			/* optional, can be NONE */
2121  			ceph_osdc_msg_data_add(reply_msg,
2122  					       &op->cls.response_data);
2123  			break;
2124  		case CEPH_OSD_OP_NOTIFY:
2125  			ceph_osdc_msg_data_add(request_msg,
2126  					       &op->notify.request_data);
2127  			ceph_osdc_msg_data_add(reply_msg,
2128  					       &op->notify.response_data);
2129  			break;
2130  		}
2131  	}
2132  }
2133  
2134  static void encode_pgid(void **p, const struct ceph_pg *pgid)
2135  {
2136  	ceph_encode_8(p, 1);
2137  	ceph_encode_64(p, pgid->pool);
2138  	ceph_encode_32(p, pgid->seed);
2139  	ceph_encode_32(p, -1); /* preferred */
2140  }
2141  
2142  static void encode_spgid(void **p, const struct ceph_spg *spgid)
2143  {
2144  	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
2145  	encode_pgid(p, &spgid->pgid);
2146  	ceph_encode_8(p, spgid->shard);
2147  }
2148  
2149  static void encode_oloc(void **p, void *end,
2150  			const struct ceph_object_locator *oloc)
2151  {
2152  	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
2153  	ceph_encode_64(p, oloc->pool);
2154  	ceph_encode_32(p, -1); /* preferred */
2155  	ceph_encode_32(p, 0);  /* key len */
2156  	if (oloc->pool_ns)
2157  		ceph_encode_string(p, end, oloc->pool_ns->str,
2158  				   oloc->pool_ns->len);
2159  	else
2160  		ceph_encode_32(p, 0);
2161  }
2162  
2163  static void encode_request_partial(struct ceph_osd_request *req,
2164  				   struct ceph_msg *msg)
2165  {
2166  	void *p = msg->front.iov_base;
2167  	void *const end = p + msg->front_alloc_len;
2168  	u32 data_len = 0;
2169  	int i;
2170  
2171  	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
2172  		/* snapshots aren't writeable */
2173  		WARN_ON(req->r_snapid != CEPH_NOSNAP);
2174  	} else {
2175  		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
2176  			req->r_data_offset || req->r_snapc);
2177  	}
2178  
2179  	setup_request_data(req);
2180  
2181  	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
2182  	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
2183  	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
2184  	ceph_encode_32(&p, req->r_flags);
2185  
2186  	/* reqid */
2187  	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
2188  	memset(p, 0, sizeof(struct ceph_osd_reqid));
2189  	p += sizeof(struct ceph_osd_reqid);
2190  
2191  	/* trace */
2192  	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
2193  	p += sizeof(struct ceph_blkin_trace_info);
2194  
2195  	ceph_encode_32(&p, 0); /* client_inc, always 0 */
2196  	ceph_encode_timespec64(p, &req->r_mtime);
2197  	p += sizeof(struct ceph_timespec);
2198  
2199  	encode_oloc(&p, end, &req->r_t.target_oloc);
2200  	ceph_encode_string(&p, end, req->r_t.target_oid.name,
2201  			   req->r_t.target_oid.name_len);
2202  
2203  	/* ops, can imply data */
2204  	ceph_encode_16(&p, req->r_num_ops);
2205  	for (i = 0; i < req->r_num_ops; i++) {
2206  		data_len += osd_req_encode_op(p, &req->r_ops[i]);
2207  		p += sizeof(struct ceph_osd_op);
2208  	}
2209  
2210  	ceph_encode_64(&p, req->r_snapid); /* snapid */
2211  	if (req->r_snapc) {
2212  		ceph_encode_64(&p, req->r_snapc->seq);
2213  		ceph_encode_32(&p, req->r_snapc->num_snaps);
2214  		for (i = 0; i < req->r_snapc->num_snaps; i++)
2215  			ceph_encode_64(&p, req->r_snapc->snaps[i]);
2216  	} else {
2217  		ceph_encode_64(&p, 0); /* snap_seq */
2218  		ceph_encode_32(&p, 0); /* snaps len */
2219  	}
2220  
2221  	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
2222  	BUG_ON(p > end - 8); /* space for features */
2223  
2224  	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
2225  	/* front_len is finalized in encode_request_finish() */
2226  	msg->front.iov_len = p - msg->front.iov_base;
2227  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2228  	msg->hdr.data_len = cpu_to_le32(data_len);
2229  	/*
2230  	 * The header "data_off" is a hint to the receiver allowing it
2231  	 * to align received data into its buffers such that there's no
2232  	 * need to re-copy it before writing it to disk (direct I/O).
2233  	 */
2234  	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
2235  
2236  	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
2237  	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
2238  }
2239  
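/*
 * Finish encoding once the destination is known: for luminous and
 * later OSDs (RESEND_ON_SPLIT feature) just append the peer features
 * to the MOSDOp v8 encoding produced above; for older OSDs re-encode
 * the front into the legacy v4 layout.
 */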
2240  static void encode_request_finish(struct ceph_msg *msg)
2241  {
2242  	void *p = msg->front.iov_base;
2243  	void *const partial_end = p + msg->front.iov_len;
2244  	void *const end = p + msg->front_alloc_len;
2245  
2246  	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2247  		/* luminous OSD -- encode features and be done */
2248  		p = partial_end;
2249  		ceph_encode_64(&p, msg->con->peer_features);
2250  	} else {
2251  		struct {
2252  			char spgid[CEPH_ENCODING_START_BLK_LEN +
2253  				   CEPH_PGID_ENCODING_LEN + 1];
2254  			__le32 hash;
2255  			__le32 epoch;
2256  			__le32 flags;
2257  			char reqid[CEPH_ENCODING_START_BLK_LEN +
2258  				   sizeof(struct ceph_osd_reqid)];
2259  			char trace[sizeof(struct ceph_blkin_trace_info)];
2260  			__le32 client_inc;
2261  			struct ceph_timespec mtime;
2262  		} __packed head;
2263  		struct ceph_pg pgid;
2264  		void *oloc, *oid, *tail;
2265  		int oloc_len, oid_len, tail_len;
2266  		int len;
2267  
2268  		/*
2269  		 * Pre-luminous OSD -- reencode v8 into v4 using @head
2270  		 * as a temporary buffer.  Encode the raw PG; the rest
2271  		 * is just a matter of moving oloc, oid and tail blobs
2272  		 * around.
2273  		 */
2274  		memcpy(&head, p, sizeof(head));
2275  		p += sizeof(head);
2276  
2277  		oloc = p;
2278  		p += CEPH_ENCODING_START_BLK_LEN;
2279  		pgid.pool = ceph_decode_64(&p);
2280  		p += 4 + 4; /* preferred, key len */
2281  		len = ceph_decode_32(&p);
2282  		p += len;   /* nspace */
2283  		oloc_len = p - oloc;
2284  
2285  		oid = p;
2286  		len = ceph_decode_32(&p);
2287  		p += len;
2288  		oid_len = p - oid;
2289  
2290  		tail = p;
2291  		tail_len = partial_end - p;
2292  
2293  		p = msg->front.iov_base;
2294  		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2295  		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2296  		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2297  		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2298  
2299  		/* reassert_version */
2300  		memset(p, 0, sizeof(struct ceph_eversion));
2301  		p += sizeof(struct ceph_eversion);
2302  
2303  		BUG_ON(p >= oloc);
2304  		memmove(p, oloc, oloc_len);
2305  		p += oloc_len;
2306  
2307  		pgid.seed = le32_to_cpu(head.hash);
2308  		encode_pgid(&p, &pgid); /* raw pg */
2309  
2310  		BUG_ON(p >= oid);
2311  		memmove(p, oid, oid_len);
2312  		p += oid_len;
2313  
2314  		/* tail -- ops, snapid, snapc, retry_attempt */
2315  		BUG_ON(p >= tail);
2316  		memmove(p, tail, tail_len);
2317  		p += tail_len;
2318  
2319  		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2320  	}
2321  
2322  	BUG_ON(p > end);
2323  	msg->front.iov_len = p - msg->front.iov_base;
2324  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2325  
2326  	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2327  	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2328  	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2329  	     le16_to_cpu(msg->hdr.version));
2330  }
2331  
2332  /*
2333   * @req has to be assigned a tid and registered.
2334   */
2335  static void send_request(struct ceph_osd_request *req)
2336  {
2337  	struct ceph_osd *osd = req->r_osd;
2338  
2339  	verify_osd_locked(osd);
2340  	WARN_ON(osd->o_osd != req->r_t.osd);
2341  
2342  	/* backoff? */
2343  	if (should_plug_request(req))
2344  		return;
2345  
2346  	/*
2347  	 * We may have a previously queued request message hanging
2348  	 * around.  Cancel it to avoid corrupting the msgr.
2349  	 */
2350  	if (req->r_sent)
2351  		ceph_msg_revoke(req->r_request);
2352  
2353  	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2354  	if (req->r_attempts)
2355  		req->r_flags |= CEPH_OSD_FLAG_RETRY;
2356  	else
2357  		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2358  
2359  	encode_request_partial(req, req->r_request);
2360  
2361  	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2362  	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2363  	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2364  	     req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2365  	     req->r_attempts);
2366  
2367  	req->r_t.paused = false;
2368  	req->r_stamp = jiffies;
2369  	req->r_attempts++;
2370  
2371  	req->r_sent = osd->o_incarnation;
2372  	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2373  	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2374  }
2375  
2376  static void maybe_request_map(struct ceph_osd_client *osdc)
2377  {
2378  	bool continuous = false;
2379  
2380  	verify_osdc_locked(osdc);
2381  	WARN_ON(!osdc->osdmap->epoch);
2382  
2383  	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2384  	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2385  	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2386  		dout("%s osdc %p continuous\n", __func__, osdc);
2387  		continuous = true;
2388  	} else {
2389  		dout("%s osdc %p onetime\n", __func__, osdc);
2390  	}
2391  
2392  	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2393  			       osdc->osdmap->epoch + 1, continuous))
2394  		ceph_monc_renew_subs(&osdc->client->monc);
2395  }
2396  
2397  static void complete_request(struct ceph_osd_request *req, int err);
2398  static void send_map_check(struct ceph_osd_request *req);
2399  
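/*
 * Submit @req, taking care of pause/full conditions and pool-DNE
 * checks.  Normally entered with osdc->lock held for read; if the
 * target pool is gone or the OSD session has to be created, the lock
 * is promoted to write ("promote:" label), the target is recalculated,
 * and the lock is downgraded back before returning.
 */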
2400  static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2401  {
2402  	struct ceph_osd_client *osdc = req->r_osdc;
2403  	struct ceph_osd *osd;
2404  	enum calc_target_result ct_res;
2405  	int err = 0;
2406  	bool need_send = false;
2407  	bool promoted = false;
2408  
2409  	WARN_ON(req->r_tid);
2410  	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2411  
2412  again:
2413  	ct_res = calc_target(osdc, &req->r_t, false);
2414  	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2415  		goto promote;
2416  
2417  	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2418  	if (IS_ERR(osd)) {
2419  		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2420  		goto promote;
2421  	}
2422  
2423  	if (osdc->abort_err) {
2424  		dout("req %p abort_err %d\n", req, osdc->abort_err);
2425  		err = osdc->abort_err;
2426  	} else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2427  		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2428  		     osdc->epoch_barrier);
2429  		req->r_t.paused = true;
2430  		maybe_request_map(osdc);
2431  	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2432  		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2433  		dout("req %p pausewr\n", req);
2434  		req->r_t.paused = true;
2435  		maybe_request_map(osdc);
2436  	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2437  		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2438  		dout("req %p pauserd\n", req);
2439  		req->r_t.paused = true;
2440  		maybe_request_map(osdc);
2441  	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2442  		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2443  				     CEPH_OSD_FLAG_FULL_FORCE)) &&
2444  		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2445  		    pool_full(osdc, req->r_t.base_oloc.pool))) {
2446  		dout("req %p full/pool_full\n", req);
2447  		if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) {
2448  			err = -ENOSPC;
2449  		} else {
2450  			if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL))
2451  				pr_warn_ratelimited("cluster is full (osdmap FULL)\n");
2452  			else
2453  				pr_warn_ratelimited("pool %lld is full or reached quota\n",
2454  						    req->r_t.base_oloc.pool);
2455  			req->r_t.paused = true;
2456  			maybe_request_map(osdc);
2457  		}
2458  	} else if (!osd_homeless(osd)) {
2459  		need_send = true;
2460  	} else {
2461  		maybe_request_map(osdc);
2462  	}
2463  
2464  	mutex_lock(&osd->lock);
2465  	/*
2466  	 * Assign the tid atomically with send_request() to protect
2467  	 * multiple writes to the same object from racing with each
2468  	 * other, resulting in out of order ops on the OSDs.
2469  	 */
2470  	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2471  	link_request(osd, req);
2472  	if (need_send)
2473  		send_request(req);
2474  	else if (err)
2475  		complete_request(req, err);
2476  	mutex_unlock(&osd->lock);
2477  
2478  	if (!err && ct_res == CALC_TARGET_POOL_DNE)
2479  		send_map_check(req);
2480  
2481  	if (promoted)
2482  		downgrade_write(&osdc->lock);
2483  	return;
2484  
2485  promote:
2486  	up_read(&osdc->lock);
2487  	down_write(&osdc->lock);
2488  	wrlocked = true;
2489  	promoted = true;
2490  	goto again;
2491  }
2492  
2493  static void account_request(struct ceph_osd_request *req)
2494  {
2495  	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2496  	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2497  
2498  	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2499  	atomic_inc(&req->r_osdc->num_requests);
2500  
2501  	req->r_start_stamp = jiffies;
2502  	req->r_start_latency = ktime_get();
2503  }
2504  
2505  static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2506  {
2507  	ceph_osdc_get_request(req);
2508  	account_request(req);
2509  	__submit_request(req, wrlocked);
2510  }
2511  
2512  static void finish_request(struct ceph_osd_request *req)
2513  {
2514  	struct ceph_osd_client *osdc = req->r_osdc;
2515  
2516  	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2517  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2518  
2519  	req->r_end_latency = ktime_get();
2520  
2521  	if (req->r_osd) {
2522  		ceph_init_sparse_read(&req->r_osd->o_sparse_read);
2523  		unlink_request(req->r_osd, req);
2524  	}
2525  	atomic_dec(&osdc->num_requests);
2526  
2527  	/*
2528  	 * If an OSD has failed or returned and a request has been sent
2529  	 * twice, it's possible to get a reply and end up here while the
2530  	 * request message is queued for delivery.  We will ignore the
2531  	 * reply, so not a big deal, but better to try and catch it.
2532  	 */
2533  	ceph_msg_revoke(req->r_request);
2534  	ceph_msg_revoke_incoming(req->r_reply);
2535  }
2536  
2537  static void __complete_request(struct ceph_osd_request *req)
2538  {
2539  	dout("%s req %p tid %llu cb %ps result %d\n", __func__, req,
2540  	     req->r_tid, req->r_callback, req->r_result);
2541  
2542  	if (req->r_callback)
2543  		req->r_callback(req);
2544  	complete_all(&req->r_completion);
2545  	ceph_osdc_put_request(req);
2546  }
2547  
2548  static void complete_request_workfn(struct work_struct *work)
2549  {
2550  	struct ceph_osd_request *req =
2551  	    container_of(work, struct ceph_osd_request, r_complete_work);
2552  
2553  	__complete_request(req);
2554  }
2555  
2556  /*
2557   * This is open-coded in handle_reply().
2558   */
2559  static void complete_request(struct ceph_osd_request *req, int err)
2560  {
2561  	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2562  
2563  	req->r_result = err;
2564  	finish_request(req);
2565  
2566  	INIT_WORK(&req->r_complete_work, complete_request_workfn);
2567  	queue_work(req->r_osdc->completion_wq, &req->r_complete_work);
2568  }
2569  
2570  static void cancel_map_check(struct ceph_osd_request *req)
2571  {
2572  	struct ceph_osd_client *osdc = req->r_osdc;
2573  	struct ceph_osd_request *lookup_req;
2574  
2575  	verify_osdc_wrlocked(osdc);
2576  
2577  	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2578  	if (!lookup_req)
2579  		return;
2580  
2581  	WARN_ON(lookup_req != req);
2582  	erase_request_mc(&osdc->map_checks, req);
2583  	ceph_osdc_put_request(req);
2584  }
2585  
2586  static void cancel_request(struct ceph_osd_request *req)
2587  {
2588  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2589  
2590  	cancel_map_check(req);
2591  	finish_request(req);
2592  	complete_all(&req->r_completion);
2593  	ceph_osdc_put_request(req);
2594  }
2595  
2596  static void abort_request(struct ceph_osd_request *req, int err)
2597  {
2598  	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2599  
2600  	cancel_map_check(req);
2601  	complete_request(req, err);
2602  }
2603  
2604  static int abort_fn(struct ceph_osd_request *req, void *arg)
2605  {
2606  	int err = *(int *)arg;
2607  
2608  	abort_request(req, err);
2609  	return 0; /* continue iteration */
2610  }
2611  
2612  /*
2613   * Abort all in-flight requests with @err and arrange for all future
2614   * requests to be failed immediately.
2615   */
2616  void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err)
2617  {
2618  	dout("%s osdc %p err %d\n", __func__, osdc, err);
2619  	down_write(&osdc->lock);
2620  	for_each_request(osdc, abort_fn, &err);
2621  	osdc->abort_err = err;
2622  	up_write(&osdc->lock);
2623  }
2624  EXPORT_SYMBOL(ceph_osdc_abort_requests);
2625  
2626  void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc)
2627  {
2628  	down_write(&osdc->lock);
2629  	osdc->abort_err = 0;
2630  	up_write(&osdc->lock);
2631  }
2632  EXPORT_SYMBOL(ceph_osdc_clear_abort_err);
2633  
2634  static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2635  {
2636  	if (likely(eb > osdc->epoch_barrier)) {
2637  		dout("updating epoch_barrier from %u to %u\n",
2638  				osdc->epoch_barrier, eb);
2639  		osdc->epoch_barrier = eb;
2640  		/* Request a map if we're not at the barrier yet */
2641  		if (eb > osdc->osdmap->epoch)
2642  			maybe_request_map(osdc);
2643  	}
2644  }
2645  
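/*
 * Raise the epoch barrier if @eb is newer.  The common case is checked
 * under the read lock; only when an update is actually needed is the
 * lock dropped and retaken for write, with update_epoch_barrier()
 * re-checking under the write lock to close the race.
 */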
2646  void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2647  {
2648  	down_read(&osdc->lock);
2649  	if (unlikely(eb > osdc->epoch_barrier)) {
2650  		up_read(&osdc->lock);
2651  		down_write(&osdc->lock);
2652  		update_epoch_barrier(osdc, eb);
2653  		up_write(&osdc->lock);
2654  	} else {
2655  		up_read(&osdc->lock);
2656  	}
2657  }
2658  EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2659  
2660  /*
2661   * We can end up releasing caps as a result of abort_request().
2662   * In that case, we probably want to ensure that the cap release message
2663   * has an updated epoch barrier in it, so set the epoch barrier prior to
2664   * aborting the first request.
2665   */
2666  static int abort_on_full_fn(struct ceph_osd_request *req, void *arg)
2667  {
2668  	struct ceph_osd_client *osdc = req->r_osdc;
2669  	bool *victims = arg;
2670  
2671  	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2672  	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2673  	     pool_full(osdc, req->r_t.base_oloc.pool))) {
2674  		if (!*victims) {
2675  			update_epoch_barrier(osdc, osdc->osdmap->epoch);
2676  			*victims = true;
2677  		}
2678  		abort_request(req, -ENOSPC);
2679  	}
2680  
2681  	return 0; /* continue iteration */
2682  }
2683  
2684  /*
2685   * Drop all pending requests that are stalled waiting on a full condition to
2686   * clear, and complete them with ENOSPC as the return code. Set the
2687   * clear, and complete them with -ENOSPC as the return code. Set the
2688   * cancelled.
2689   */
2690  static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2691  {
2692  	bool victims = false;
2693  
2694  	if (ceph_test_opt(osdc->client, ABORT_ON_FULL) &&
2695  	    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)))
2696  		for_each_request(osdc, abort_on_full_fn, &victims);
2697  }
2698  
2699  static void check_pool_dne(struct ceph_osd_request *req)
2700  {
2701  	struct ceph_osd_client *osdc = req->r_osdc;
2702  	struct ceph_osdmap *map = osdc->osdmap;
2703  
2704  	verify_osdc_wrlocked(osdc);
2705  	WARN_ON(!map->epoch);
2706  
2707  	if (req->r_attempts) {
2708  		/*
2709  		 * We sent a request earlier, which means that
2710  		 * previously the pool existed, and now it does not
2711  		 * (i.e., it was deleted).
2712  		 */
2713  		req->r_map_dne_bound = map->epoch;
2714  		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2715  		     req->r_tid);
2716  	} else {
2717  		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2718  		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2719  	}
2720  
2721  	if (req->r_map_dne_bound) {
2722  		if (map->epoch >= req->r_map_dne_bound) {
2723  			/* we had a new enough map */
2724  			pr_info_ratelimited("tid %llu pool does not exist\n",
2725  					    req->r_tid);
2726  			complete_request(req, -ENOENT);
2727  		}
2728  	} else {
2729  		send_map_check(req);
2730  	}
2731  }
2732  
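/*
 * Completion callback for the "osdmap" version check requested in
 * send_map_check().  The monitor's newest osdmap epoch becomes the
 * bound (r_map_dne_bound) past which a still-missing pool is declared
 * nonexistent by check_pool_dne().
 */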
2733  static void map_check_cb(struct ceph_mon_generic_request *greq)
2734  {
2735  	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2736  	struct ceph_osd_request *req;
2737  	u64 tid = greq->private_data;
2738  
2739  	WARN_ON(greq->result || !greq->u.newest);
2740  
2741  	down_write(&osdc->lock);
2742  	req = lookup_request_mc(&osdc->map_checks, tid);
2743  	if (!req) {
2744  		dout("%s tid %llu dne\n", __func__, tid);
2745  		goto out_unlock;
2746  	}
2747  
2748  	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2749  	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2750  	if (!req->r_map_dne_bound)
2751  		req->r_map_dne_bound = greq->u.newest;
2752  	erase_request_mc(&osdc->map_checks, req);
2753  	check_pool_dne(req);
2754  
2755  	ceph_osdc_put_request(req);
2756  out_unlock:
2757  	up_write(&osdc->lock);
2758  }
2759  
2760  static void send_map_check(struct ceph_osd_request *req)
2761  {
2762  	struct ceph_osd_client *osdc = req->r_osdc;
2763  	struct ceph_osd_request *lookup_req;
2764  	int ret;
2765  
2766  	verify_osdc_wrlocked(osdc);
2767  
2768  	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2769  	if (lookup_req) {
2770  		WARN_ON(lookup_req != req);
2771  		return;
2772  	}
2773  
2774  	ceph_osdc_get_request(req);
2775  	insert_request_mc(&osdc->map_checks, req);
2776  	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2777  					  map_check_cb, req->r_tid);
2778  	WARN_ON(ret);
2779  }
2780  
2781  /*
2782   * lingering requests, watch/notify v2 infrastructure
2783   */
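/*
 * A linger request stays registered across resends and map changes: it
 * backs a watch (with periodic pings) or an outstanding notify.
 * reg_req and ping_req are regular OSD requests generated on its
 * behalf; the lreq itself is refcounted via kref.
 */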
2784  static void linger_release(struct kref *kref)
2785  {
2786  	struct ceph_osd_linger_request *lreq =
2787  	    container_of(kref, struct ceph_osd_linger_request, kref);
2788  
2789  	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2790  	     lreq->reg_req, lreq->ping_req);
2791  	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2792  	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2793  	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2794  	WARN_ON(!list_empty(&lreq->scan_item));
2795  	WARN_ON(!list_empty(&lreq->pending_lworks));
2796  	WARN_ON(lreq->osd);
2797  
2798  	if (lreq->request_pl)
2799  		ceph_pagelist_release(lreq->request_pl);
2800  	if (lreq->notify_id_pages)
2801  		ceph_release_page_vector(lreq->notify_id_pages, 1);
2802  
2803  	ceph_osdc_put_request(lreq->reg_req);
2804  	ceph_osdc_put_request(lreq->ping_req);
2805  	target_destroy(&lreq->t);
2806  	kfree(lreq);
2807  }
2808  
2809  static void linger_put(struct ceph_osd_linger_request *lreq)
2810  {
2811  	if (lreq)
2812  		kref_put(&lreq->kref, linger_release);
2813  }
2814  
2815  static struct ceph_osd_linger_request *
2816  linger_get(struct ceph_osd_linger_request *lreq)
2817  {
2818  	kref_get(&lreq->kref);
2819  	return lreq;
2820  }
2821  
2822  static struct ceph_osd_linger_request *
2823  linger_alloc(struct ceph_osd_client *osdc)
2824  {
2825  	struct ceph_osd_linger_request *lreq;
2826  
2827  	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2828  	if (!lreq)
2829  		return NULL;
2830  
2831  	kref_init(&lreq->kref);
2832  	mutex_init(&lreq->lock);
2833  	RB_CLEAR_NODE(&lreq->node);
2834  	RB_CLEAR_NODE(&lreq->osdc_node);
2835  	RB_CLEAR_NODE(&lreq->mc_node);
2836  	INIT_LIST_HEAD(&lreq->scan_item);
2837  	INIT_LIST_HEAD(&lreq->pending_lworks);
2838  	init_completion(&lreq->reg_commit_wait);
2839  	init_completion(&lreq->notify_finish_wait);
2840  
2841  	lreq->osdc = osdc;
2842  	target_init(&lreq->t);
2843  
2844  	dout("%s lreq %p\n", __func__, lreq);
2845  	return lreq;
2846  }
2847  
2848  DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2849  DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2850  DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2851  
2852  /*
2853   * Create linger request <-> OSD session relation.
2854   *
2855   * @lreq has to be registered, @osd may be homeless.
2856   */
2857  static void link_linger(struct ceph_osd *osd,
2858  			struct ceph_osd_linger_request *lreq)
2859  {
2860  	verify_osd_locked(osd);
2861  	WARN_ON(!lreq->linger_id || lreq->osd);
2862  	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2863  	     osd->o_osd, lreq, lreq->linger_id);
2864  
2865  	if (!osd_homeless(osd))
2866  		__remove_osd_from_lru(osd);
2867  	else
2868  		atomic_inc(&osd->o_osdc->num_homeless);
2869  
2870  	get_osd(osd);
2871  	insert_linger(&osd->o_linger_requests, lreq);
2872  	lreq->osd = osd;
2873  }
2874  
2875  static void unlink_linger(struct ceph_osd *osd,
2876  			  struct ceph_osd_linger_request *lreq)
2877  {
2878  	verify_osd_locked(osd);
2879  	WARN_ON(lreq->osd != osd);
2880  	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2881  	     osd->o_osd, lreq, lreq->linger_id);
2882  
2883  	lreq->osd = NULL;
2884  	erase_linger(&osd->o_linger_requests, lreq);
2885  	put_osd(osd);
2886  
2887  	if (!osd_homeless(osd))
2888  		maybe_move_osd_to_lru(osd);
2889  	else
2890  		atomic_dec(&osd->o_osdc->num_homeless);
2891  }
2892  
2893  static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2894  {
2895  	verify_osdc_locked(lreq->osdc);
2896  
2897  	return !RB_EMPTY_NODE(&lreq->osdc_node);
2898  }
2899  
2900  static bool linger_registered(struct ceph_osd_linger_request *lreq)
2901  {
2902  	struct ceph_osd_client *osdc = lreq->osdc;
2903  	bool registered;
2904  
2905  	down_read(&osdc->lock);
2906  	registered = __linger_registered(lreq);
2907  	up_read(&osdc->lock);
2908  
2909  	return registered;
2910  }
2911  
2912  static void linger_register(struct ceph_osd_linger_request *lreq)
2913  {
2914  	struct ceph_osd_client *osdc = lreq->osdc;
2915  
2916  	verify_osdc_wrlocked(osdc);
2917  	WARN_ON(lreq->linger_id);
2918  
2919  	linger_get(lreq);
2920  	lreq->linger_id = ++osdc->last_linger_id;
2921  	insert_linger_osdc(&osdc->linger_requests, lreq);
2922  }
2923  
2924  static void linger_unregister(struct ceph_osd_linger_request *lreq)
2925  {
2926  	struct ceph_osd_client *osdc = lreq->osdc;
2927  
2928  	verify_osdc_wrlocked(osdc);
2929  
2930  	erase_linger_osdc(&osdc->linger_requests, lreq);
2931  	linger_put(lreq);
2932  }
2933  
2934  static void cancel_linger_request(struct ceph_osd_request *req)
2935  {
2936  	struct ceph_osd_linger_request *lreq = req->r_priv;
2937  
2938  	WARN_ON(!req->r_linger);
2939  	cancel_request(req);
2940  	linger_put(lreq);
2941  }
2942  
2943  struct linger_work {
2944  	struct work_struct work;
2945  	struct ceph_osd_linger_request *lreq;
2946  	struct list_head pending_item;
2947  	unsigned long queued_stamp;
2948  
2949  	union {
2950  		struct {
2951  			u64 notify_id;
2952  			u64 notifier_id;
2953  			void *payload; /* points into @msg front */
2954  			size_t payload_len;
2955  
2956  			struct ceph_msg *msg; /* for ceph_msg_put() */
2957  		} notify;
2958  		struct {
2959  			int err;
2960  		} error;
2961  	};
2962  };
2963  
2964  static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2965  				       work_func_t workfn)
2966  {
2967  	struct linger_work *lwork;
2968  
2969  	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2970  	if (!lwork)
2971  		return NULL;
2972  
2973  	INIT_WORK(&lwork->work, workfn);
2974  	INIT_LIST_HEAD(&lwork->pending_item);
2975  	lwork->lreq = linger_get(lreq);
2976  
2977  	return lwork;
2978  }
2979  
2980  static void lwork_free(struct linger_work *lwork)
2981  {
2982  	struct ceph_osd_linger_request *lreq = lwork->lreq;
2983  
2984  	mutex_lock(&lreq->lock);
2985  	list_del(&lwork->pending_item);
2986  	mutex_unlock(&lreq->lock);
2987  
2988  	linger_put(lreq);
2989  	kfree(lwork);
2990  }
2991  
2992  static void lwork_queue(struct linger_work *lwork)
2993  {
2994  	struct ceph_osd_linger_request *lreq = lwork->lreq;
2995  	struct ceph_osd_client *osdc = lreq->osdc;
2996  
2997  	verify_lreq_locked(lreq);
2998  	WARN_ON(!list_empty(&lwork->pending_item));
2999  
3000  	lwork->queued_stamp = jiffies;
3001  	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
3002  	queue_work(osdc->notify_wq, &lwork->work);
3003  }
3004  
3005  static void do_watch_notify(struct work_struct *w)
3006  {
3007  	struct linger_work *lwork = container_of(w, struct linger_work, work);
3008  	struct ceph_osd_linger_request *lreq = lwork->lreq;
3009  
3010  	if (!linger_registered(lreq)) {
3011  		dout("%s lreq %p not registered\n", __func__, lreq);
3012  		goto out;
3013  	}
3014  
3015  	WARN_ON(!lreq->is_watch);
3016  	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
3017  	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
3018  	     lwork->notify.payload_len);
3019  	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
3020  		  lwork->notify.notifier_id, lwork->notify.payload,
3021  		  lwork->notify.payload_len);
3022  
3023  out:
3024  	ceph_msg_put(lwork->notify.msg);
3025  	lwork_free(lwork);
3026  }
3027  
3028  static void do_watch_error(struct work_struct *w)
3029  {
3030  	struct linger_work *lwork = container_of(w, struct linger_work, work);
3031  	struct ceph_osd_linger_request *lreq = lwork->lreq;
3032  
3033  	if (!linger_registered(lreq)) {
3034  		dout("%s lreq %p not registered\n", __func__, lreq);
3035  		goto out;
3036  	}
3037  
3038  	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
3039  	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
3040  
3041  out:
3042  	lwork_free(lwork);
3043  }
3044  
3045  static void queue_watch_error(struct ceph_osd_linger_request *lreq)
3046  {
3047  	struct linger_work *lwork;
3048  
3049  	lwork = lwork_alloc(lreq, do_watch_error);
3050  	if (!lwork) {
3051  		pr_err("failed to allocate error-lwork\n");
3052  		return;
3053  	}
3054  
3055  	lwork->error.err = lreq->last_error;
3056  	lwork_queue(lwork);
3057  }
3058  
3059  static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
3060  				       int result)
3061  {
3062  	if (!completion_done(&lreq->reg_commit_wait)) {
3063  		lreq->reg_commit_error = (result <= 0 ? result : 0);
3064  		complete_all(&lreq->reg_commit_wait);
3065  	}
3066  }
3067  
3068  static void linger_commit_cb(struct ceph_osd_request *req)
3069  {
3070  	struct ceph_osd_linger_request *lreq = req->r_priv;
3071  
3072  	mutex_lock(&lreq->lock);
3073  	if (req != lreq->reg_req) {
3074  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3075  		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3076  		goto out;
3077  	}
3078  
3079  	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
3080  	     lreq->linger_id, req->r_result);
3081  	linger_reg_commit_complete(lreq, req->r_result);
3082  	lreq->committed = true;
3083  
3084  	if (!lreq->is_watch) {
3085  		struct ceph_osd_data *osd_data =
3086  		    osd_req_op_data(req, 0, notify, response_data);
3087  		void *p = page_address(osd_data->pages[0]);
3088  
3089  		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
3090  			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
3091  
3092  		/* make note of the notify_id */
3093  		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
3094  			lreq->notify_id = ceph_decode_64(&p);
3095  			dout("lreq %p notify_id %llu\n", lreq,
3096  			     lreq->notify_id);
3097  		} else {
3098  			dout("lreq %p no notify_id\n", lreq);
3099  		}
3100  	}
3101  
3102  out:
3103  	mutex_unlock(&lreq->lock);
3104  	linger_put(lreq);
3105  }
3106  
3107  static int normalize_watch_error(int err)
3108  {
3109  	/*
3110  	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
3111  	 * notification and a failure to reconnect because we raced with
3112  	 * the delete appear the same to the user.
3113  	 */
3114  	if (err == -ENOENT)
3115  		err = -ENOTCONN;
3116  
3117  	return err;
3118  }
3119  
3120  static void linger_reconnect_cb(struct ceph_osd_request *req)
3121  {
3122  	struct ceph_osd_linger_request *lreq = req->r_priv;
3123  
3124  	mutex_lock(&lreq->lock);
3125  	if (req != lreq->reg_req) {
3126  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3127  		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
3128  		goto out;
3129  	}
3130  
3131  	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
3132  	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
3133  	if (req->r_result < 0) {
3134  		if (!lreq->last_error) {
3135  			lreq->last_error = normalize_watch_error(req->r_result);
3136  			queue_watch_error(lreq);
3137  		}
3138  	}
3139  
3140  out:
3141  	mutex_unlock(&lreq->lock);
3142  	linger_put(lreq);
3143  }
3144  
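/*
 * (Re)send the registration request for @lreq: a WATCH op (or a
 * RECONNECT once the watch has been committed), or a NOTIFY for
 * non-watch lingers.  Any previous reg_req still in flight is
 * cancelled first; the commit/reconnect callbacks sort out the result.
 */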
3145  static void send_linger(struct ceph_osd_linger_request *lreq)
3146  {
3147  	struct ceph_osd_client *osdc = lreq->osdc;
3148  	struct ceph_osd_request *req;
3149  	int ret;
3150  
3151  	verify_osdc_wrlocked(osdc);
3152  	mutex_lock(&lreq->lock);
3153  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3154  
3155  	if (lreq->reg_req) {
3156  		if (lreq->reg_req->r_osd)
3157  			cancel_linger_request(lreq->reg_req);
3158  		ceph_osdc_put_request(lreq->reg_req);
3159  	}
3160  
3161  	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3162  	BUG_ON(!req);
3163  
3164  	target_copy(&req->r_t, &lreq->t);
3165  	req->r_mtime = lreq->mtime;
3166  
3167  	if (lreq->is_watch && lreq->committed) {
3168  		osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
3169  				      lreq->linger_id, ++lreq->register_gen);
3170  		dout("lreq %p reconnect register_gen %u\n", lreq,
3171  		     req->r_ops[0].watch.gen);
3172  		req->r_callback = linger_reconnect_cb;
3173  	} else {
3174  		if (lreq->is_watch) {
3175  			osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
3176  					      lreq->linger_id, 0);
3177  		} else {
3178  			lreq->notify_id = 0;
3179  
3180  			refcount_inc(&lreq->request_pl->refcnt);
3181  			osd_req_op_notify_init(req, 0, lreq->linger_id,
3182  					       lreq->request_pl);
3183  			ceph_osd_data_pages_init(
3184  			    osd_req_op_data(req, 0, notify, response_data),
3185  			    lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
3186  		}
3187  		dout("lreq %p register\n", lreq);
3188  		req->r_callback = linger_commit_cb;
3189  	}
3190  
3191  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3192  	BUG_ON(ret);
3193  
3194  	req->r_priv = linger_get(lreq);
3195  	req->r_linger = true;
3196  	lreq->reg_req = req;
3197  	mutex_unlock(&lreq->lock);
3198  
3199  	submit_request(req, true);
3200  }
3201  
3202  static void linger_ping_cb(struct ceph_osd_request *req)
3203  {
3204  	struct ceph_osd_linger_request *lreq = req->r_priv;
3205  
3206  	mutex_lock(&lreq->lock);
3207  	if (req != lreq->ping_req) {
3208  		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
3209  		     __func__, lreq, lreq->linger_id, req, lreq->ping_req);
3210  		goto out;
3211  	}
3212  
3213  	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
3214  	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
3215  	     lreq->last_error);
3216  	if (lreq->register_gen == req->r_ops[0].watch.gen) {
3217  		if (!req->r_result) {
3218  			lreq->watch_valid_thru = lreq->ping_sent;
3219  		} else if (!lreq->last_error) {
3220  			lreq->last_error = normalize_watch_error(req->r_result);
3221  			queue_watch_error(lreq);
3222  		}
3223  	} else {
3224  		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
3225  		     lreq->register_gen, req->r_ops[0].watch.gen);
3226  	}
3227  
3228  out:
3229  	mutex_unlock(&lreq->lock);
3230  	linger_put(lreq);
3231  }
3232  
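/*
 * Send a CEPH_OSD_WATCH_OP_PING for a committed watch to check that it
 * is still connected.  The result is handled in linger_ping_cb(), which
 * either advances watch_valid_thru or queues a watch error.  Skipped
 * while the osdmap has PAUSERD set.
 */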
3233  static void send_linger_ping(struct ceph_osd_linger_request *lreq)
3234  {
3235  	struct ceph_osd_client *osdc = lreq->osdc;
3236  	struct ceph_osd_request *req;
3237  	int ret;
3238  
3239  	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
3240  		dout("%s PAUSERD\n", __func__);
3241  		return;
3242  	}
3243  
3244  	lreq->ping_sent = jiffies;
3245  	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
3246  	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
3247  	     lreq->register_gen);
3248  
3249  	if (lreq->ping_req) {
3250  		if (lreq->ping_req->r_osd)
3251  			cancel_linger_request(lreq->ping_req);
3252  		ceph_osdc_put_request(lreq->ping_req);
3253  	}
3254  
3255  	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
3256  	BUG_ON(!req);
3257  
3258  	target_copy(&req->r_t, &lreq->t);
3259  	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
3260  			      lreq->register_gen);
3261  	req->r_callback = linger_ping_cb;
3262  
3263  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3264  	BUG_ON(ret);
3265  
3266  	req->r_priv = linger_get(lreq);
3267  	req->r_linger = true;
3268  	lreq->ping_req = req;
3269  
3270  	ceph_osdc_get_request(req);
3271  	account_request(req);
3272  	req->r_tid = atomic64_inc_return(&osdc->last_tid);
3273  	link_request(lreq->osd, req);
3274  	send_request(req);
3275  }
3276  
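/*
 * Register a linger request with the client, map it to an OSD and send
 * the initial registration.
 */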
3277  static void linger_submit(struct ceph_osd_linger_request *lreq)
3278  {
3279  	struct ceph_osd_client *osdc = lreq->osdc;
3280  	struct ceph_osd *osd;
3281  
3282  	down_write(&osdc->lock);
3283  	linger_register(lreq);
3284  
3285  	calc_target(osdc, &lreq->t, false);
3286  	osd = lookup_create_osd(osdc, lreq->t.osd, true);
3287  	link_linger(osd, lreq);
3288  
3289  	send_linger(lreq);
3290  	up_write(&osdc->lock);
3291  }
3292  
3293  static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
3294  {
3295  	struct ceph_osd_client *osdc = lreq->osdc;
3296  	struct ceph_osd_linger_request *lookup_lreq;
3297  
3298  	verify_osdc_wrlocked(osdc);
3299  
3300  	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3301  				       lreq->linger_id);
3302  	if (!lookup_lreq)
3303  		return;
3304  
3305  	WARN_ON(lookup_lreq != lreq);
3306  	erase_linger_mc(&osdc->linger_map_checks, lreq);
3307  	linger_put(lreq);
3308  }
3309  
3310  /*
3311   * @lreq has to be both registered and linked.
3312   */
3313  static void __linger_cancel(struct ceph_osd_linger_request *lreq)
3314  {
3315  	if (lreq->ping_req && lreq->ping_req->r_osd)
3316  		cancel_linger_request(lreq->ping_req);
3317  	if (lreq->reg_req && lreq->reg_req->r_osd)
3318  		cancel_linger_request(lreq->reg_req);
3319  	cancel_linger_map_check(lreq);
3320  	unlink_linger(lreq->osd, lreq);
3321  	linger_unregister(lreq);
3322  }
3323  
3324  static void linger_cancel(struct ceph_osd_linger_request *lreq)
3325  {
3326  	struct ceph_osd_client *osdc = lreq->osdc;
3327  
3328  	down_write(&osdc->lock);
3329  	if (__linger_registered(lreq))
3330  		__linger_cancel(lreq);
3331  	up_write(&osdc->lock);
3332  }
3333  
3334  static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3335  
3336  static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3337  {
3338  	struct ceph_osd_client *osdc = lreq->osdc;
3339  	struct ceph_osdmap *map = osdc->osdmap;
3340  
3341  	verify_osdc_wrlocked(osdc);
3342  	WARN_ON(!map->epoch);
3343  
3344  	if (lreq->register_gen) {
3345  		lreq->map_dne_bound = map->epoch;
3346  		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3347  		     lreq, lreq->linger_id);
3348  	} else {
3349  		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3350  		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3351  		     map->epoch);
3352  	}
3353  
3354  	if (lreq->map_dne_bound) {
3355  		if (map->epoch >= lreq->map_dne_bound) {
3356  			/* we had a new enough map */
3357  			pr_info("linger_id %llu pool does not exist\n",
3358  				lreq->linger_id);
3359  			linger_reg_commit_complete(lreq, -ENOENT);
3360  			__linger_cancel(lreq);
3361  		}
3362  	} else {
3363  		send_linger_map_check(lreq);
3364  	}
3365  }
3366  
3367  static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3368  {
3369  	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3370  	struct ceph_osd_linger_request *lreq;
3371  	u64 linger_id = greq->private_data;
3372  
3373  	WARN_ON(greq->result || !greq->u.newest);
3374  
3375  	down_write(&osdc->lock);
3376  	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3377  	if (!lreq) {
3378  		dout("%s linger_id %llu dne\n", __func__, linger_id);
3379  		goto out_unlock;
3380  	}
3381  
3382  	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3383  	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3384  	     greq->u.newest);
3385  	if (!lreq->map_dne_bound)
3386  		lreq->map_dne_bound = greq->u.newest;
3387  	erase_linger_mc(&osdc->linger_map_checks, lreq);
3388  	check_linger_pool_dne(lreq);
3389  
3390  	linger_put(lreq);
3391  out_unlock:
3392  	up_write(&osdc->lock);
3393  }
3394  
3395  static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3396  {
3397  	struct ceph_osd_client *osdc = lreq->osdc;
3398  	struct ceph_osd_linger_request *lookup_lreq;
3399  	int ret;
3400  
3401  	verify_osdc_wrlocked(osdc);
3402  
3403  	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3404  				       lreq->linger_id);
3405  	if (lookup_lreq) {
3406  		WARN_ON(lookup_lreq != lreq);
3407  		return;
3408  	}
3409  
3410  	linger_get(lreq);
3411  	insert_linger_mc(&osdc->linger_map_checks, lreq);
3412  	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3413  					  linger_map_check_cb, lreq->linger_id);
3414  	WARN_ON(ret);
3415  }
3416  
3417  static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3418  {
3419  	int ret;
3420  
3421  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3422  	ret = wait_for_completion_killable(&lreq->reg_commit_wait);
3423  	return ret ?: lreq->reg_commit_error;
3424  }
3425  
3426  static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq,
3427  				     unsigned long timeout)
3428  {
3429  	long left;
3430  
3431  	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3432  	left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait,
3433  						ceph_timeout_jiffies(timeout));
3434  	if (left <= 0)
3435  		left = left ?: -ETIMEDOUT;
3436  	else
3437  		left = lreq->notify_finish_error; /* completed */
3438  
3439  	return left;
3440  }
3441  
3442  /*
3443   * Timeout callback, called every N seconds.  When one or more OSD
3444   * requests have been active for more than N seconds, we send a keepalive
3445   * (tag + timestamp) to the OSD to ensure any communications channel
3446   * reset is detected.
3447   */
3448  static void handle_timeout(struct work_struct *work)
3449  {
3450  	struct ceph_osd_client *osdc =
3451  		container_of(work, struct ceph_osd_client, timeout_work.work);
3452  	struct ceph_options *opts = osdc->client->options;
3453  	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3454  	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3455  	LIST_HEAD(slow_osds);
3456  	struct rb_node *n, *p;
3457  
3458  	dout("%s osdc %p\n", __func__, osdc);
3459  	down_write(&osdc->lock);
3460  
3461  	/*
3462  	 * ping osds that are a bit slow.  this ensures that if there
3463  	 * is a break in the TCP connection we will notice, and reopen
3464  	 * a connection with that osd (from the fault callback).
3465  	 */
3466  	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3467  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3468  		bool found = false;
3469  
3470  		for (p = rb_first(&osd->o_requests); p; ) {
3471  			struct ceph_osd_request *req =
3472  			    rb_entry(p, struct ceph_osd_request, r_node);
3473  
3474  			p = rb_next(p); /* abort_request() */
3475  
3476  			if (time_before(req->r_stamp, cutoff)) {
3477  				dout(" req %p tid %llu on osd%d is laggy\n",
3478  				     req, req->r_tid, osd->o_osd);
3479  				found = true;
3480  			}
3481  			if (opts->osd_request_timeout &&
3482  			    time_before(req->r_start_stamp, expiry_cutoff)) {
3483  				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3484  				       req->r_tid, osd->o_osd);
3485  				abort_request(req, -ETIMEDOUT);
3486  			}
3487  		}
3488  		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3489  			struct ceph_osd_linger_request *lreq =
3490  			    rb_entry(p, struct ceph_osd_linger_request, node);
3491  
3492  			dout(" lreq %p linger_id %llu is served by osd%d\n",
3493  			     lreq, lreq->linger_id, osd->o_osd);
3494  			found = true;
3495  
3496  			mutex_lock(&lreq->lock);
3497  			if (lreq->is_watch && lreq->committed && !lreq->last_error)
3498  				send_linger_ping(lreq);
3499  			mutex_unlock(&lreq->lock);
3500  		}
3501  
3502  		if (found)
3503  			list_move_tail(&osd->o_keepalive_item, &slow_osds);
3504  	}
3505  
3506  	if (opts->osd_request_timeout) {
3507  		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3508  			struct ceph_osd_request *req =
3509  			    rb_entry(p, struct ceph_osd_request, r_node);
3510  
3511  			p = rb_next(p); /* abort_request() */
3512  
3513  			if (time_before(req->r_start_stamp, expiry_cutoff)) {
3514  				pr_err_ratelimited("tid %llu on osd%d timeout\n",
3515  				       req->r_tid, osdc->homeless_osd.o_osd);
3516  				abort_request(req, -ETIMEDOUT);
3517  			}
3518  		}
3519  	}
3520  
3521  	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3522  		maybe_request_map(osdc);
3523  
3524  	while (!list_empty(&slow_osds)) {
3525  		struct ceph_osd *osd = list_first_entry(&slow_osds,
3526  							struct ceph_osd,
3527  							o_keepalive_item);
3528  		list_del_init(&osd->o_keepalive_item);
3529  		ceph_con_keepalive(&osd->o_con);
3530  	}
3531  
3532  	up_write(&osdc->lock);
3533  	schedule_delayed_work(&osdc->timeout_work,
3534  			      osdc->client->options->osd_keepalive_timeout);
3535  }
3536  
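/*
 * Periodic work that closes idle OSD sessions on the LRU list once
 * their TTL has expired.
 */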
3537  static void handle_osds_timeout(struct work_struct *work)
3538  {
3539  	struct ceph_osd_client *osdc =
3540  		container_of(work, struct ceph_osd_client,
3541  			     osds_timeout_work.work);
3542  	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3543  	struct ceph_osd *osd, *nosd;
3544  
3545  	dout("%s osdc %p\n", __func__, osdc);
3546  	down_write(&osdc->lock);
3547  	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3548  		if (time_before(jiffies, osd->lru_ttl))
3549  			break;
3550  
3551  		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3552  		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3553  		close_osd(osd);
3554  	}
3555  
3556  	up_write(&osdc->lock);
3557  	schedule_delayed_work(&osdc->osds_timeout_work,
3558  			      round_jiffies_relative(delay));
3559  }
3560  
3561  static int ceph_oloc_decode(void **p, void *end,
3562  			    struct ceph_object_locator *oloc)
3563  {
3564  	u8 struct_v, struct_cv;
3565  	u32 len;
3566  	void *struct_end;
3567  	int ret = 0;
3568  
3569  	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3570  	struct_v = ceph_decode_8(p);
3571  	struct_cv = ceph_decode_8(p);
3572  	if (struct_v < 3) {
3573  		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3574  			struct_v, struct_cv);
3575  		goto e_inval;
3576  	}
3577  	if (struct_cv > 6) {
3578  		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3579  			struct_v, struct_cv);
3580  		goto e_inval;
3581  	}
3582  	len = ceph_decode_32(p);
3583  	ceph_decode_need(p, end, len, e_inval);
3584  	struct_end = *p + len;
3585  
3586  	oloc->pool = ceph_decode_64(p);
3587  	*p += 4; /* skip preferred */
3588  
3589  	len = ceph_decode_32(p);
3590  	if (len > 0) {
3591  		pr_warn("ceph_object_locator::key is set\n");
3592  		goto e_inval;
3593  	}
3594  
3595  	if (struct_v >= 5) {
3596  		bool changed = false;
3597  
3598  		len = ceph_decode_32(p);
3599  		if (len > 0) {
3600  			ceph_decode_need(p, end, len, e_inval);
3601  			if (!oloc->pool_ns ||
3602  			    ceph_compare_string(oloc->pool_ns, *p, len))
3603  				changed = true;
3604  			*p += len;
3605  		} else {
3606  			if (oloc->pool_ns)
3607  				changed = true;
3608  		}
3609  		if (changed) {
3610  			/* redirect changes namespace */
3611  			pr_warn("ceph_object_locator::nspace is changed\n");
3612  			goto e_inval;
3613  		}
3614  	}
3615  
3616  	if (struct_v >= 6) {
3617  		s64 hash = ceph_decode_64(p);
3618  		if (hash != -1) {
3619  			pr_warn("ceph_object_locator::hash is set\n");
3620  			goto e_inval;
3621  		}
3622  	}
3623  
3624  	/* skip the rest */
3625  	*p = struct_end;
3626  out:
3627  	return ret;
3628  
3629  e_inval:
3630  	ret = -EINVAL;
3631  	goto out;
3632  }
3633  
3634  static int ceph_redirect_decode(void **p, void *end,
3635  				struct ceph_request_redirect *redir)
3636  {
3637  	u8 struct_v, struct_cv;
3638  	u32 len;
3639  	void *struct_end;
3640  	int ret;
3641  
3642  	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3643  	struct_v = ceph_decode_8(p);
3644  	struct_cv = ceph_decode_8(p);
3645  	if (struct_cv > 1) {
3646  		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3647  			struct_v, struct_cv);
3648  		goto e_inval;
3649  	}
3650  	len = ceph_decode_32(p);
3651  	ceph_decode_need(p, end, len, e_inval);
3652  	struct_end = *p + len;
3653  
3654  	ret = ceph_oloc_decode(p, end, &redir->oloc);
3655  	if (ret)
3656  		goto out;
3657  
3658  	len = ceph_decode_32(p);
3659  	if (len > 0) {
3660  		pr_warn("ceph_request_redirect::object_name is set\n");
3661  		goto e_inval;
3662  	}
3663  
3664  	/* skip the rest */
3665  	*p = struct_end;
3666  out:
3667  	return ret;
3668  
3669  e_inval:
3670  	ret = -EINVAL;
3671  	goto out;
3672  }
3673  
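/*
 * Decoded fields of an MOSDOpReply message, filled in by
 * decode_MOSDOpReply() and consumed by handle_reply().
 */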
3674  struct MOSDOpReply {
3675  	struct ceph_pg pgid;
3676  	u64 flags;
3677  	int result;
3678  	u32 epoch;
3679  	int num_ops;
3680  	u32 outdata_len[CEPH_OSD_MAX_OPS];
3681  	s32 rval[CEPH_OSD_MAX_OPS];
3682  	int retry_attempt;
3683  	struct ceph_eversion replay_version;
3684  	u64 user_version;
3685  	struct ceph_request_redirect redirect;
3686  };
3687  
3688  static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3689  {
3690  	void *p = msg->front.iov_base;
3691  	void *const end = p + msg->front.iov_len;
3692  	u16 version = le16_to_cpu(msg->hdr.version);
3693  	struct ceph_eversion bad_replay_version;
3694  	u8 decode_redir;
3695  	u32 len;
3696  	int ret;
3697  	int i;
3698  
3699  	ceph_decode_32_safe(&p, end, len, e_inval);
3700  	ceph_decode_need(&p, end, len, e_inval);
3701  	p += len; /* skip oid */
3702  
3703  	ret = ceph_decode_pgid(&p, end, &m->pgid);
3704  	if (ret)
3705  		return ret;
3706  
3707  	ceph_decode_64_safe(&p, end, m->flags, e_inval);
3708  	ceph_decode_32_safe(&p, end, m->result, e_inval);
3709  	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3710  	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3711  	p += sizeof(bad_replay_version);
3712  	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3713  
3714  	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3715  	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3716  		goto e_inval;
3717  
3718  	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3719  			 e_inval);
3720  	for (i = 0; i < m->num_ops; i++) {
3721  		struct ceph_osd_op *op = p;
3722  
3723  		m->outdata_len[i] = le32_to_cpu(op->payload_len);
3724  		p += sizeof(*op);
3725  	}
3726  
3727  	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3728  	for (i = 0; i < m->num_ops; i++)
3729  		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3730  
3731  	if (version >= 5) {
3732  		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3733  		memcpy(&m->replay_version, p, sizeof(m->replay_version));
3734  		p += sizeof(m->replay_version);
3735  		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3736  	} else {
3737  		m->replay_version = bad_replay_version; /* struct */
3738  		m->user_version = le64_to_cpu(m->replay_version.version);
3739  	}
3740  
3741  	if (version >= 6) {
3742  		if (version >= 7)
3743  			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3744  		else
3745  			decode_redir = 1;
3746  	} else {
3747  		decode_redir = 0;
3748  	}
3749  
3750  	if (decode_redir) {
3751  		ret = ceph_redirect_decode(&p, end, &m->redirect);
3752  		if (ret)
3753  			return ret;
3754  	} else {
3755  		ceph_oloc_init(&m->redirect.oloc);
3756  	}
3757  
3758  	return 0;
3759  
3760  e_inval:
3761  	return -EINVAL;
3762  }
3763  
3764  /*
3765   * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3766   * specified.
3767   */
3768  static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3769  {
3770  	struct ceph_osd_client *osdc = osd->o_osdc;
3771  	struct ceph_osd_request *req;
3772  	struct MOSDOpReply m;
3773  	u64 tid = le64_to_cpu(msg->hdr.tid);
3774  	u32 data_len = 0;
3775  	int ret;
3776  	int i;
3777  
3778  	dout("%s msg %p tid %llu\n", __func__, msg, tid);
3779  
3780  	down_read(&osdc->lock);
3781  	if (!osd_registered(osd)) {
3782  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3783  		goto out_unlock_osdc;
3784  	}
3785  	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3786  
3787  	mutex_lock(&osd->lock);
3788  	req = lookup_request(&osd->o_requests, tid);
3789  	if (!req) {
3790  		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3791  		goto out_unlock_session;
3792  	}
3793  
3794  	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3795  	ret = decode_MOSDOpReply(msg, &m);
3796  	m.redirect.oloc.pool_ns = NULL;
3797  	if (ret) {
3798  		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3799  		       req->r_tid, ret);
3800  		ceph_msg_dump(msg);
3801  		goto fail_request;
3802  	}
3803  	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3804  	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3805  	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3806  	     le64_to_cpu(m.replay_version.version), m.user_version);
3807  
3808  	if (m.retry_attempt >= 0) {
3809  		if (m.retry_attempt != req->r_attempts - 1) {
3810  			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3811  			     req, req->r_tid, m.retry_attempt,
3812  			     req->r_attempts - 1);
3813  			goto out_unlock_session;
3814  		}
3815  	} else {
3816  		WARN_ON(1); /* MOSDOpReply v4 is assumed */
3817  	}
3818  
3819  	if (!ceph_oloc_empty(&m.redirect.oloc)) {
3820  		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3821  		     m.redirect.oloc.pool);
3822  		unlink_request(osd, req);
3823  		mutex_unlock(&osd->lock);
3824  
3825  		/*
3826  		 * Not ceph_oloc_copy() - changing pool_ns is not
3827  		 * supported.
3828  		 */
3829  		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3830  		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED |
3831  				CEPH_OSD_FLAG_IGNORE_OVERLAY |
3832  				CEPH_OSD_FLAG_IGNORE_CACHE;
3833  		req->r_tid = 0;
3834  		__submit_request(req, false);
3835  		goto out_unlock_osdc;
3836  	}
3837  
3838  	if (m.result == -EAGAIN) {
3839  		dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
3840  		unlink_request(osd, req);
3841  		mutex_unlock(&osd->lock);
3842  
3843  		/*
3844  		 * The object is missing on the replica or not (yet)
3845  		 * readable.  Clear pgid to force a resend to the primary
3846  		 * via legacy_change.
3847  		 */
3848  		req->r_t.pgid.pool = 0;
3849  		req->r_t.pgid.seed = 0;
3850  		WARN_ON(!req->r_t.used_replica);
3851  		req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
3852  				  CEPH_OSD_FLAG_LOCALIZE_READS);
3853  		req->r_tid = 0;
3854  		__submit_request(req, false);
3855  		goto out_unlock_osdc;
3856  	}
3857  
3858  	if (m.num_ops != req->r_num_ops) {
3859  		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3860  		       req->r_num_ops, req->r_tid);
3861  		goto fail_request;
3862  	}
3863  	for (i = 0; i < req->r_num_ops; i++) {
3864  		dout(" req %p tid %llu op %d rval %d len %u\n", req,
3865  		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
3866  		req->r_ops[i].rval = m.rval[i];
3867  		req->r_ops[i].outdata_len = m.outdata_len[i];
3868  		data_len += m.outdata_len[i];
3869  	}
3870  	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3871  		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3872  		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
3873  		goto fail_request;
3874  	}
3875  	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3876  	     req, req->r_tid, m.result, data_len);
3877  
3878  	/*
3879  	 * Since we only ever request ONDISK, we should only ever get
3880  	 * one (type of) reply back.
3881  	 */
3882  	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3883  	req->r_version = m.user_version;
3884  	req->r_result = m.result ?: data_len;
3885  	finish_request(req);
3886  	mutex_unlock(&osd->lock);
3887  	up_read(&osdc->lock);
3888  
3889  	__complete_request(req);
3890  	return;
3891  
3892  fail_request:
3893  	complete_request(req, -EIO);
3894  out_unlock_session:
3895  	mutex_unlock(&osd->lock);
3896  out_unlock_osdc:
3897  	up_read(&osdc->lock);
3898  }
3899  
3900  static void set_pool_was_full(struct ceph_osd_client *osdc)
3901  {
3902  	struct rb_node *n;
3903  
3904  	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3905  		struct ceph_pg_pool_info *pi =
3906  		    rb_entry(n, struct ceph_pg_pool_info, node);
3907  
3908  		pi->was_full = __pool_full(pi);
3909  	}
3910  }
3911  
3912  static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3913  {
3914  	struct ceph_pg_pool_info *pi;
3915  
3916  	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3917  	if (!pi)
3918  		return false;
3919  
3920  	return pi->was_full && !__pool_full(pi);
3921  }
3922  
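/*
 * Recalculate the target of a linger request against the current osdmap
 * and, if the mapping moved, relink it to the new OSD session.
 */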
3923  static enum calc_target_result
3924  recalc_linger_target(struct ceph_osd_linger_request *lreq)
3925  {
3926  	struct ceph_osd_client *osdc = lreq->osdc;
3927  	enum calc_target_result ct_res;
3928  
3929  	ct_res = calc_target(osdc, &lreq->t, true);
3930  	if (ct_res == CALC_TARGET_NEED_RESEND) {
3931  		struct ceph_osd *osd;
3932  
3933  		osd = lookup_create_osd(osdc, lreq->t.osd, true);
3934  		if (osd != lreq->osd) {
3935  			unlink_linger(lreq->osd, lreq);
3936  			link_linger(osd, lreq);
3937  		}
3938  	}
3939  
3940  	return ct_res;
3941  }
3942  
3943  /*
3944   * Requeue requests whose mapping to an OSD has changed.
3945   */
3946  static void scan_requests(struct ceph_osd *osd,
3947  			  bool force_resend,
3948  			  bool cleared_full,
3949  			  bool check_pool_cleared_full,
3950  			  struct rb_root *need_resend,
3951  			  struct list_head *need_resend_linger)
3952  {
3953  	struct ceph_osd_client *osdc = osd->o_osdc;
3954  	struct rb_node *n;
3955  	bool force_resend_writes;
3956  
3957  	for (n = rb_first(&osd->o_linger_requests); n; ) {
3958  		struct ceph_osd_linger_request *lreq =
3959  		    rb_entry(n, struct ceph_osd_linger_request, node);
3960  		enum calc_target_result ct_res;
3961  
3962  		n = rb_next(n); /* recalc_linger_target() */
3963  
3964  		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3965  		     lreq->linger_id);
3966  		ct_res = recalc_linger_target(lreq);
3967  		switch (ct_res) {
3968  		case CALC_TARGET_NO_ACTION:
3969  			force_resend_writes = cleared_full ||
3970  			    (check_pool_cleared_full &&
3971  			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3972  			if (!force_resend && !force_resend_writes)
3973  				break;
3974  
3975  			fallthrough;
3976  		case CALC_TARGET_NEED_RESEND:
3977  			cancel_linger_map_check(lreq);
3978  			/*
3979  			 * scan_requests() for the previous epoch(s)
3980  			 * may have already added it to the list, since
3981  			 * it's not unlinked here.
3982  			 */
3983  			if (list_empty(&lreq->scan_item))
3984  				list_add_tail(&lreq->scan_item, need_resend_linger);
3985  			break;
3986  		case CALC_TARGET_POOL_DNE:
3987  			list_del_init(&lreq->scan_item);
3988  			check_linger_pool_dne(lreq);
3989  			break;
3990  		}
3991  	}
3992  
3993  	for (n = rb_first(&osd->o_requests); n; ) {
3994  		struct ceph_osd_request *req =
3995  		    rb_entry(n, struct ceph_osd_request, r_node);
3996  		enum calc_target_result ct_res;
3997  
3998  		n = rb_next(n); /* unlink_request(), check_pool_dne() */
3999  
4000  		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4001  		ct_res = calc_target(osdc, &req->r_t, false);
4002  		switch (ct_res) {
4003  		case CALC_TARGET_NO_ACTION:
4004  			force_resend_writes = cleared_full ||
4005  			    (check_pool_cleared_full &&
4006  			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
4007  			if (!force_resend &&
4008  			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
4009  			     !force_resend_writes))
4010  				break;
4011  
4012  			fallthrough;
4013  		case CALC_TARGET_NEED_RESEND:
4014  			cancel_map_check(req);
4015  			unlink_request(osd, req);
4016  			insert_request(need_resend, req);
4017  			break;
4018  		case CALC_TARGET_POOL_DNE:
4019  			check_pool_dne(req);
4020  			break;
4021  		}
4022  	}
4023  }
4024  
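/*
 * Apply a single incremental or full osdmap: preserve per-pool was_full
 * state across the swap, rescan all requests and lingers for targets
 * that need resending, and close sessions to OSDs that went down or
 * changed address.
 */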
4025  static int handle_one_map(struct ceph_osd_client *osdc,
4026  			  void *p, void *end, bool incremental,
4027  			  struct rb_root *need_resend,
4028  			  struct list_head *need_resend_linger)
4029  {
4030  	struct ceph_osdmap *newmap;
4031  	struct rb_node *n;
4032  	bool skipped_map = false;
4033  	bool was_full;
4034  
4035  	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4036  	set_pool_was_full(osdc);
4037  
4038  	if (incremental)
4039  		newmap = osdmap_apply_incremental(&p, end,
4040  						  ceph_msgr2(osdc->client),
4041  						  osdc->osdmap);
4042  	else
4043  		newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client));
4044  	if (IS_ERR(newmap))
4045  		return PTR_ERR(newmap);
4046  
4047  	if (newmap != osdc->osdmap) {
4048  		/*
4049  		 * Preserve ->was_full before destroying the old map.
4050  		 * For pools that weren't in the old map, ->was_full
4051  		 * should be false.
4052  		 */
4053  		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
4054  			struct ceph_pg_pool_info *pi =
4055  			    rb_entry(n, struct ceph_pg_pool_info, node);
4056  			struct ceph_pg_pool_info *old_pi;
4057  
4058  			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
4059  			if (old_pi)
4060  				pi->was_full = old_pi->was_full;
4061  			else
4062  				WARN_ON(pi->was_full);
4063  		}
4064  
4065  		if (osdc->osdmap->epoch &&
4066  		    osdc->osdmap->epoch + 1 < newmap->epoch) {
4067  			WARN_ON(incremental);
4068  			skipped_map = true;
4069  		}
4070  
4071  		ceph_osdmap_destroy(osdc->osdmap);
4072  		osdc->osdmap = newmap;
4073  	}
4074  
4075  	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
4076  	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
4077  		      need_resend, need_resend_linger);
4078  
4079  	for (n = rb_first(&osdc->osds); n; ) {
4080  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4081  
4082  		n = rb_next(n); /* close_osd() */
4083  
4084  		scan_requests(osd, skipped_map, was_full, true, need_resend,
4085  			      need_resend_linger);
4086  		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
4087  		    memcmp(&osd->o_con.peer_addr,
4088  			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
4089  			   sizeof(struct ceph_entity_addr)))
4090  			close_osd(osd);
4091  	}
4092  
4093  	return 0;
4094  }
4095  
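/*
 * Resend the requests and linger requests collected by scan_requests(),
 * remapping them against the latest osdmap first.
 */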
4096  static void kick_requests(struct ceph_osd_client *osdc,
4097  			  struct rb_root *need_resend,
4098  			  struct list_head *need_resend_linger)
4099  {
4100  	struct ceph_osd_linger_request *lreq, *nlreq;
4101  	enum calc_target_result ct_res;
4102  	struct rb_node *n;
4103  
4104  	/* make sure need_resend targets reflect latest map */
4105  	for (n = rb_first(need_resend); n; ) {
4106  		struct ceph_osd_request *req =
4107  		    rb_entry(n, struct ceph_osd_request, r_node);
4108  
4109  		n = rb_next(n);
4110  
4111  		if (req->r_t.epoch < osdc->osdmap->epoch) {
4112  			ct_res = calc_target(osdc, &req->r_t, false);
4113  			if (ct_res == CALC_TARGET_POOL_DNE) {
4114  				erase_request(need_resend, req);
4115  				check_pool_dne(req);
4116  			}
4117  		}
4118  	}
4119  
4120  	for (n = rb_first(need_resend); n; ) {
4121  		struct ceph_osd_request *req =
4122  		    rb_entry(n, struct ceph_osd_request, r_node);
4123  		struct ceph_osd *osd;
4124  
4125  		n = rb_next(n);
4126  		erase_request(need_resend, req); /* before link_request() */
4127  
4128  		osd = lookup_create_osd(osdc, req->r_t.osd, true);
4129  		link_request(osd, req);
4130  		if (!req->r_linger) {
4131  			if (!osd_homeless(osd) && !req->r_t.paused)
4132  				send_request(req);
4133  		} else {
4134  			cancel_linger_request(req);
4135  		}
4136  	}
4137  
4138  	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
4139  		if (!osd_homeless(lreq->osd))
4140  			send_linger(lreq);
4141  
4142  		list_del_init(&lreq->scan_item);
4143  	}
4144  }
4145  
4146  /*
4147   * Process updated osd map.
4148   *
4149   * The message contains any number of incremental and full maps, normally
4150   * indicating some sort of topology change in the cluster.  Kick requests
4151   * off to different OSDs as needed.
4152   */
4153  void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
4154  {
4155  	void *p = msg->front.iov_base;
4156  	void *const end = p + msg->front.iov_len;
4157  	u32 nr_maps, maplen;
4158  	u32 epoch;
4159  	struct ceph_fsid fsid;
4160  	struct rb_root need_resend = RB_ROOT;
4161  	LIST_HEAD(need_resend_linger);
4162  	bool handled_incremental = false;
4163  	bool was_pauserd, was_pausewr;
4164  	bool pauserd, pausewr;
4165  	int err;
4166  
4167  	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
4168  	down_write(&osdc->lock);
4169  
4170  	/* verify fsid */
4171  	ceph_decode_need(&p, end, sizeof(fsid), bad);
4172  	ceph_decode_copy(&p, &fsid, sizeof(fsid));
4173  	if (ceph_check_fsid(osdc->client, &fsid) < 0)
4174  		goto bad;
4175  
4176  	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4177  	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4178  		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4179  		      have_pool_full(osdc);
4180  
4181  	/* incremental maps */
4182  	ceph_decode_32_safe(&p, end, nr_maps, bad);
4183  	dout(" %d inc maps\n", nr_maps);
4184  	while (nr_maps > 0) {
4185  		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4186  		epoch = ceph_decode_32(&p);
4187  		maplen = ceph_decode_32(&p);
4188  		ceph_decode_need(&p, end, maplen, bad);
4189  		if (osdc->osdmap->epoch &&
4190  		    osdc->osdmap->epoch + 1 == epoch) {
4191  			dout("applying incremental map %u len %d\n",
4192  			     epoch, maplen);
4193  			err = handle_one_map(osdc, p, p + maplen, true,
4194  					     &need_resend, &need_resend_linger);
4195  			if (err)
4196  				goto bad;
4197  			handled_incremental = true;
4198  		} else {
4199  			dout("ignoring incremental map %u len %d\n",
4200  			     epoch, maplen);
4201  		}
4202  		p += maplen;
4203  		nr_maps--;
4204  	}
4205  	if (handled_incremental)
4206  		goto done;
4207  
4208  	/* full maps */
4209  	ceph_decode_32_safe(&p, end, nr_maps, bad);
4210  	dout(" %d full maps\n", nr_maps);
4211  	while (nr_maps) {
4212  		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
4213  		epoch = ceph_decode_32(&p);
4214  		maplen = ceph_decode_32(&p);
4215  		ceph_decode_need(&p, end, maplen, bad);
4216  		if (nr_maps > 1) {
4217  			dout("skipping non-latest full map %u len %d\n",
4218  			     epoch, maplen);
4219  		} else if (osdc->osdmap->epoch >= epoch) {
4220  			dout("skipping full map %u len %d, "
4221  			     "older than our %u\n", epoch, maplen,
4222  			     osdc->osdmap->epoch);
4223  		} else {
4224  			dout("taking full map %u len %d\n", epoch, maplen);
4225  			err = handle_one_map(osdc, p, p + maplen, false,
4226  					     &need_resend, &need_resend_linger);
4227  			if (err)
4228  				goto bad;
4229  		}
4230  		p += maplen;
4231  		nr_maps--;
4232  	}
4233  
4234  done:
4235  	/*
4236  	 * subscribe to subsequent osdmap updates if full to ensure
4237  	 * we find out when we are no longer full and stop returning
4238  	 * ENOSPC.
4239  	 */
4240  	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
4241  	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
4242  		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
4243  		  have_pool_full(osdc);
4244  	if (was_pauserd || was_pausewr || pauserd || pausewr ||
4245  	    osdc->osdmap->epoch < osdc->epoch_barrier)
4246  		maybe_request_map(osdc);
4247  
4248  	kick_requests(osdc, &need_resend, &need_resend_linger);
4249  
4250  	ceph_osdc_abort_on_full(osdc);
4251  	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
4252  			  osdc->osdmap->epoch);
4253  	up_write(&osdc->lock);
4254  	wake_up_all(&osdc->client->auth_wq);
4255  	return;
4256  
4257  bad:
4258  	pr_err("osdc handle_map corrupt msg\n");
4259  	ceph_msg_dump(msg);
4260  	up_write(&osdc->lock);
4261  }
4262  
4263  /*
4264   * Resubmit requests pending on the given osd.
4265   */
4266  static void kick_osd_requests(struct ceph_osd *osd)
4267  {
4268  	struct rb_node *n;
4269  
4270  	clear_backoffs(osd);
4271  
4272  	for (n = rb_first(&osd->o_requests); n; ) {
4273  		struct ceph_osd_request *req =
4274  		    rb_entry(n, struct ceph_osd_request, r_node);
4275  
4276  		n = rb_next(n); /* cancel_linger_request() */
4277  
4278  		if (!req->r_linger) {
4279  			if (!req->r_t.paused)
4280  				send_request(req);
4281  		} else {
4282  			cancel_linger_request(req);
4283  		}
4284  	}
4285  	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
4286  		struct ceph_osd_linger_request *lreq =
4287  		    rb_entry(n, struct ceph_osd_linger_request, node);
4288  
4289  		send_linger(lreq);
4290  	}
4291  }
4292  
4293  /*
4294   * If the osd connection drops, we need to resubmit all requests.
4295   */
4296  static void osd_fault(struct ceph_connection *con)
4297  {
4298  	struct ceph_osd *osd = con->private;
4299  	struct ceph_osd_client *osdc = osd->o_osdc;
4300  
4301  	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
4302  
4303  	down_write(&osdc->lock);
4304  	if (!osd_registered(osd)) {
4305  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4306  		goto out_unlock;
4307  	}
4308  
4309  	if (!reopen_osd(osd))
4310  		kick_osd_requests(osd);
4311  	maybe_request_map(osdc);
4312  
4313  out_unlock:
4314  	up_write(&osdc->lock);
4315  }
4316  
4317  struct MOSDBackoff {
4318  	struct ceph_spg spgid;
4319  	u32 map_epoch;
4320  	u8 op;
4321  	u64 id;
4322  	struct ceph_hobject_id *begin;
4323  	struct ceph_hobject_id *end;
4324  };
4325  
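/*
 * Decode an MOSDBackoff message.  On success m->begin and m->end are
 * allocated; the caller is responsible for freeing them unless ownership
 * is handed off (see handle_backoff_block()).
 */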
4326  static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
4327  {
4328  	void *p = msg->front.iov_base;
4329  	void *const end = p + msg->front.iov_len;
4330  	u8 struct_v;
4331  	u32 struct_len;
4332  	int ret;
4333  
4334  	ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
4335  	if (ret)
4336  		return ret;
4337  
4338  	ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
4339  	if (ret)
4340  		return ret;
4341  
4342  	ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
4343  	ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4344  	ceph_decode_8_safe(&p, end, m->op, e_inval);
4345  	ceph_decode_64_safe(&p, end, m->id, e_inval);
4346  
4347  	m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4348  	if (!m->begin)
4349  		return -ENOMEM;
4350  
4351  	ret = decode_hoid(&p, end, m->begin);
4352  	if (ret) {
4353  		free_hoid(m->begin);
4354  		return ret;
4355  	}
4356  
4357  	m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4358  	if (!m->end) {
4359  		free_hoid(m->begin);
4360  		return -ENOMEM;
4361  	}
4362  
4363  	ret = decode_hoid(&p, end, m->end);
4364  	if (ret) {
4365  		free_hoid(m->begin);
4366  		free_hoid(m->end);
4367  		return ret;
4368  	}
4369  
4370  	return 0;
4371  
4372  e_inval:
4373  	return -EINVAL;
4374  }
4375  
4376  static struct ceph_msg *create_backoff_message(
4377  				const struct ceph_osd_backoff *backoff,
4378  				u32 map_epoch)
4379  {
4380  	struct ceph_msg *msg;
4381  	void *p, *end;
4382  	int msg_size;
4383  
4384  	msg_size = CEPH_ENCODING_START_BLK_LEN +
4385  			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4386  	msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4387  	msg_size += CEPH_ENCODING_START_BLK_LEN +
4388  			hoid_encoding_size(backoff->begin);
4389  	msg_size += CEPH_ENCODING_START_BLK_LEN +
4390  			hoid_encoding_size(backoff->end);
4391  
4392  	msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4393  	if (!msg)
4394  		return NULL;
4395  
4396  	p = msg->front.iov_base;
4397  	end = p + msg->front_alloc_len;
4398  
4399  	encode_spgid(&p, &backoff->spgid);
4400  	ceph_encode_32(&p, map_epoch);
4401  	ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4402  	ceph_encode_64(&p, backoff->id);
4403  	encode_hoid(&p, end, backoff->begin);
4404  	encode_hoid(&p, end, backoff->end);
4405  	BUG_ON(p != end);
4406  
4407  	msg->front.iov_len = p - msg->front.iov_base;
4408  	msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4409  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4410  
4411  	return msg;
4412  }
4413  
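/*
 * Install a backoff for the given PG/hobject range and ack it back to
 * the OSD so it can be discarded there if the PG has since split.
 */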
4414  static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4415  {
4416  	struct ceph_spg_mapping *spg;
4417  	struct ceph_osd_backoff *backoff;
4418  	struct ceph_msg *msg;
4419  
4420  	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4421  	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4422  
4423  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4424  	if (!spg) {
4425  		spg = alloc_spg_mapping();
4426  		if (!spg) {
4427  			pr_err("%s failed to allocate spg\n", __func__);
4428  			return;
4429  		}
4430  		spg->spgid = m->spgid; /* struct */
4431  		insert_spg_mapping(&osd->o_backoff_mappings, spg);
4432  	}
4433  
4434  	backoff = alloc_backoff();
4435  	if (!backoff) {
4436  		pr_err("%s failed to allocate backoff\n", __func__);
4437  		return;
4438  	}
4439  	backoff->spgid = m->spgid; /* struct */
4440  	backoff->id = m->id;
4441  	backoff->begin = m->begin;
4442  	m->begin = NULL; /* backoff now owns this */
4443  	backoff->end = m->end;
4444  	m->end = NULL;   /* ditto */
4445  
4446  	insert_backoff(&spg->backoffs, backoff);
4447  	insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4448  
4449  	/*
4450  	 * Ack with original backoff's epoch so that the OSD can
4451  	 * discard this if there was a PG split.
4452  	 */
4453  	msg = create_backoff_message(backoff, m->map_epoch);
4454  	if (!msg) {
4455  		pr_err("%s failed to allocate msg\n", __func__);
4456  		return;
4457  	}
4458  	ceph_con_send(&osd->o_con, msg);
4459  }
4460  
4461  static bool target_contained_by(const struct ceph_osd_request_target *t,
4462  				const struct ceph_hobject_id *begin,
4463  				const struct ceph_hobject_id *end)
4464  {
4465  	struct ceph_hobject_id hoid;
4466  	int cmp;
4467  
4468  	hoid_fill_from_target(&hoid, t);
4469  	cmp = hoid_compare(&hoid, begin);
4470  	return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4471  }
4472  
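/*
 * Remove a previously installed backoff and resend any requests that
 * fall in the unblocked range.
 */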
4473  static void handle_backoff_unblock(struct ceph_osd *osd,
4474  				   const struct MOSDBackoff *m)
4475  {
4476  	struct ceph_spg_mapping *spg;
4477  	struct ceph_osd_backoff *backoff;
4478  	struct rb_node *n;
4479  
4480  	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4481  	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4482  
4483  	backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4484  	if (!backoff) {
4485  		pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4486  		       __func__, osd->o_osd, m->spgid.pgid.pool,
4487  		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4488  		return;
4489  	}
4490  
4491  	if (hoid_compare(backoff->begin, m->begin) &&
4492  	    hoid_compare(backoff->end, m->end)) {
4493  		pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4494  		       __func__, osd->o_osd, m->spgid.pgid.pool,
4495  		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4496  		/* unblock it anyway... */
4497  	}
4498  
4499  	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4500  	BUG_ON(!spg);
4501  
4502  	erase_backoff(&spg->backoffs, backoff);
4503  	erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4504  	free_backoff(backoff);
4505  
4506  	if (RB_EMPTY_ROOT(&spg->backoffs)) {
4507  		erase_spg_mapping(&osd->o_backoff_mappings, spg);
4508  		free_spg_mapping(spg);
4509  	}
4510  
4511  	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4512  		struct ceph_osd_request *req =
4513  		    rb_entry(n, struct ceph_osd_request, r_node);
4514  
4515  		if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4516  			/*
4517  			 * Match against @m, not @backoff -- the PG may
4518  			 * have split on the OSD.
4519  			 */
4520  			if (target_contained_by(&req->r_t, m->begin, m->end)) {
4521  				/*
4522  				 * If no other installed backoff applies,
4523  				 * resend.
4524  				 */
4525  				send_request(req);
4526  			}
4527  		}
4528  	}
4529  }
4530  
4531  static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4532  {
4533  	struct ceph_osd_client *osdc = osd->o_osdc;
4534  	struct MOSDBackoff m;
4535  	int ret;
4536  
4537  	down_read(&osdc->lock);
4538  	if (!osd_registered(osd)) {
4539  		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4540  		up_read(&osdc->lock);
4541  		return;
4542  	}
4543  	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4544  
4545  	mutex_lock(&osd->lock);
4546  	ret = decode_MOSDBackoff(msg, &m);
4547  	if (ret) {
4548  		pr_err("failed to decode MOSDBackoff: %d\n", ret);
4549  		ceph_msg_dump(msg);
4550  		goto out_unlock;
4551  	}
4552  
4553  	switch (m.op) {
4554  	case CEPH_OSD_BACKOFF_OP_BLOCK:
4555  		handle_backoff_block(osd, &m);
4556  		break;
4557  	case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4558  		handle_backoff_unblock(osd, &m);
4559  		break;
4560  	default:
4561  		pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4562  	}
4563  
4564  	free_hoid(m.begin);
4565  	free_hoid(m.end);
4566  
4567  out_unlock:
4568  	mutex_unlock(&osd->lock);
4569  	up_read(&osdc->lock);
4570  }
4571  
4572  /*
4573   * Process osd watch notifications
4574   */
4575  static void handle_watch_notify(struct ceph_osd_client *osdc,
4576  				struct ceph_msg *msg)
4577  {
4578  	void *p = msg->front.iov_base;
4579  	void *const end = p + msg->front.iov_len;
4580  	struct ceph_osd_linger_request *lreq;
4581  	struct linger_work *lwork;
4582  	u8 proto_ver, opcode;
4583  	u64 cookie, notify_id;
4584  	u64 notifier_id = 0;
4585  	s32 return_code = 0;
4586  	void *payload = NULL;
4587  	u32 payload_len = 0;
4588  
4589  	ceph_decode_8_safe(&p, end, proto_ver, bad);
4590  	ceph_decode_8_safe(&p, end, opcode, bad);
4591  	ceph_decode_64_safe(&p, end, cookie, bad);
4592  	p += 8; /* skip ver */
4593  	ceph_decode_64_safe(&p, end, notify_id, bad);
4594  
4595  	if (proto_ver >= 1) {
4596  		ceph_decode_32_safe(&p, end, payload_len, bad);
4597  		ceph_decode_need(&p, end, payload_len, bad);
4598  		payload = p;
4599  		p += payload_len;
4600  	}
4601  
4602  	if (le16_to_cpu(msg->hdr.version) >= 2)
4603  		ceph_decode_32_safe(&p, end, return_code, bad);
4604  
4605  	if (le16_to_cpu(msg->hdr.version) >= 3)
4606  		ceph_decode_64_safe(&p, end, notifier_id, bad);
4607  
4608  	down_read(&osdc->lock);
4609  	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4610  	if (!lreq) {
4611  		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4612  		     cookie);
4613  		goto out_unlock_osdc;
4614  	}
4615  
4616  	mutex_lock(&lreq->lock);
4617  	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4618  	     opcode, cookie, lreq, lreq->is_watch);
4619  	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4620  		if (!lreq->last_error) {
4621  			lreq->last_error = -ENOTCONN;
4622  			queue_watch_error(lreq);
4623  		}
4624  	} else if (!lreq->is_watch) {
4625  		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4626  		if (lreq->notify_id && lreq->notify_id != notify_id) {
4627  			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4628  			     lreq->notify_id, notify_id);
4629  		} else if (!completion_done(&lreq->notify_finish_wait)) {
4630  			struct ceph_msg_data *data =
4631  			    msg->num_data_items ? &msg->data[0] : NULL;
4632  
4633  			if (data) {
4634  				if (lreq->preply_pages) {
4635  					WARN_ON(data->type !=
4636  							CEPH_MSG_DATA_PAGES);
4637  					*lreq->preply_pages = data->pages;
4638  					*lreq->preply_len = data->length;
4639  					data->own_pages = false;
4640  				}
4641  			}
4642  			lreq->notify_finish_error = return_code;
4643  			complete_all(&lreq->notify_finish_wait);
4644  		}
4645  	} else {
4646  		/* CEPH_WATCH_EVENT_NOTIFY */
4647  		lwork = lwork_alloc(lreq, do_watch_notify);
4648  		if (!lwork) {
4649  			pr_err("failed to allocate notify-lwork\n");
4650  			goto out_unlock_lreq;
4651  		}
4652  
4653  		lwork->notify.notify_id = notify_id;
4654  		lwork->notify.notifier_id = notifier_id;
4655  		lwork->notify.payload = payload;
4656  		lwork->notify.payload_len = payload_len;
4657  		lwork->notify.msg = ceph_msg_get(msg);
4658  		lwork_queue(lwork);
4659  	}
4660  
4661  out_unlock_lreq:
4662  	mutex_unlock(&lreq->lock);
4663  out_unlock_osdc:
4664  	up_read(&osdc->lock);
4665  	return;
4666  
4667  bad:
4668  	pr_err("osdc handle_watch_notify corrupt msg\n");
4669  }
4670  
4671  /*
4672   * Register request, send initial attempt.
4673   */
4674  void ceph_osdc_start_request(struct ceph_osd_client *osdc,
4675  			     struct ceph_osd_request *req)
4676  {
4677  	down_read(&osdc->lock);
4678  	submit_request(req, false);
4679  	up_read(&osdc->lock);
4680  }
4681  EXPORT_SYMBOL(ceph_osdc_start_request);
4682  
4683  /*
4684   * Unregister request.  If @req was registered, it isn't completed:
4685   * r_result isn't set and __complete_request() isn't invoked.
4686   *
4687   * If @req wasn't registered, this call may have raced with
4688   * handle_reply(), in which case r_result would already be set and
4689   * __complete_request() would be getting invoked, possibly even
4690   * concurrently with this call.
4691   */
4692  void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4693  {
4694  	struct ceph_osd_client *osdc = req->r_osdc;
4695  
4696  	down_write(&osdc->lock);
4697  	if (req->r_osd)
4698  		cancel_request(req);
4699  	up_write(&osdc->lock);
4700  }
4701  EXPORT_SYMBOL(ceph_osdc_cancel_request);
4702  
4703  /*
4704   * @timeout: in jiffies, 0 means "wait forever"
4705   */
4706  static int wait_request_timeout(struct ceph_osd_request *req,
4707  				unsigned long timeout)
4708  {
4709  	long left;
4710  
4711  	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4712  	left = wait_for_completion_killable_timeout(&req->r_completion,
4713  						ceph_timeout_jiffies(timeout));
4714  	if (left <= 0) {
4715  		left = left ?: -ETIMEDOUT;
4716  		ceph_osdc_cancel_request(req);
4717  	} else {
4718  		left = req->r_result; /* completed */
4719  	}
4720  
4721  	return left;
4722  }
4723  
4724  /*
4725   * wait for a request to complete
4726   */
4727  int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4728  			   struct ceph_osd_request *req)
4729  {
4730  	return wait_request_timeout(req, 0);
4731  }
4732  EXPORT_SYMBOL(ceph_osdc_wait_request);
4733  
4734  /*
4735   * sync - wait for all in-flight requests to flush.  avoid starvation.
4736   */
4737  void ceph_osdc_sync(struct ceph_osd_client *osdc)
4738  {
4739  	struct rb_node *n, *p;
4740  	u64 last_tid = atomic64_read(&osdc->last_tid);
4741  
4742  again:
4743  	down_read(&osdc->lock);
4744  	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4745  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4746  
4747  		mutex_lock(&osd->lock);
4748  		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4749  			struct ceph_osd_request *req =
4750  			    rb_entry(p, struct ceph_osd_request, r_node);
4751  
4752  			if (req->r_tid > last_tid)
4753  				break;
4754  
4755  			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4756  				continue;
4757  
4758  			ceph_osdc_get_request(req);
4759  			mutex_unlock(&osd->lock);
4760  			up_read(&osdc->lock);
4761  			dout("%s waiting on req %p tid %llu last_tid %llu\n",
4762  			     __func__, req, req->r_tid, last_tid);
4763  			wait_for_completion(&req->r_completion);
4764  			ceph_osdc_put_request(req);
4765  			goto again;
4766  		}
4767  
4768  		mutex_unlock(&osd->lock);
4769  	}
4770  
4771  	up_read(&osdc->lock);
4772  	dout("%s done last_tid %llu\n", __func__, last_tid);
4773  }
4774  EXPORT_SYMBOL(ceph_osdc_sync);
4775  
4776  /*
4777   * Returns a handle, caller owns a ref.
4778   */
4779  struct ceph_osd_linger_request *
4780  ceph_osdc_watch(struct ceph_osd_client *osdc,
4781  		struct ceph_object_id *oid,
4782  		struct ceph_object_locator *oloc,
4783  		rados_watchcb2_t wcb,
4784  		rados_watcherrcb_t errcb,
4785  		void *data)
4786  {
4787  	struct ceph_osd_linger_request *lreq;
4788  	int ret;
4789  
4790  	lreq = linger_alloc(osdc);
4791  	if (!lreq)
4792  		return ERR_PTR(-ENOMEM);
4793  
4794  	lreq->is_watch = true;
4795  	lreq->wcb = wcb;
4796  	lreq->errcb = errcb;
4797  	lreq->data = data;
4798  	lreq->watch_valid_thru = jiffies;
4799  
4800  	ceph_oid_copy(&lreq->t.base_oid, oid);
4801  	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4802  	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4803  	ktime_get_real_ts64(&lreq->mtime);
4804  
4805  	linger_submit(lreq);
4806  	ret = linger_reg_commit_wait(lreq);
4807  	if (ret) {
4808  		linger_cancel(lreq);
4809  		goto err_put_lreq;
4810  	}
4811  
4812  	return lreq;
4813  
4814  err_put_lreq:
4815  	linger_put(lreq);
4816  	return ERR_PTR(ret);
4817  }
4818  EXPORT_SYMBOL(ceph_osdc_watch);
4819  
4820  /*
4821   * Releases a ref.
4822   *
4823   * Times out after mount_timeout to preserve rbd unmap behaviour
4824   * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4825   * with mount_timeout").
4826   */
4827  int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4828  		      struct ceph_osd_linger_request *lreq)
4829  {
4830  	struct ceph_options *opts = osdc->client->options;
4831  	struct ceph_osd_request *req;
4832  	int ret;
4833  
4834  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4835  	if (!req)
4836  		return -ENOMEM;
4837  
4838  	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4839  	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4840  	req->r_flags = CEPH_OSD_FLAG_WRITE;
4841  	ktime_get_real_ts64(&req->r_mtime);
4842  	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
4843  			      lreq->linger_id, 0);
4844  
4845  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4846  	if (ret)
4847  		goto out_put_req;
4848  
4849  	ceph_osdc_start_request(osdc, req);
4850  	linger_cancel(lreq);
4851  	linger_put(lreq);
4852  	ret = wait_request_timeout(req, opts->mount_timeout);
4853  
4854  out_put_req:
4855  	ceph_osdc_put_request(req);
4856  	return ret;
4857  }
4858  EXPORT_SYMBOL(ceph_osdc_unwatch);
4859  
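/*
 * Encode the NOTIFY_ACK op payload (notify_id, cookie, optional payload)
 * into a pagelist attached as the op's request_data.
 */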
4860  static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4861  				      u64 notify_id, u64 cookie, void *payload,
4862  				      u32 payload_len)
4863  {
4864  	struct ceph_osd_req_op *op;
4865  	struct ceph_pagelist *pl;
4866  	int ret;
4867  
4868  	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4869  
4870  	pl = ceph_pagelist_alloc(GFP_NOIO);
4871  	if (!pl)
4872  		return -ENOMEM;
4873  
4874  	ret = ceph_pagelist_encode_64(pl, notify_id);
4875  	ret |= ceph_pagelist_encode_64(pl, cookie);
4876  	if (payload) {
4877  		ret |= ceph_pagelist_encode_32(pl, payload_len);
4878  		ret |= ceph_pagelist_append(pl, payload, payload_len);
4879  	} else {
4880  		ret |= ceph_pagelist_encode_32(pl, 0);
4881  	}
4882  	if (ret) {
4883  		ceph_pagelist_release(pl);
4884  		return -ENOMEM;
4885  	}
4886  
4887  	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4888  	op->indata_len = pl->length;
4889  	return 0;
4890  }
4891  
4892  int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4893  			 struct ceph_object_id *oid,
4894  			 struct ceph_object_locator *oloc,
4895  			 u64 notify_id,
4896  			 u64 cookie,
4897  			 void *payload,
4898  			 u32 payload_len)
4899  {
4900  	struct ceph_osd_request *req;
4901  	int ret;
4902  
4903  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4904  	if (!req)
4905  		return -ENOMEM;
4906  
4907  	ceph_oid_copy(&req->r_base_oid, oid);
4908  	ceph_oloc_copy(&req->r_base_oloc, oloc);
4909  	req->r_flags = CEPH_OSD_FLAG_READ;
4910  
4911  	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4912  					 payload_len);
4913  	if (ret)
4914  		goto out_put_req;
4915  
4916  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4917  	if (ret)
4918  		goto out_put_req;
4919  
4920  	ceph_osdc_start_request(osdc, req);
4921  	ret = ceph_osdc_wait_request(osdc, req);
4922  
4923  out_put_req:
4924  	ceph_osdc_put_request(req);
4925  	return ret;
4926  }
4927  EXPORT_SYMBOL(ceph_osdc_notify_ack);
4928  
4929  /*
4930   * @timeout: in seconds
4931   *
4932   * @preply_{pages,len} are initialized both on success and error.
4933   * The caller is responsible for:
4934   *
4935   *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4936   */
4937  int ceph_osdc_notify(struct ceph_osd_client *osdc,
4938  		     struct ceph_object_id *oid,
4939  		     struct ceph_object_locator *oloc,
4940  		     void *payload,
4941  		     u32 payload_len,
4942  		     u32 timeout,
4943  		     struct page ***preply_pages,
4944  		     size_t *preply_len)
4945  {
4946  	struct ceph_osd_linger_request *lreq;
4947  	int ret;
4948  
4949  	WARN_ON(!timeout);
4950  	if (preply_pages) {
4951  		*preply_pages = NULL;
4952  		*preply_len = 0;
4953  	}
4954  
4955  	lreq = linger_alloc(osdc);
4956  	if (!lreq)
4957  		return -ENOMEM;
4958  
4959  	lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
4960  	if (!lreq->request_pl) {
4961  		ret = -ENOMEM;
4962  		goto out_put_lreq;
4963  	}
4964  
4965  	ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
4966  	ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
4967  	ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
4968  	ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
4969  	if (ret) {
4970  		ret = -ENOMEM;
4971  		goto out_put_lreq;
4972  	}
4973  
4974  	/* for notify_id */
4975  	lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
4976  	if (IS_ERR(lreq->notify_id_pages)) {
4977  		ret = PTR_ERR(lreq->notify_id_pages);
4978  		lreq->notify_id_pages = NULL;
4979  		goto out_put_lreq;
4980  	}
4981  
4982  	lreq->preply_pages = preply_pages;
4983  	lreq->preply_len = preply_len;
4984  
4985  	ceph_oid_copy(&lreq->t.base_oid, oid);
4986  	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4987  	lreq->t.flags = CEPH_OSD_FLAG_READ;
4988  
4989  	linger_submit(lreq);
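	/*
	 * Submit the notify and wait for its completion for up to twice
	 * the requested timeout, giving the OSD a chance to time the
	 * notify out and send a completion first.
	 */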
4990  	ret = linger_reg_commit_wait(lreq);
4991  	if (!ret)
4992  		ret = linger_notify_finish_wait(lreq,
4993  				 msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
4994  	else
4995  		dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4996  
4997  	linger_cancel(lreq);
4998  out_put_lreq:
4999  	linger_put(lreq);
5000  	return ret;
5001  }
5002  EXPORT_SYMBOL(ceph_osdc_notify);
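
/*
 * Illustrative usage sketch (not a caller in this file): send a notify
 * with a 30 second timeout and release the reply pages afterwards, as
 * required by the comment above; reply_pages and reply_len are
 * caller-owned placeholders:
 *
 *	ret = ceph_osdc_notify(osdc, &oid, &oloc, payload, payload_len,
 *			       30, &reply_pages, &reply_len);
 *	...
 *	ceph_release_page_vector(reply_pages,
 *				 calc_pages_for(0, reply_len));
 */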
5003  
5004  /*
5005   * Return the number of milliseconds since the watch was last
5006   * confirmed, or an error.  If there is an error, the watch is no
5007   * longer valid, and should be destroyed with ceph_osdc_unwatch().
5008   */
5009  int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
5010  			  struct ceph_osd_linger_request *lreq)
5011  {
5012  	unsigned long stamp, age;
5013  	int ret;
5014  
5015  	down_read(&osdc->lock);
5016  	mutex_lock(&lreq->lock);
5017  	stamp = lreq->watch_valid_thru;
5018  	if (!list_empty(&lreq->pending_lworks)) {
5019  		struct linger_work *lwork =
5020  		    list_first_entry(&lreq->pending_lworks,
5021  				     struct linger_work,
5022  				     pending_item);
5023  
5024  		if (time_before(lwork->queued_stamp, stamp))
5025  			stamp = lwork->queued_stamp;
5026  	}
5027  	age = jiffies - stamp;
5028  	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
5029  	     lreq, lreq->linger_id, age, lreq->last_error);
5030  	/* we are truncating to msecs, so return a safe upper bound */
5031  	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
5032  
5033  	mutex_unlock(&lreq->lock);
5034  	up_read(&osdc->lock);
5035  	return ret;
5036  }
5037  
5038  static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
5039  {
5040  	u8 struct_v;
5041  	u32 struct_len;
5042  	int ret;
5043  
5044  	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
5045  				  &struct_v, &struct_len);
5046  	if (ret)
5047  		goto bad;
5048  
5049  	ret = -EINVAL;
5050  	ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad);
5051  	ceph_decode_64_safe(p, end, item->cookie, bad);
5052  	ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */
5053  
5054  	if (struct_v >= 2) {
5055  		ret = ceph_decode_entity_addr(p, end, &item->addr);
5056  		if (ret)
5057  			goto bad;
5058  	} else {
5059  		ret = 0;
5060  	}
5061  
5062  	dout("%s %s%llu cookie %llu addr %s\n", __func__,
5063  	     ENTITY_NAME(item->name), item->cookie,
5064  	     ceph_pr_addr(&item->addr));
5065  bad:
5066  	return ret;
5067  }
5068  
5069  static int decode_watchers(void **p, void *end,
5070  			   struct ceph_watch_item **watchers,
5071  			   u32 *num_watchers)
5072  {
5073  	u8 struct_v;
5074  	u32 struct_len;
5075  	int i;
5076  	int ret;
5077  
5078  	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
5079  				  &struct_v, &struct_len);
5080  	if (ret)
5081  		return ret;
5082  
5083  	*num_watchers = ceph_decode_32(p);
5084  	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
5085  	if (!*watchers)
5086  		return -ENOMEM;
5087  
5088  	for (i = 0; i < *num_watchers; i++) {
5089  		ret = decode_watcher(p, end, *watchers + i);
5090  		if (ret) {
5091  			kfree(*watchers);
5092  			return ret;
5093  		}
5094  	}
5095  
5096  	return 0;
5097  }
5098  
5099  /*
5100   * On success, the caller is responsible for:
5101   *
5102   *     kfree(watchers);
5103   */
5104  int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
5105  			    struct ceph_object_id *oid,
5106  			    struct ceph_object_locator *oloc,
5107  			    struct ceph_watch_item **watchers,
5108  			    u32 *num_watchers)
5109  {
5110  	struct ceph_osd_request *req;
5111  	struct page **pages;
5112  	int ret;
5113  
5114  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5115  	if (!req)
5116  		return -ENOMEM;
5117  
5118  	ceph_oid_copy(&req->r_base_oid, oid);
5119  	ceph_oloc_copy(&req->r_base_oloc, oloc);
5120  	req->r_flags = CEPH_OSD_FLAG_READ;
5121  
5122  	pages = ceph_alloc_page_vector(1, GFP_NOIO);
5123  	if (IS_ERR(pages)) {
5124  		ret = PTR_ERR(pages);
5125  		goto out_put_req;
5126  	}
5127  
5128  	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
5129  	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
5130  						 response_data),
5131  				 pages, PAGE_SIZE, 0, false, true);
5132  
5133  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5134  	if (ret)
5135  		goto out_put_req;
5136  
5137  	ceph_osdc_start_request(osdc, req);
5138  	ret = ceph_osdc_wait_request(osdc, req);
5139  	if (ret >= 0) {
5140  		void *p = page_address(pages[0]);
5141  		void *const end = p + req->r_ops[0].outdata_len;
5142  
5143  		ret = decode_watchers(&p, end, watchers, num_watchers);
5144  	}
5145  
5146  out_put_req:
5147  	ceph_osdc_put_request(req);
5148  	return ret;
5149  }
5150  EXPORT_SYMBOL(ceph_osdc_list_watchers);
5151  
5152  /*
5153   * Call all pending notify callbacks - for use after a watch is
5154   * unregistered, to make sure no more callbacks for it will be invoked
5155   */
5156  void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
5157  {
5158  	dout("%s osdc %p\n", __func__, osdc);
5159  	flush_workqueue(osdc->notify_wq);
5160  }
5161  EXPORT_SYMBOL(ceph_osdc_flush_notifies);
5162  
5163  void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
5164  {
5165  	down_read(&osdc->lock);
5166  	maybe_request_map(osdc);
5167  	up_read(&osdc->lock);
5168  }
5169  EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
5170  
5171  /*
5172   * Execute an OSD class method on an object.
5173   *
5174   * @flags: CEPH_OSD_FLAG_*
5175   * @resp_len: in/out param for reply length
5176   */
5177  int ceph_osdc_call(struct ceph_osd_client *osdc,
5178  		   struct ceph_object_id *oid,
5179  		   struct ceph_object_locator *oloc,
5180  		   const char *class, const char *method,
5181  		   unsigned int flags,
5182  		   struct page *req_page, size_t req_len,
5183  		   struct page **resp_pages, size_t *resp_len)
5184  {
5185  	struct ceph_osd_request *req;
5186  	int ret;
5187  
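	/* the class method request payload must fit in a single page */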
5188  	if (req_len > PAGE_SIZE)
5189  		return -E2BIG;
5190  
5191  	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
5192  	if (!req)
5193  		return -ENOMEM;
5194  
5195  	ceph_oid_copy(&req->r_base_oid, oid);
5196  	ceph_oloc_copy(&req->r_base_oloc, oloc);
5197  	req->r_flags = flags;
5198  
5199  	ret = osd_req_op_cls_init(req, 0, class, method);
5200  	if (ret)
5201  		goto out_put_req;
5202  
5203  	if (req_page)
5204  		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
5205  						  0, false, false);
5206  	if (resp_pages)
5207  		osd_req_op_cls_response_data_pages(req, 0, resp_pages,
5208  						   *resp_len, 0, false, false);
5209  
5210  	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
5211  	if (ret)
5212  		goto out_put_req;
5213  
5214  	ceph_osdc_start_request(osdc, req);
5215  	ret = ceph_osdc_wait_request(osdc, req);
5216  	if (ret >= 0) {
5217  		ret = req->r_ops[0].rval;
5218  		if (resp_pages)
5219  			*resp_len = req->r_ops[0].outdata_len;
5220  	}
5221  
5222  out_put_req:
5223  	ceph_osdc_put_request(req);
5224  	return ret;
5225  }
5226  EXPORT_SYMBOL(ceph_osdc_call);
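
/*
 * Illustrative sketch only: invoke a class method with a single-page
 * request and a single-page reply; req_page and resp_page are
 * placeholder pages allocated by the caller:
 *
 *	resp_len = PAGE_SIZE;
 *	ret = ceph_osdc_call(osdc, &oid, &oloc, "lock", "get_info",
 *			     CEPH_OSD_FLAG_READ, req_page, req_len,
 *			     &resp_page, &resp_len);
 */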
5227  
5228  /*
5229   * reset all osd connections
5230   */
5231  void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
5232  {
5233  	struct rb_node *n;
5234  
5235  	down_write(&osdc->lock);
5236  	for (n = rb_first(&osdc->osds); n; ) {
5237  		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
5238  
5239  		n = rb_next(n);
5240  		if (!reopen_osd(osd))
5241  			kick_osd_requests(osd);
5242  	}
5243  	up_write(&osdc->lock);
5244  }
5245  
5246  /*
5247   * init, shutdown
5248   */
5249  int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
5250  {
5251  	int err;
5252  
5253  	dout("init\n");
5254  	osdc->client = client;
5255  	init_rwsem(&osdc->lock);
5256  	osdc->osds = RB_ROOT;
5257  	INIT_LIST_HEAD(&osdc->osd_lru);
5258  	spin_lock_init(&osdc->osd_lru_lock);
5259  	osd_init(&osdc->homeless_osd);
5260  	osdc->homeless_osd.o_osdc = osdc;
5261  	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
5262  	osdc->last_linger_id = CEPH_LINGER_ID_START;
5263  	osdc->linger_requests = RB_ROOT;
5264  	osdc->map_checks = RB_ROOT;
5265  	osdc->linger_map_checks = RB_ROOT;
5266  	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
5267  	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
5268  
5269  	err = -ENOMEM;
5270  	osdc->osdmap = ceph_osdmap_alloc();
5271  	if (!osdc->osdmap)
5272  		goto out;
5273  
5274  	osdc->req_mempool = mempool_create_slab_pool(10,
5275  						     ceph_osd_request_cache);
5276  	if (!osdc->req_mempool)
5277  		goto out_map;
5278  
5279  	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
5280  				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
5281  	if (err < 0)
5282  		goto out_mempool;
5283  	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
5284  				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
5285  				"osd_op_reply");
5286  	if (err < 0)
5287  		goto out_msgpool;
5288  
5289  	err = -ENOMEM;
5290  	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
5291  	if (!osdc->notify_wq)
5292  		goto out_msgpool_reply;
5293  
5294  	osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
5295  	if (!osdc->completion_wq)
5296  		goto out_notify_wq;
5297  
5298  	schedule_delayed_work(&osdc->timeout_work,
5299  			      osdc->client->options->osd_keepalive_timeout);
5300  	schedule_delayed_work(&osdc->osds_timeout_work,
5301  	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5302  
5303  	return 0;
5304  
5305  out_notify_wq:
5306  	destroy_workqueue(osdc->notify_wq);
5307  out_msgpool_reply:
5308  	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5309  out_msgpool:
5310  	ceph_msgpool_destroy(&osdc->msgpool_op);
5311  out_mempool:
5312  	mempool_destroy(osdc->req_mempool);
5313  out_map:
5314  	ceph_osdmap_destroy(osdc->osdmap);
5315  out:
5316  	return err;
5317  }
5318  
5319  void ceph_osdc_stop(struct ceph_osd_client *osdc)
5320  {
5321  	destroy_workqueue(osdc->completion_wq);
5322  	destroy_workqueue(osdc->notify_wq);
5323  	cancel_delayed_work_sync(&osdc->timeout_work);
5324  	cancel_delayed_work_sync(&osdc->osds_timeout_work);
5325  
5326  	down_write(&osdc->lock);
5327  	while (!RB_EMPTY_ROOT(&osdc->osds)) {
5328  		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5329  						struct ceph_osd, o_node);
5330  		close_osd(osd);
5331  	}
5332  	up_write(&osdc->lock);
5333  	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5334  	osd_cleanup(&osdc->homeless_osd);
5335  
5336  	WARN_ON(!list_empty(&osdc->osd_lru));
5337  	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5338  	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5339  	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5340  	WARN_ON(atomic_read(&osdc->num_requests));
5341  	WARN_ON(atomic_read(&osdc->num_homeless));
5342  
5343  	ceph_osdmap_destroy(osdc->osdmap);
5344  	mempool_destroy(osdc->req_mempool);
5345  	ceph_msgpool_destroy(&osdc->msgpool_op);
5346  	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5347  }
5348  
5349  int osd_req_op_copy_from_init(struct ceph_osd_request *req,
5350  			      u64 src_snapid, u64 src_version,
5351  			      struct ceph_object_id *src_oid,
5352  			      struct ceph_object_locator *src_oloc,
5353  			      u32 src_fadvise_flags,
5354  			      u32 dst_fadvise_flags,
5355  			      u32 truncate_seq, u64 truncate_size,
5356  			      u8 copy_from_flags)
5357  {
5358  	struct ceph_osd_req_op *op;
5359  	struct page **pages;
5360  	void *p, *end;
5361  
5362  	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
5363  	if (IS_ERR(pages))
5364  		return PTR_ERR(pages);
5365  
5366  	op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
5367  			     dst_fadvise_flags);
5368  	op->copy_from.snapid = src_snapid;
5369  	op->copy_from.src_version = src_version;
5370  	op->copy_from.flags = copy_from_flags;
5371  	op->copy_from.src_fadvise_flags = src_fadvise_flags;
5372  
5373  	p = page_address(pages[0]);
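	/* encode the source oid, oloc and truncate info into a single page */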
5374  	end = p + PAGE_SIZE;
5375  	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
5376  	encode_oloc(&p, end, src_oloc);
5377  	ceph_encode_32(&p, truncate_seq);
5378  	ceph_encode_64(&p, truncate_size);
5379  	op->indata_len = PAGE_SIZE - (end - p);
5380  
5381  	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
5382  				 op->indata_len, 0, false, true);
5383  	return 0;
5384  }
5385  EXPORT_SYMBOL(osd_req_op_copy_from_init);
5386  
5387  int __init ceph_osdc_setup(void)
5388  {
5389  	size_t size = sizeof(struct ceph_osd_request) +
5390  	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5391  
5392  	BUG_ON(ceph_osd_request_cache);
5393  	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5394  						   0, 0, NULL);
5395  
5396  	return ceph_osd_request_cache ? 0 : -ENOMEM;
5397  }
5398  
5399  void ceph_osdc_cleanup(void)
5400  {
5401  	BUG_ON(!ceph_osd_request_cache);
5402  	kmem_cache_destroy(ceph_osd_request_cache);
5403  	ceph_osd_request_cache = NULL;
5404  }
5405  
5406  /*
5407   * handle incoming message
5408   */
5409  static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5410  {
5411  	struct ceph_osd *osd = con->private;
5412  	struct ceph_osd_client *osdc = osd->o_osdc;
5413  	int type = le16_to_cpu(msg->hdr.type);
5414  
5415  	switch (type) {
5416  	case CEPH_MSG_OSD_MAP:
5417  		ceph_osdc_handle_map(osdc, msg);
5418  		break;
5419  	case CEPH_MSG_OSD_OPREPLY:
5420  		handle_reply(osd, msg);
5421  		break;
5422  	case CEPH_MSG_OSD_BACKOFF:
5423  		handle_backoff(osd, msg);
5424  		break;
5425  	case CEPH_MSG_WATCH_NOTIFY:
5426  		handle_watch_notify(osdc, msg);
5427  		break;
5428  
5429  	default:
5430  		pr_err("received unknown message type %d %s\n", type,
5431  		       ceph_msg_type_name(type));
5432  	}
5433  
5434  	ceph_msg_put(msg);
5435  }
5436  
5437  /* How much sparse data was requested? */
5438  static u64 sparse_data_requested(struct ceph_osd_request *req)
5439  {
5440  	u64 len = 0;
5441  
5442  	if (req->r_flags & CEPH_OSD_FLAG_READ) {
5443  		int i;
5444  
5445  		for (i = 0; i < req->r_num_ops; ++i) {
5446  			struct ceph_osd_req_op *op = &req->r_ops[i];
5447  
5448  			if (op->op == CEPH_OSD_OP_SPARSE_READ)
5449  				len += op->extent.length;
5450  		}
5451  	}
5452  	return len;
5453  }
5454  
5455  /*
5456   * Lookup and return message for incoming reply.  Don't try to do
5457   * anything about a larger than preallocated data portion of the
5458   * message at the moment - for now, just skip the message.
5459   */
5460  static struct ceph_msg *get_reply(struct ceph_connection *con,
5461  				  struct ceph_msg_header *hdr,
5462  				  int *skip)
5463  {
5464  	struct ceph_osd *osd = con->private;
5465  	struct ceph_osd_client *osdc = osd->o_osdc;
5466  	struct ceph_msg *m = NULL;
5467  	struct ceph_osd_request *req;
5468  	int front_len = le32_to_cpu(hdr->front_len);
5469  	int data_len = le32_to_cpu(hdr->data_len);
5470  	u64 tid = le64_to_cpu(hdr->tid);
5471  	u64 srlen;
5472  
5473  	down_read(&osdc->lock);
5474  	if (!osd_registered(osd)) {
5475  		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5476  		*skip = 1;
5477  		goto out_unlock_osdc;
5478  	}
5479  	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5480  
5481  	mutex_lock(&osd->lock);
5482  	req = lookup_request(&osd->o_requests, tid);
5483  	if (!req) {
5484  		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5485  		     osd->o_osd, tid);
5486  		*skip = 1;
5487  		goto out_unlock_session;
5488  	}
5489  
5490  	ceph_msg_revoke_incoming(req->r_reply);
5491  
5492  	if (front_len > req->r_reply->front_alloc_len) {
5493  		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5494  			__func__, osd->o_osd, req->r_tid, front_len,
5495  			req->r_reply->front_alloc_len);
5496  		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5497  				 false);
5498  		if (!m)
5499  			goto out_unlock_session;
5500  		ceph_msg_put(req->r_reply);
5501  		req->r_reply = m;
5502  	}
5503  
5504  	srlen = sparse_data_requested(req);
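	/*
	 * For sparse reads the reply data length is not known up front,
	 * so the preallocated-buffer check below only applies to
	 * regular reads.
	 */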
5505  	if (!srlen && data_len > req->r_reply->data_length) {
5506  		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5507  			__func__, osd->o_osd, req->r_tid, data_len,
5508  			req->r_reply->data_length);
5509  		m = NULL;
5510  		*skip = 1;
5511  		goto out_unlock_session;
5512  	}
5513  
5514  	m = ceph_msg_get(req->r_reply);
5515  	m->sparse_read_total = srlen;
5516  
5517  	dout("get_reply tid %lld %p\n", tid, m);
5518  
5519  out_unlock_session:
5520  	mutex_unlock(&osd->lock);
5521  out_unlock_osdc:
5522  	up_read(&osdc->lock);
5523  	return m;
5524  }
5525  
5526  static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5527  {
5528  	struct ceph_msg *m;
5529  	int type = le16_to_cpu(hdr->type);
5530  	u32 front_len = le32_to_cpu(hdr->front_len);
5531  	u32 data_len = le32_to_cpu(hdr->data_len);
5532  
5533  	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
5534  	if (!m)
5535  		return NULL;
5536  
5537  	if (data_len) {
5538  		struct page **pages;
5539  
5540  		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5541  					       GFP_NOIO);
5542  		if (IS_ERR(pages)) {
5543  			ceph_msg_put(m);
5544  			return NULL;
5545  		}
5546  
5547  		ceph_msg_data_add_pages(m, pages, data_len, 0, true);
5548  	}
5549  
5550  	return m;
5551  }
5552  
5553  static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
5554  				      struct ceph_msg_header *hdr,
5555  				      int *skip)
5556  {
5557  	struct ceph_osd *osd = con->private;
5558  	int type = le16_to_cpu(hdr->type);
5559  
5560  	*skip = 0;
5561  	switch (type) {
5562  	case CEPH_MSG_OSD_MAP:
5563  	case CEPH_MSG_OSD_BACKOFF:
5564  	case CEPH_MSG_WATCH_NOTIFY:
5565  		return alloc_msg_with_page_vector(hdr);
5566  	case CEPH_MSG_OSD_OPREPLY:
5567  		return get_reply(con, hdr, skip);
5568  	default:
5569  		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5570  			osd->o_osd, type);
5571  		*skip = 1;
5572  		return NULL;
5573  	}
5574  }
5575  
5576  /*
5577   * Wrappers to refcount containing ceph_osd struct
5578   */
5579  static struct ceph_connection *osd_get_con(struct ceph_connection *con)
5580  {
5581  	struct ceph_osd *osd = con->private;
5582  	if (get_osd(osd))
5583  		return con;
5584  	return NULL;
5585  }
5586  
5587  static void osd_put_con(struct ceph_connection *con)
5588  {
5589  	struct ceph_osd *osd = con->private;
5590  	put_osd(osd);
5591  }
5592  
5593  /*
5594   * authentication
5595   */
5596  
5597  /*
5598   * Note: returned pointer is the address of a structure that's
5599   * managed separately.  Caller must *not* attempt to free it.
5600   */
5601  static struct ceph_auth_handshake *
5602  osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5603  {
5604  	struct ceph_osd *o = con->private;
5605  	struct ceph_osd_client *osdc = o->o_osdc;
5606  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5607  	struct ceph_auth_handshake *auth = &o->o_auth;
5608  	int ret;
5609  
5610  	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5611  					 force_new, proto, NULL, NULL);
5612  	if (ret)
5613  		return ERR_PTR(ret);
5614  
5615  	return auth;
5616  }
5617  
5618  static int osd_add_authorizer_challenge(struct ceph_connection *con,
5619  				    void *challenge_buf, int challenge_buf_len)
5620  {
5621  	struct ceph_osd *o = con->private;
5622  	struct ceph_osd_client *osdc = o->o_osdc;
5623  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5624  
5625  	return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer,
5626  					    challenge_buf, challenge_buf_len);
5627  }
5628  
5629  static int osd_verify_authorizer_reply(struct ceph_connection *con)
5630  {
5631  	struct ceph_osd *o = con->private;
5632  	struct ceph_osd_client *osdc = o->o_osdc;
5633  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5634  	struct ceph_auth_handshake *auth = &o->o_auth;
5635  
5636  	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5637  		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5638  		NULL, NULL, NULL, NULL);
5639  }
5640  
5641  static int osd_invalidate_authorizer(struct ceph_connection *con)
5642  {
5643  	struct ceph_osd *o = con->private;
5644  	struct ceph_osd_client *osdc = o->o_osdc;
5645  	struct ceph_auth_client *ac = osdc->client->monc.auth;
5646  
5647  	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5648  	return ceph_monc_validate_auth(&osdc->client->monc);
5649  }
5650  
5651  static int osd_get_auth_request(struct ceph_connection *con,
5652  				void *buf, int *buf_len,
5653  				void **authorizer, int *authorizer_len)
5654  {
5655  	struct ceph_osd *o = con->private;
5656  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5657  	struct ceph_auth_handshake *auth = &o->o_auth;
5658  	int ret;
5659  
5660  	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD,
5661  				       buf, buf_len);
5662  	if (ret)
5663  		return ret;
5664  
5665  	*authorizer = auth->authorizer_buf;
5666  	*authorizer_len = auth->authorizer_buf_len;
5667  	return 0;
5668  }
5669  
5670  static int osd_handle_auth_reply_more(struct ceph_connection *con,
5671  				      void *reply, int reply_len,
5672  				      void *buf, int *buf_len,
5673  				      void **authorizer, int *authorizer_len)
5674  {
5675  	struct ceph_osd *o = con->private;
5676  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5677  	struct ceph_auth_handshake *auth = &o->o_auth;
5678  	int ret;
5679  
5680  	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5681  					      buf, buf_len);
5682  	if (ret)
5683  		return ret;
5684  
5685  	*authorizer = auth->authorizer_buf;
5686  	*authorizer_len = auth->authorizer_buf_len;
5687  	return 0;
5688  }
5689  
5690  static int osd_handle_auth_done(struct ceph_connection *con,
5691  				u64 global_id, void *reply, int reply_len,
5692  				u8 *session_key, int *session_key_len,
5693  				u8 *con_secret, int *con_secret_len)
5694  {
5695  	struct ceph_osd *o = con->private;
5696  	struct ceph_auth_client *ac = o->o_osdc->client->monc.auth;
5697  	struct ceph_auth_handshake *auth = &o->o_auth;
5698  
5699  	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5700  					       session_key, session_key_len,
5701  					       con_secret, con_secret_len);
5702  }
5703  
5704  static int osd_handle_auth_bad_method(struct ceph_connection *con,
5705  				      int used_proto, int result,
5706  				      const int *allowed_protos, int proto_cnt,
5707  				      const int *allowed_modes, int mode_cnt)
5708  {
5709  	struct ceph_osd *o = con->private;
5710  	struct ceph_mon_client *monc = &o->o_osdc->client->monc;
5711  	int ret;
5712  
5713  	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD,
5714  					    used_proto, result,
5715  					    allowed_protos, proto_cnt,
5716  					    allowed_modes, mode_cnt)) {
5717  		ret = ceph_monc_validate_auth(monc);
5718  		if (ret)
5719  			return ret;
5720  	}
5721  
5722  	return -EACCES;
5723  }
5724  
5725  static void osd_reencode_message(struct ceph_msg *msg)
5726  {
5727  	int type = le16_to_cpu(msg->hdr.type);
5728  
5729  	if (type == CEPH_MSG_OSD_OP)
5730  		encode_request_finish(msg);
5731  }
5732  
5733  static int osd_sign_message(struct ceph_msg *msg)
5734  {
5735  	struct ceph_osd *o = msg->con->private;
5736  	struct ceph_auth_handshake *auth = &o->o_auth;
5737  
5738  	return ceph_auth_sign_message(auth, msg);
5739  }
5740  
5741  static int osd_check_message_signature(struct ceph_msg *msg)
5742  {
5743  	struct ceph_osd *o = msg->con->private;
5744  	struct ceph_auth_handshake *auth = &o->o_auth;
5745  
5746  	return ceph_auth_check_message_signature(auth, msg);
5747  }
5748  
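
/*
 * Advance the data cursor by @len bytes, optionally zeroing the pages
 * being skipped over (used with zero=true to fill holes between sparse
 * read extents).
 */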
5749  static void advance_cursor(struct ceph_msg_data_cursor *cursor, size_t len,
5750  			   bool zero)
5751  {
5752  	while (len) {
5753  		struct page *page;
5754  		size_t poff, plen;
5755  
5756  		page = ceph_msg_data_next(cursor, &poff, &plen);
5757  		if (plen > len)
5758  			plen = len;
5759  		if (zero)
5760  			zero_user_segment(page, poff, poff + plen);
5761  		len -= plen;
5762  		ceph_msg_data_advance(cursor, plen);
5763  	}
5764  }
5765  
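
/*
 * Returns 1 if another sparse read op was found in the matching
 * request, 0 if there are none left, or a negative error.
 */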
5766  static int prep_next_sparse_read(struct ceph_connection *con,
5767  				 struct ceph_msg_data_cursor *cursor)
5768  {
5769  	struct ceph_osd *o = con->private;
5770  	struct ceph_sparse_read *sr = &o->o_sparse_read;
5771  	struct ceph_osd_request *req;
5772  	struct ceph_osd_req_op *op;
5773  
5774  	spin_lock(&o->o_requests_lock);
5775  	req = lookup_request(&o->o_requests, le64_to_cpu(con->in_msg->hdr.tid));
5776  	if (!req) {
5777  		spin_unlock(&o->o_requests_lock);
5778  		return -EBADR;
5779  	}
5780  
5781  	if (o->o_sparse_op_idx < 0) {
5782  		dout("%s: [%d] starting new sparse read req\n",
5783  		     __func__, o->o_osd);
5784  	} else {
5785  		u64 end;
5786  
5787  		op = &req->r_ops[o->o_sparse_op_idx];
5788  
5789  		WARN_ON_ONCE(op->extent.sparse_ext);
5790  
5791  		/* hand back buffer we took earlier */
5792  		op->extent.sparse_ext = sr->sr_extent;
5793  		sr->sr_extent = NULL;
5794  		op->extent.sparse_ext_cnt = sr->sr_count;
5795  		sr->sr_ext_len = 0;
5796  		dout("%s: [%d] completed extent array len %d cursor->resid %zd\n",
5797  		     __func__, o->o_osd, op->extent.sparse_ext_cnt, cursor->resid);
5798  		/* Advance to end of data for this operation */
5799  		end = ceph_sparse_ext_map_end(op);
5800  		if (end < sr->sr_req_len)
5801  			advance_cursor(cursor, sr->sr_req_len - end, false);
5802  	}
5803  
5804  	ceph_init_sparse_read(sr);
5805  
5806  	/* find next op in this request (if any) */
5807  	while (++o->o_sparse_op_idx < req->r_num_ops) {
5808  		op = &req->r_ops[o->o_sparse_op_idx];
5809  		if (op->op == CEPH_OSD_OP_SPARSE_READ)
5810  			goto found;
5811  	}
5812  
5813  	/* reset for next sparse read request */
5814  	spin_unlock(&o->o_requests_lock);
5815  	o->o_sparse_op_idx = -1;
5816  	return 0;
5817  found:
5818  	sr->sr_req_off = op->extent.offset;
5819  	sr->sr_req_len = op->extent.length;
5820  	sr->sr_pos = sr->sr_req_off;
5821  	dout("%s: [%d] new sparse read op at idx %d 0x%llx~0x%llx\n", __func__,
5822  	     o->o_osd, o->o_sparse_op_idx, sr->sr_req_off, sr->sr_req_len);
5823  
5824  	/* hand off request's sparse extent map buffer */
5825  	sr->sr_ext_len = op->extent.sparse_ext_cnt;
5826  	op->extent.sparse_ext_cnt = 0;
5827  	sr->sr_extent = op->extent.sparse_ext;
5828  	op->extent.sparse_ext = NULL;
5829  
5830  	spin_unlock(&o->o_requests_lock);
5831  	return 1;
5832  }
5833  
5834  #ifdef __BIG_ENDIAN
5835  static inline void convert_extent_map(struct ceph_sparse_read *sr)
5836  {
5837  	int i;
5838  
5839  	for (i = 0; i < sr->sr_count; i++) {
5840  		struct ceph_sparse_extent *ext = &sr->sr_extent[i];
5841  
5842  		ext->off = le64_to_cpu((__force __le64)ext->off);
5843  		ext->len = le64_to_cpu((__force __le64)ext->len);
5844  	}
5845  }
5846  #else
5847  static inline void convert_extent_map(struct ceph_sparse_read *sr)
5848  {
5849  }
5850  #endif
5851  
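/* sanity cap on the number of extents in a single sparse read reply */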
5852  #define MAX_EXTENTS 4096
5853  
5854  static int osd_sparse_read(struct ceph_connection *con,
5855  			   struct ceph_msg_data_cursor *cursor,
5856  			   char **pbuf)
5857  {
5858  	struct ceph_osd *o = con->private;
5859  	struct ceph_sparse_read *sr = &o->o_sparse_read;
5860  	u32 count = sr->sr_count;
5861  	u64 eoff, elen, len = 0;
5862  	int i, ret;
5863  
5864  	switch (sr->sr_state) {
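	/*
	 * Reply parsing state machine: HDR (pick the next sparse read
	 * op and read the extent count) -> EXTENTS (read the extent
	 * array) -> DATA_LEN (read the total data length) -> DATA_PRE
	 * (check it against the extents) -> DATA (feed each extent to
	 * the messenger), then back to HDR for the next op, if any.
	 */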
5865  	case CEPH_SPARSE_READ_HDR:
5866  next_op:
5867  		ret = prep_next_sparse_read(con, cursor);
5868  		if (ret <= 0)
5869  			return ret;
5870  
5871  		/* number of extents */
5872  		ret = sizeof(sr->sr_count);
5873  		*pbuf = (char *)&sr->sr_count;
5874  		sr->sr_state = CEPH_SPARSE_READ_EXTENTS;
5875  		break;
5876  	case CEPH_SPARSE_READ_EXTENTS:
5877  		/* Convert sr_count to host-endian */
5878  		count = le32_to_cpu((__force __le32)sr->sr_count);
5879  		sr->sr_count = count;
5880  		dout("[%d] got %u extents\n", o->o_osd, count);
5881  
5882  		if (count > 0) {
5883  			if (!sr->sr_extent || count > sr->sr_ext_len) {
5884  				/*
5885  				 * Apply a hard cap to the number of extents.
5886  				 * If we have more, assume something is wrong.
5887  				 */
5888  				if (count > MAX_EXTENTS) {
5889  					dout("%s: OSD returned 0x%x extents in a single reply!\n",
5890  					     __func__, count);
5891  					return -EREMOTEIO;
5892  				}
5893  
5894  				/* no extent array provided, or too short */
5895  				kfree(sr->sr_extent);
5896  				sr->sr_extent = kmalloc_array(count,
5897  							      sizeof(*sr->sr_extent),
5898  							      GFP_NOIO);
5899  				if (!sr->sr_extent)
5900  					return -ENOMEM;
5901  				sr->sr_ext_len = count;
5902  			}
5903  			ret = count * sizeof(*sr->sr_extent);
5904  			*pbuf = (char *)sr->sr_extent;
5905  			sr->sr_state = CEPH_SPARSE_READ_DATA_LEN;
5906  			break;
5907  		}
5908  		/* No extents? Read data len */
5909  		fallthrough;
5910  	case CEPH_SPARSE_READ_DATA_LEN:
5911  		convert_extent_map(sr);
5912  		ret = sizeof(sr->sr_datalen);
5913  		*pbuf = (char *)&sr->sr_datalen;
5914  		sr->sr_state = CEPH_SPARSE_READ_DATA_PRE;
5915  		break;
5916  	case CEPH_SPARSE_READ_DATA_PRE:
5917  		/* Convert sr_datalen to host-endian */
5918  		sr->sr_datalen = le32_to_cpu((__force __le32)sr->sr_datalen);
5919  		for (i = 0; i < count; i++)
5920  			len += sr->sr_extent[i].len;
5921  		if (sr->sr_datalen != len) {
5922  			pr_warn_ratelimited("data len %u != extent len %llu\n",
5923  					    sr->sr_datalen, len);
5924  			return -EREMOTEIO;
5925  		}
5926  		sr->sr_state = CEPH_SPARSE_READ_DATA;
5927  		fallthrough;
5928  	case CEPH_SPARSE_READ_DATA:
5929  		if (sr->sr_index >= count) {
5930  			sr->sr_state = CEPH_SPARSE_READ_HDR;
5931  			goto next_op;
5932  		}
5933  
5934  		eoff = sr->sr_extent[sr->sr_index].off;
5935  		elen = sr->sr_extent[sr->sr_index].len;
5936  
5937  		dout("[%d] ext %d off 0x%llx len 0x%llx\n",
5938  		     o->o_osd, sr->sr_index, eoff, elen);
5939  
5940  		if (elen > INT_MAX) {
5941  			dout("Sparse read extent length too long (0x%llx)\n",
5942  			     elen);
5943  			return -EREMOTEIO;
5944  		}
5945  
5946  		/* zero out anything from sr_pos to start of extent */
5947  		if (sr->sr_pos < eoff)
5948  			advance_cursor(cursor, eoff - sr->sr_pos, true);
5949  
5950  		/* Set position to end of extent */
5951  		sr->sr_pos = eoff + elen;
5952  
5953  		/* send back the new length and nullify the ptr */
5954  		cursor->sr_resid = elen;
5955  		ret = elen;
5956  		*pbuf = NULL;
5957  
5958  		/* Bump the array index */
5959  		++sr->sr_index;
5960  		break;
5961  	}
5962  	return ret;
5963  }
5964  
5965  static const struct ceph_connection_operations osd_con_ops = {
5966  	.get = osd_get_con,
5967  	.put = osd_put_con,
5968  	.sparse_read = osd_sparse_read,
5969  	.alloc_msg = osd_alloc_msg,
5970  	.dispatch = osd_dispatch,
5971  	.fault = osd_fault,
5972  	.reencode_message = osd_reencode_message,
5973  	.get_authorizer = osd_get_authorizer,
5974  	.add_authorizer_challenge = osd_add_authorizer_challenge,
5975  	.verify_authorizer_reply = osd_verify_authorizer_reply,
5976  	.invalidate_authorizer = osd_invalidate_authorizer,
5977  	.sign_message = osd_sign_message,
5978  	.check_message_signature = osd_check_message_signature,
5979  	.get_auth_request = osd_get_auth_request,
5980  	.handle_auth_reply_more = osd_handle_auth_reply_more,
5981  	.handle_auth_done = osd_handle_auth_done,
5982  	.handle_auth_bad_method = osd_handle_auth_bad_method,
5983  };
5984