xref: /openbmc/linux/net/ceph/osd_client.c (revision 2eb0f624b709e78ec8e2f4c3412947703db99301)
1 // SPDX-License-Identifier: GPL-2.0
2 
3 #include <linux/ceph/ceph_debug.h>
4 
5 #include <linux/module.h>
6 #include <linux/err.h>
7 #include <linux/highmem.h>
8 #include <linux/mm.h>
9 #include <linux/pagemap.h>
10 #include <linux/slab.h>
11 #include <linux/uaccess.h>
12 #ifdef CONFIG_BLOCK
13 #include <linux/bio.h>
14 #endif
15 
16 #include <linux/ceph/ceph_features.h>
17 #include <linux/ceph/libceph.h>
18 #include <linux/ceph/osd_client.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/auth.h>
22 #include <linux/ceph/pagelist.h>
23 #include <linux/ceph/striper.h>
24 
/*
 * Base size of the front section of an OSD op reply message.
 * ceph_osdc_alloc_messages() adds room for the object name and the
 * per-op structures on top of this.
 */
#define OSD_OPREPLY_FRONT_LEN	512

/*
 * Slab cache for non-mempool requests carrying at most
 * CEPH_OSD_SLAB_OPS ops (see ceph_osdc_alloc_request()).
 */
static struct kmem_cache	*ceph_osd_request_cache;

/* Forward declaration; the definition is not in this part of the file. */
static const struct ceph_connection_operations osd_con_ops;
30 
31 /*
32  * Implement client access to distributed object storage cluster.
33  *
34  * All data objects are stored within a cluster/cloud of OSDs, or
35  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
36  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
37  * remote daemons serving up and coordinating consistent and safe
38  * access to storage.
39  *
40  * Cluster membership and the mapping of data objects onto storage devices
41  * are described by the osd map.
42  *
43  * We keep track of pending OSD requests (read, write), resubmit
44  * requests to different OSDs when the cluster topology/data layout
45  * change, or retry the affected requests when the communications
46  * channel with an OSD is reset.
47  */
48 
/*
 * Forward declarations of static helpers that are used before their
 * definitions appear in this file.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);
56 
57 #if 1
58 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
59 {
60 	bool wrlocked = true;
61 
62 	if (unlikely(down_read_trylock(sem))) {
63 		wrlocked = false;
64 		up_read(sem);
65 	}
66 
67 	return wrlocked;
68 }
69 static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
70 {
71 	WARN_ON(!rwsem_is_locked(&osdc->lock));
72 }
73 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
74 {
75 	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
76 }
77 static inline void verify_osd_locked(struct ceph_osd *osd)
78 {
79 	struct ceph_osd_client *osdc = osd->o_osdc;
80 
81 	WARN_ON(!(mutex_is_locked(&osd->lock) &&
82 		  rwsem_is_locked(&osdc->lock)) &&
83 		!rwsem_is_wrlocked(&osdc->lock));
84 }
85 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
86 {
87 	WARN_ON(!mutex_is_locked(&lreq->lock));
88 }
89 #else
/*
 * No-op variants of the lock verification helpers.  They are compiled
 * only if the "#if 1" guarding the real checks is changed to "#if 0".
 */
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
94 #endif
95 
96 /*
97  * calculate the mapping of a file extent onto an object, and fill out the
98  * request accordingly.  shorten extent as necessary if it crosses an
99  * object boundary.
100  *
101  * fill osd op in request message.
102  */
103 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
104 			u64 *objnum, u64 *objoff, u64 *objlen)
105 {
106 	u64 orig_len = *plen;
107 	u32 xlen;
108 
109 	/* object extent? */
110 	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
111 					  objoff, &xlen);
112 	*objlen = xlen;
113 	if (*objlen < orig_len) {
114 		*plen = *objlen;
115 		dout(" skipping last %llu, final file extent %llu~%llu\n",
116 		     orig_len - *plen, off, *plen);
117 	}
118 
119 	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
120 	return 0;
121 }
122 
123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
124 {
125 	memset(osd_data, 0, sizeof (*osd_data));
126 	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
127 }
128 
129 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
130 			struct page **pages, u64 length, u32 alignment,
131 			bool pages_from_pool, bool own_pages)
132 {
133 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
134 	osd_data->pages = pages;
135 	osd_data->length = length;
136 	osd_data->alignment = alignment;
137 	osd_data->pages_from_pool = pages_from_pool;
138 	osd_data->own_pages = own_pages;
139 }
140 
141 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
142 			struct ceph_pagelist *pagelist)
143 {
144 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
145 	osd_data->pagelist = pagelist;
146 }
147 
148 #ifdef CONFIG_BLOCK
149 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
150 				   struct ceph_bio_iter *bio_pos,
151 				   u32 bio_length)
152 {
153 	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
154 	osd_data->bio_pos = *bio_pos;
155 	osd_data->bio_length = bio_length;
156 }
157 #endif /* CONFIG_BLOCK */
158 
159 static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
160 				     struct ceph_bvec_iter *bvec_pos)
161 {
162 	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
163 	osd_data->bvec_pos = *bvec_pos;
164 }
165 
/*
 * Evaluate to a pointer to field @fld of member @typ of op number @whch
 * in request @oreq.  BUGs if @whch is not below the request's
 * r_num_ops.  @oreq and @whch are each evaluated once.
 */
#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__osd_req = (oreq);			\
	unsigned int __which = (whch);					\
									\
	BUG_ON(__which >= __osd_req->r_num_ops);			\
	&__osd_req->r_ops[__which].typ.fld;				\
})
173 
174 static struct ceph_osd_data *
175 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
176 {
177 	BUG_ON(which >= osd_req->r_num_ops);
178 
179 	return &osd_req->r_ops[which].raw_data_in;
180 }
181 
182 struct ceph_osd_data *
183 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
184 			unsigned int which)
185 {
186 	return osd_req_op_data(osd_req, which, extent, osd_data);
187 }
188 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
189 
190 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
191 			unsigned int which, struct page **pages,
192 			u64 length, u32 alignment,
193 			bool pages_from_pool, bool own_pages)
194 {
195 	struct ceph_osd_data *osd_data;
196 
197 	osd_data = osd_req_op_raw_data_in(osd_req, which);
198 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
199 				pages_from_pool, own_pages);
200 }
201 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
202 
203 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
204 			unsigned int which, struct page **pages,
205 			u64 length, u32 alignment,
206 			bool pages_from_pool, bool own_pages)
207 {
208 	struct ceph_osd_data *osd_data;
209 
210 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
211 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
212 				pages_from_pool, own_pages);
213 }
214 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
215 
216 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
217 			unsigned int which, struct ceph_pagelist *pagelist)
218 {
219 	struct ceph_osd_data *osd_data;
220 
221 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
222 	ceph_osd_data_pagelist_init(osd_data, pagelist);
223 }
224 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
225 
226 #ifdef CONFIG_BLOCK
227 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
228 				    unsigned int which,
229 				    struct ceph_bio_iter *bio_pos,
230 				    u32 bio_length)
231 {
232 	struct ceph_osd_data *osd_data;
233 
234 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
235 	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
236 }
237 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
238 #endif /* CONFIG_BLOCK */
239 
240 void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
241 					 unsigned int which,
242 					 struct ceph_bvec_iter *bvec_pos)
243 {
244 	struct ceph_osd_data *osd_data;
245 
246 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
247 	ceph_osd_data_bvecs_init(osd_data, bvec_pos);
248 }
249 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
250 
251 static void osd_req_op_cls_request_info_pagelist(
252 			struct ceph_osd_request *osd_req,
253 			unsigned int which, struct ceph_pagelist *pagelist)
254 {
255 	struct ceph_osd_data *osd_data;
256 
257 	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
258 	ceph_osd_data_pagelist_init(osd_data, pagelist);
259 }
260 
261 void osd_req_op_cls_request_data_pagelist(
262 			struct ceph_osd_request *osd_req,
263 			unsigned int which, struct ceph_pagelist *pagelist)
264 {
265 	struct ceph_osd_data *osd_data;
266 
267 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
268 	ceph_osd_data_pagelist_init(osd_data, pagelist);
269 	osd_req->r_ops[which].cls.indata_len += pagelist->length;
270 	osd_req->r_ops[which].indata_len += pagelist->length;
271 }
272 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
273 
274 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
275 			unsigned int which, struct page **pages, u64 length,
276 			u32 alignment, bool pages_from_pool, bool own_pages)
277 {
278 	struct ceph_osd_data *osd_data;
279 
280 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
281 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
282 				pages_from_pool, own_pages);
283 	osd_req->r_ops[which].cls.indata_len += length;
284 	osd_req->r_ops[which].indata_len += length;
285 }
286 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
287 
288 void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
289 				       unsigned int which,
290 				       struct bio_vec *bvecs, u32 bytes)
291 {
292 	struct ceph_osd_data *osd_data;
293 	struct ceph_bvec_iter it = {
294 		.bvecs = bvecs,
295 		.iter = { .bi_size = bytes },
296 	};
297 
298 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
299 	ceph_osd_data_bvecs_init(osd_data, &it);
300 	osd_req->r_ops[which].cls.indata_len += bytes;
301 	osd_req->r_ops[which].indata_len += bytes;
302 }
303 EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
304 
305 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
306 			unsigned int which, struct page **pages, u64 length,
307 			u32 alignment, bool pages_from_pool, bool own_pages)
308 {
309 	struct ceph_osd_data *osd_data;
310 
311 	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
312 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
313 				pages_from_pool, own_pages);
314 }
315 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
316 
317 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
318 {
319 	switch (osd_data->type) {
320 	case CEPH_OSD_DATA_TYPE_NONE:
321 		return 0;
322 	case CEPH_OSD_DATA_TYPE_PAGES:
323 		return osd_data->length;
324 	case CEPH_OSD_DATA_TYPE_PAGELIST:
325 		return (u64)osd_data->pagelist->length;
326 #ifdef CONFIG_BLOCK
327 	case CEPH_OSD_DATA_TYPE_BIO:
328 		return (u64)osd_data->bio_length;
329 #endif /* CONFIG_BLOCK */
330 	case CEPH_OSD_DATA_TYPE_BVECS:
331 		return osd_data->bvec_pos.iter.bi_size;
332 	default:
333 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
334 		return 0;
335 	}
336 }
337 
338 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
339 {
340 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
341 		int num_pages;
342 
343 		num_pages = calc_pages_for((u64)osd_data->alignment,
344 						(u64)osd_data->length);
345 		ceph_release_page_vector(osd_data->pages, num_pages);
346 	}
347 	ceph_osd_data_init(osd_data);
348 }
349 
350 static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
351 			unsigned int which)
352 {
353 	struct ceph_osd_req_op *op;
354 
355 	BUG_ON(which >= osd_req->r_num_ops);
356 	op = &osd_req->r_ops[which];
357 
358 	switch (op->op) {
359 	case CEPH_OSD_OP_READ:
360 	case CEPH_OSD_OP_WRITE:
361 	case CEPH_OSD_OP_WRITEFULL:
362 		ceph_osd_data_release(&op->extent.osd_data);
363 		break;
364 	case CEPH_OSD_OP_CALL:
365 		ceph_osd_data_release(&op->cls.request_info);
366 		ceph_osd_data_release(&op->cls.request_data);
367 		ceph_osd_data_release(&op->cls.response_data);
368 		break;
369 	case CEPH_OSD_OP_SETXATTR:
370 	case CEPH_OSD_OP_CMPXATTR:
371 		ceph_osd_data_release(&op->xattr.osd_data);
372 		break;
373 	case CEPH_OSD_OP_STAT:
374 		ceph_osd_data_release(&op->raw_data_in);
375 		break;
376 	case CEPH_OSD_OP_NOTIFY_ACK:
377 		ceph_osd_data_release(&op->notify_ack.request_data);
378 		break;
379 	case CEPH_OSD_OP_NOTIFY:
380 		ceph_osd_data_release(&op->notify.request_data);
381 		ceph_osd_data_release(&op->notify.response_data);
382 		break;
383 	case CEPH_OSD_OP_LIST_WATCHERS:
384 		ceph_osd_data_release(&op->list_watchers.response_data);
385 		break;
386 	default:
387 		break;
388 	}
389 }
390 
391 /*
392  * Assumes @t is zero-initialized.
393  */
394 static void target_init(struct ceph_osd_request_target *t)
395 {
396 	ceph_oid_init(&t->base_oid);
397 	ceph_oloc_init(&t->base_oloc);
398 	ceph_oid_init(&t->target_oid);
399 	ceph_oloc_init(&t->target_oloc);
400 
401 	ceph_osds_init(&t->acting);
402 	ceph_osds_init(&t->up);
403 	t->size = -1;
404 	t->min_size = -1;
405 
406 	t->osd = CEPH_HOMELESS_OSD;
407 }
408 
409 static void target_copy(struct ceph_osd_request_target *dest,
410 			const struct ceph_osd_request_target *src)
411 {
412 	ceph_oid_copy(&dest->base_oid, &src->base_oid);
413 	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
414 	ceph_oid_copy(&dest->target_oid, &src->target_oid);
415 	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
416 
417 	dest->pgid = src->pgid; /* struct */
418 	dest->spgid = src->spgid; /* struct */
419 	dest->pg_num = src->pg_num;
420 	dest->pg_num_mask = src->pg_num_mask;
421 	ceph_osds_copy(&dest->acting, &src->acting);
422 	ceph_osds_copy(&dest->up, &src->up);
423 	dest->size = src->size;
424 	dest->min_size = src->min_size;
425 	dest->sort_bitwise = src->sort_bitwise;
426 
427 	dest->flags = src->flags;
428 	dest->paused = src->paused;
429 
430 	dest->epoch = src->epoch;
431 	dest->last_force_resend = src->last_force_resend;
432 
433 	dest->osd = src->osd;
434 }
435 
436 static void target_destroy(struct ceph_osd_request_target *t)
437 {
438 	ceph_oid_destroy(&t->base_oid);
439 	ceph_oloc_destroy(&t->base_oloc);
440 	ceph_oid_destroy(&t->target_oid);
441 	ceph_oloc_destroy(&t->target_oloc);
442 }
443 
444 /*
445  * requests
446  */
447 static void request_release_checks(struct ceph_osd_request *req)
448 {
449 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
450 	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
451 	WARN_ON(!list_empty(&req->r_unsafe_item));
452 	WARN_ON(req->r_osd);
453 }
454 
455 static void ceph_osdc_release_request(struct kref *kref)
456 {
457 	struct ceph_osd_request *req = container_of(kref,
458 					    struct ceph_osd_request, r_kref);
459 	unsigned int which;
460 
461 	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
462 	     req->r_request, req->r_reply);
463 	request_release_checks(req);
464 
465 	if (req->r_request)
466 		ceph_msg_put(req->r_request);
467 	if (req->r_reply)
468 		ceph_msg_put(req->r_reply);
469 
470 	for (which = 0; which < req->r_num_ops; which++)
471 		osd_req_op_data_release(req, which);
472 
473 	target_destroy(&req->r_t);
474 	ceph_put_snap_context(req->r_snapc);
475 
476 	if (req->r_mempool)
477 		mempool_free(req, req->r_osdc->req_mempool);
478 	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
479 		kmem_cache_free(ceph_osd_request_cache, req);
480 	else
481 		kfree(req);
482 }
483 
484 void ceph_osdc_get_request(struct ceph_osd_request *req)
485 {
486 	dout("%s %p (was %d)\n", __func__, req,
487 	     kref_read(&req->r_kref));
488 	kref_get(&req->r_kref);
489 }
490 EXPORT_SYMBOL(ceph_osdc_get_request);
491 
492 void ceph_osdc_put_request(struct ceph_osd_request *req)
493 {
494 	if (req) {
495 		dout("%s %p (was %d)\n", __func__, req,
496 		     kref_read(&req->r_kref));
497 		kref_put(&req->r_kref, ceph_osdc_release_request);
498 	}
499 }
500 EXPORT_SYMBOL(ceph_osdc_put_request);
501 
502 static void request_init(struct ceph_osd_request *req)
503 {
504 	/* req only, each op is zeroed in _osd_req_op_init() */
505 	memset(req, 0, sizeof(*req));
506 
507 	kref_init(&req->r_kref);
508 	init_completion(&req->r_completion);
509 	RB_CLEAR_NODE(&req->r_node);
510 	RB_CLEAR_NODE(&req->r_mc_node);
511 	INIT_LIST_HEAD(&req->r_unsafe_item);
512 
513 	target_init(&req->r_t);
514 }
515 
516 /*
517  * This is ugly, but it allows us to reuse linger registration and ping
518  * requests, keeping the structure of the code around send_linger{_ping}()
519  * reasonable.  Setting up a min_nr=2 mempool for each linger request
520  * and dealing with copying ops (this blasts req only, watch op remains
521  * intact) isn't any better.
522  */
523 static void request_reinit(struct ceph_osd_request *req)
524 {
525 	struct ceph_osd_client *osdc = req->r_osdc;
526 	bool mempool = req->r_mempool;
527 	unsigned int num_ops = req->r_num_ops;
528 	u64 snapid = req->r_snapid;
529 	struct ceph_snap_context *snapc = req->r_snapc;
530 	bool linger = req->r_linger;
531 	struct ceph_msg *request_msg = req->r_request;
532 	struct ceph_msg *reply_msg = req->r_reply;
533 
534 	dout("%s req %p\n", __func__, req);
535 	WARN_ON(kref_read(&req->r_kref) != 1);
536 	request_release_checks(req);
537 
538 	WARN_ON(kref_read(&request_msg->kref) != 1);
539 	WARN_ON(kref_read(&reply_msg->kref) != 1);
540 	target_destroy(&req->r_t);
541 
542 	request_init(req);
543 	req->r_osdc = osdc;
544 	req->r_mempool = mempool;
545 	req->r_num_ops = num_ops;
546 	req->r_snapid = snapid;
547 	req->r_snapc = snapc;
548 	req->r_linger = linger;
549 	req->r_request = request_msg;
550 	req->r_reply = reply_msg;
551 }
552 
553 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
554 					       struct ceph_snap_context *snapc,
555 					       unsigned int num_ops,
556 					       bool use_mempool,
557 					       gfp_t gfp_flags)
558 {
559 	struct ceph_osd_request *req;
560 
561 	if (use_mempool) {
562 		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
563 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
564 	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
565 		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
566 	} else {
567 		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
568 		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
569 			      gfp_flags);
570 	}
571 	if (unlikely(!req))
572 		return NULL;
573 
574 	request_init(req);
575 	req->r_osdc = osdc;
576 	req->r_mempool = use_mempool;
577 	req->r_num_ops = num_ops;
578 	req->r_snapid = CEPH_NOSNAP;
579 	req->r_snapc = ceph_get_snap_context(snapc);
580 
581 	dout("%s req %p\n", __func__, req);
582 	return req;
583 }
584 EXPORT_SYMBOL(ceph_osdc_alloc_request);
585 
586 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
587 {
588 	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
589 }
590 
591 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
592 {
593 	struct ceph_osd_client *osdc = req->r_osdc;
594 	struct ceph_msg *msg;
595 	int msg_size;
596 
597 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
598 	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
599 
600 	/* create request message */
601 	msg_size = CEPH_ENCODING_START_BLK_LEN +
602 			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
603 	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
604 	msg_size += CEPH_ENCODING_START_BLK_LEN +
605 			sizeof(struct ceph_osd_reqid); /* reqid */
606 	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
607 	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
608 	msg_size += CEPH_ENCODING_START_BLK_LEN +
609 			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
610 	msg_size += 4 + req->r_base_oid.name_len; /* oid */
611 	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
612 	msg_size += 8; /* snapid */
613 	msg_size += 8; /* snap_seq */
614 	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
615 	msg_size += 4 + 8; /* retry_attempt, features */
616 
617 	if (req->r_mempool)
618 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
619 	else
620 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
621 	if (!msg)
622 		return -ENOMEM;
623 
624 	memset(msg->front.iov_base, 0, msg->front.iov_len);
625 	req->r_request = msg;
626 
627 	/* create reply message */
628 	msg_size = OSD_OPREPLY_FRONT_LEN;
629 	msg_size += req->r_base_oid.name_len;
630 	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
631 
632 	if (req->r_mempool)
633 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
634 	else
635 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
636 	if (!msg)
637 		return -ENOMEM;
638 
639 	req->r_reply = msg;
640 
641 	return 0;
642 }
643 EXPORT_SYMBOL(ceph_osdc_alloc_messages);
644 
645 static bool osd_req_opcode_valid(u16 opcode)
646 {
647 	switch (opcode) {
648 #define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
649 __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
650 #undef GENERATE_CASE
651 	default:
652 		return false;
653 	}
654 }
655 
656 /*
657  * This is an osd op init function for opcodes that have no data or
658  * other information associated with them.  It also serves as a
659  * common init routine for all the other init functions, below.
660  */
661 static struct ceph_osd_req_op *
662 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
663 		 u16 opcode, u32 flags)
664 {
665 	struct ceph_osd_req_op *op;
666 
667 	BUG_ON(which >= osd_req->r_num_ops);
668 	BUG_ON(!osd_req_opcode_valid(opcode));
669 
670 	op = &osd_req->r_ops[which];
671 	memset(op, 0, sizeof (*op));
672 	op->op = opcode;
673 	op->flags = flags;
674 
675 	return op;
676 }
677 
678 void osd_req_op_init(struct ceph_osd_request *osd_req,
679 		     unsigned int which, u16 opcode, u32 flags)
680 {
681 	(void)_osd_req_op_init(osd_req, which, opcode, flags);
682 }
683 EXPORT_SYMBOL(osd_req_op_init);
684 
685 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
686 				unsigned int which, u16 opcode,
687 				u64 offset, u64 length,
688 				u64 truncate_size, u32 truncate_seq)
689 {
690 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
691 						      opcode, 0);
692 	size_t payload_len = 0;
693 
694 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
695 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
696 	       opcode != CEPH_OSD_OP_TRUNCATE);
697 
698 	op->extent.offset = offset;
699 	op->extent.length = length;
700 	op->extent.truncate_size = truncate_size;
701 	op->extent.truncate_seq = truncate_seq;
702 	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
703 		payload_len += length;
704 
705 	op->indata_len = payload_len;
706 }
707 EXPORT_SYMBOL(osd_req_op_extent_init);
708 
709 void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
710 				unsigned int which, u64 length)
711 {
712 	struct ceph_osd_req_op *op;
713 	u64 previous;
714 
715 	BUG_ON(which >= osd_req->r_num_ops);
716 	op = &osd_req->r_ops[which];
717 	previous = op->extent.length;
718 
719 	if (length == previous)
720 		return;		/* Nothing to do */
721 	BUG_ON(length > previous);
722 
723 	op->extent.length = length;
724 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
725 		op->indata_len -= previous - length;
726 }
727 EXPORT_SYMBOL(osd_req_op_extent_update);
728 
729 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
730 				unsigned int which, u64 offset_inc)
731 {
732 	struct ceph_osd_req_op *op, *prev_op;
733 
734 	BUG_ON(which + 1 >= osd_req->r_num_ops);
735 
736 	prev_op = &osd_req->r_ops[which];
737 	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
738 	/* dup previous one */
739 	op->indata_len = prev_op->indata_len;
740 	op->outdata_len = prev_op->outdata_len;
741 	op->extent = prev_op->extent;
742 	/* adjust offset */
743 	op->extent.offset += offset_inc;
744 	op->extent.length -= offset_inc;
745 
746 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
747 		op->indata_len -= offset_inc;
748 }
749 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
750 
751 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
752 			u16 opcode, const char *class, const char *method)
753 {
754 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
755 						      opcode, 0);
756 	struct ceph_pagelist *pagelist;
757 	size_t payload_len = 0;
758 	size_t size;
759 
760 	BUG_ON(opcode != CEPH_OSD_OP_CALL);
761 
762 	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
763 	BUG_ON(!pagelist);
764 	ceph_pagelist_init(pagelist);
765 
766 	op->cls.class_name = class;
767 	size = strlen(class);
768 	BUG_ON(size > (size_t) U8_MAX);
769 	op->cls.class_len = size;
770 	ceph_pagelist_append(pagelist, class, size);
771 	payload_len += size;
772 
773 	op->cls.method_name = method;
774 	size = strlen(method);
775 	BUG_ON(size > (size_t) U8_MAX);
776 	op->cls.method_len = size;
777 	ceph_pagelist_append(pagelist, method, size);
778 	payload_len += size;
779 
780 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
781 
782 	op->indata_len = payload_len;
783 }
784 EXPORT_SYMBOL(osd_req_op_cls_init);
785 
786 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
787 			  u16 opcode, const char *name, const void *value,
788 			  size_t size, u8 cmp_op, u8 cmp_mode)
789 {
790 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
791 						      opcode, 0);
792 	struct ceph_pagelist *pagelist;
793 	size_t payload_len;
794 
795 	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
796 
797 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
798 	if (!pagelist)
799 		return -ENOMEM;
800 
801 	ceph_pagelist_init(pagelist);
802 
803 	payload_len = strlen(name);
804 	op->xattr.name_len = payload_len;
805 	ceph_pagelist_append(pagelist, name, payload_len);
806 
807 	op->xattr.value_len = size;
808 	ceph_pagelist_append(pagelist, value, size);
809 	payload_len += size;
810 
811 	op->xattr.cmp_op = cmp_op;
812 	op->xattr.cmp_mode = cmp_mode;
813 
814 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
815 	op->indata_len = payload_len;
816 	return 0;
817 }
818 EXPORT_SYMBOL(osd_req_op_xattr_init);
819 
820 /*
821  * @watch_opcode: CEPH_OSD_WATCH_OP_*
822  */
823 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
824 				  u64 cookie, u8 watch_opcode)
825 {
826 	struct ceph_osd_req_op *op;
827 
828 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
829 	op->watch.cookie = cookie;
830 	op->watch.op = watch_opcode;
831 	op->watch.gen = 0;
832 }
833 
834 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
835 				unsigned int which,
836 				u64 expected_object_size,
837 				u64 expected_write_size)
838 {
839 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
840 						      CEPH_OSD_OP_SETALLOCHINT,
841 						      0);
842 
843 	op->alloc_hint.expected_object_size = expected_object_size;
844 	op->alloc_hint.expected_write_size = expected_write_size;
845 
846 	/*
847 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
848 	 * not worth a feature bit.  Set FAILOK per-op flag to make
849 	 * sure older osds don't trip over an unsupported opcode.
850 	 */
851 	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
852 }
853 EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
854 
855 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
856 				struct ceph_osd_data *osd_data)
857 {
858 	u64 length = ceph_osd_data_length(osd_data);
859 
860 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
861 		BUG_ON(length > (u64) SIZE_MAX);
862 		if (length)
863 			ceph_msg_data_add_pages(msg, osd_data->pages,
864 					length, osd_data->alignment);
865 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
866 		BUG_ON(!length);
867 		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
868 #ifdef CONFIG_BLOCK
869 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
870 		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
871 #endif
872 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
873 		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
874 	} else {
875 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
876 	}
877 }
878 
879 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
880 			     const struct ceph_osd_req_op *src)
881 {
882 	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
883 		pr_err("unrecognized osd opcode %d\n", src->op);
884 
885 		return 0;
886 	}
887 
888 	switch (src->op) {
889 	case CEPH_OSD_OP_STAT:
890 		break;
891 	case CEPH_OSD_OP_READ:
892 	case CEPH_OSD_OP_WRITE:
893 	case CEPH_OSD_OP_WRITEFULL:
894 	case CEPH_OSD_OP_ZERO:
895 	case CEPH_OSD_OP_TRUNCATE:
896 		dst->extent.offset = cpu_to_le64(src->extent.offset);
897 		dst->extent.length = cpu_to_le64(src->extent.length);
898 		dst->extent.truncate_size =
899 			cpu_to_le64(src->extent.truncate_size);
900 		dst->extent.truncate_seq =
901 			cpu_to_le32(src->extent.truncate_seq);
902 		break;
903 	case CEPH_OSD_OP_CALL:
904 		dst->cls.class_len = src->cls.class_len;
905 		dst->cls.method_len = src->cls.method_len;
906 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
907 		break;
908 	case CEPH_OSD_OP_WATCH:
909 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
910 		dst->watch.ver = cpu_to_le64(0);
911 		dst->watch.op = src->watch.op;
912 		dst->watch.gen = cpu_to_le32(src->watch.gen);
913 		break;
914 	case CEPH_OSD_OP_NOTIFY_ACK:
915 		break;
916 	case CEPH_OSD_OP_NOTIFY:
917 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
918 		break;
919 	case CEPH_OSD_OP_LIST_WATCHERS:
920 		break;
921 	case CEPH_OSD_OP_SETALLOCHINT:
922 		dst->alloc_hint.expected_object_size =
923 		    cpu_to_le64(src->alloc_hint.expected_object_size);
924 		dst->alloc_hint.expected_write_size =
925 		    cpu_to_le64(src->alloc_hint.expected_write_size);
926 		break;
927 	case CEPH_OSD_OP_SETXATTR:
928 	case CEPH_OSD_OP_CMPXATTR:
929 		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
930 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
931 		dst->xattr.cmp_op = src->xattr.cmp_op;
932 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
933 		break;
934 	case CEPH_OSD_OP_CREATE:
935 	case CEPH_OSD_OP_DELETE:
936 		break;
937 	default:
938 		pr_err("unsupported osd opcode %s\n",
939 			ceph_osd_op_name(src->op));
940 		WARN_ON(1);
941 
942 		return 0;
943 	}
944 
945 	dst->op = cpu_to_le16(src->op);
946 	dst->flags = cpu_to_le32(src->flags);
947 	dst->payload_len = cpu_to_le32(src->indata_len);
948 
949 	return src->indata_len;
950 }
951 
952 /*
953  * build new request AND message, calculate layout, and adjust file
954  * extent as needed.
955  *
956  * if the file was recently truncated, we include information about its
957  * old and new size so that the object can be updated appropriately.  (we
958  * avoid synchronously deleting truncated objects because it's slow.)
959  */
960 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
961 					       struct ceph_file_layout *layout,
962 					       struct ceph_vino vino,
963 					       u64 off, u64 *plen,
964 					       unsigned int which, int num_ops,
965 					       int opcode, int flags,
966 					       struct ceph_snap_context *snapc,
967 					       u32 truncate_seq,
968 					       u64 truncate_size,
969 					       bool use_mempool)
970 {
971 	struct ceph_osd_request *req;
972 	u64 objnum = 0;
973 	u64 objoff = 0;
974 	u64 objlen = 0;
975 	int r;
976 
977 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
978 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
979 	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
980 
981 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
982 					GFP_NOFS);
983 	if (!req) {
984 		r = -ENOMEM;
985 		goto fail;
986 	}
987 
988 	/* calculate max write size */
989 	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
990 	if (r)
991 		goto fail;
992 
993 	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
994 		osd_req_op_init(req, which, opcode, 0);
995 	} else {
996 		u32 object_size = layout->object_size;
997 		u32 object_base = off - objoff;
998 		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
999 			if (truncate_size <= object_base) {
1000 				truncate_size = 0;
1001 			} else {
1002 				truncate_size -= object_base;
1003 				if (truncate_size > object_size)
1004 					truncate_size = object_size;
1005 			}
1006 		}
1007 		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
1008 				       truncate_size, truncate_seq);
1009 	}
1010 
1011 	req->r_abort_on_full = true;
1012 	req->r_flags = flags;
1013 	req->r_base_oloc.pool = layout->pool_id;
1014 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
1015 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
1016 
1017 	req->r_snapid = vino.snap;
1018 	if (flags & CEPH_OSD_FLAG_WRITE)
1019 		req->r_data_offset = off;
1020 
1021 	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
1022 	if (r)
1023 		goto fail;
1024 
1025 	return req;
1026 
1027 fail:
1028 	ceph_osdc_put_request(req);
1029 	return ERR_PTR(r);
1030 }
1031 EXPORT_SYMBOL(ceph_osdc_new_request);
1032 
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 *
 * Two sets of helpers are generated, both keyed by ->r_tid: "request"
 * links through ->r_node (used for the per-session o_requests tree, see
 * link_request() and unlink_request()), "request_mc" links through
 * ->r_mc_node.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
1038 
1039 static bool osd_homeless(struct ceph_osd *osd)
1040 {
1041 	return osd->o_osd == CEPH_HOMELESS_OSD;
1042 }
1043 
1044 static bool osd_registered(struct ceph_osd *osd)
1045 {
1046 	verify_osdc_locked(osd->o_osdc);
1047 
1048 	return !RB_EMPTY_NODE(&osd->o_node);
1049 }
1050 
1051 /*
1052  * Assumes @osd is zero-initialized.
1053  */
1054 static void osd_init(struct ceph_osd *osd)
1055 {
1056 	refcount_set(&osd->o_ref, 1);
1057 	RB_CLEAR_NODE(&osd->o_node);
1058 	osd->o_requests = RB_ROOT;
1059 	osd->o_linger_requests = RB_ROOT;
1060 	osd->o_backoff_mappings = RB_ROOT;
1061 	osd->o_backoffs_by_id = RB_ROOT;
1062 	INIT_LIST_HEAD(&osd->o_osd_lru);
1063 	INIT_LIST_HEAD(&osd->o_keepalive_item);
1064 	osd->o_incarnation = 1;
1065 	mutex_init(&osd->lock);
1066 }
1067 
1068 static void osd_cleanup(struct ceph_osd *osd)
1069 {
1070 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1071 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1072 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1073 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
1074 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
1075 	WARN_ON(!list_empty(&osd->o_osd_lru));
1076 	WARN_ON(!list_empty(&osd->o_keepalive_item));
1077 
1078 	if (osd->o_auth.authorizer) {
1079 		WARN_ON(osd_homeless(osd));
1080 		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1081 	}
1082 }
1083 
1084 /*
1085  * Track open sessions with osds.
1086  */
1087 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1088 {
1089 	struct ceph_osd *osd;
1090 
1091 	WARN_ON(onum == CEPH_HOMELESS_OSD);
1092 
1093 	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1094 	osd_init(osd);
1095 	osd->o_osdc = osdc;
1096 	osd->o_osd = onum;
1097 
1098 	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1099 
1100 	return osd;
1101 }
1102 
1103 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1104 {
1105 	if (refcount_inc_not_zero(&osd->o_ref)) {
1106 		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1107 		     refcount_read(&osd->o_ref));
1108 		return osd;
1109 	} else {
1110 		dout("get_osd %p FAIL\n", osd);
1111 		return NULL;
1112 	}
1113 }
1114 
1115 static void put_osd(struct ceph_osd *osd)
1116 {
1117 	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1118 	     refcount_read(&osd->o_ref) - 1);
1119 	if (refcount_dec_and_test(&osd->o_ref)) {
1120 		osd_cleanup(osd);
1121 		kfree(osd);
1122 	}
1123 }
1124 
/*
 * rbtree helpers for struct ceph_osd, keyed by ->o_osd and linked through
 * ->o_node (insert_osd(), erase_osd() and lookup_osd() are used below).
 */
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1126 
1127 static void __move_osd_to_lru(struct ceph_osd *osd)
1128 {
1129 	struct ceph_osd_client *osdc = osd->o_osdc;
1130 
1131 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1132 	BUG_ON(!list_empty(&osd->o_osd_lru));
1133 
1134 	spin_lock(&osdc->osd_lru_lock);
1135 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1136 	spin_unlock(&osdc->osd_lru_lock);
1137 
1138 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1139 }
1140 
1141 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1142 {
1143 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1144 	    RB_EMPTY_ROOT(&osd->o_linger_requests))
1145 		__move_osd_to_lru(osd);
1146 }
1147 
1148 static void __remove_osd_from_lru(struct ceph_osd *osd)
1149 {
1150 	struct ceph_osd_client *osdc = osd->o_osdc;
1151 
1152 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1153 
1154 	spin_lock(&osdc->osd_lru_lock);
1155 	if (!list_empty(&osd->o_osd_lru))
1156 		list_del_init(&osd->o_osd_lru);
1157 	spin_unlock(&osdc->osd_lru_lock);
1158 }
1159 
1160 /*
1161  * Close the connection and assign any leftover requests to the
1162  * homeless session.
1163  */
1164 static void close_osd(struct ceph_osd *osd)
1165 {
1166 	struct ceph_osd_client *osdc = osd->o_osdc;
1167 	struct rb_node *n;
1168 
1169 	verify_osdc_wrlocked(osdc);
1170 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1171 
1172 	ceph_con_close(&osd->o_con);
1173 
1174 	for (n = rb_first(&osd->o_requests); n; ) {
1175 		struct ceph_osd_request *req =
1176 		    rb_entry(n, struct ceph_osd_request, r_node);
1177 
1178 		n = rb_next(n); /* unlink_request() */
1179 
1180 		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1181 		unlink_request(osd, req);
1182 		link_request(&osdc->homeless_osd, req);
1183 	}
1184 	for (n = rb_first(&osd->o_linger_requests); n; ) {
1185 		struct ceph_osd_linger_request *lreq =
1186 		    rb_entry(n, struct ceph_osd_linger_request, node);
1187 
1188 		n = rb_next(n); /* unlink_linger() */
1189 
1190 		dout(" reassigning lreq %p linger_id %llu\n", lreq,
1191 		     lreq->linger_id);
1192 		unlink_linger(osd, lreq);
1193 		link_linger(&osdc->homeless_osd, lreq);
1194 	}
1195 	clear_backoffs(osd);
1196 
1197 	__remove_osd_from_lru(osd);
1198 	erase_osd(&osdc->osds, osd);
1199 	put_osd(osd);
1200 }
1201 
1202 /*
1203  * reset osd connect
1204  */
1205 static int reopen_osd(struct ceph_osd *osd)
1206 {
1207 	struct ceph_entity_addr *peer_addr;
1208 
1209 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1210 
1211 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1212 	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1213 		close_osd(osd);
1214 		return -ENODEV;
1215 	}
1216 
1217 	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1218 	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1219 			!ceph_con_opened(&osd->o_con)) {
1220 		struct rb_node *n;
1221 
1222 		dout("osd addr hasn't changed and connection never opened, "
1223 		     "letting msgr retry\n");
1224 		/* touch each r_stamp for handle_timeout()'s benfit */
1225 		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1226 			struct ceph_osd_request *req =
1227 			    rb_entry(n, struct ceph_osd_request, r_node);
1228 			req->r_stamp = jiffies;
1229 		}
1230 
1231 		return -EAGAIN;
1232 	}
1233 
1234 	ceph_con_close(&osd->o_con);
1235 	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1236 	osd->o_incarnation++;
1237 
1238 	return 0;
1239 }
1240 
1241 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1242 					  bool wrlocked)
1243 {
1244 	struct ceph_osd *osd;
1245 
1246 	if (wrlocked)
1247 		verify_osdc_wrlocked(osdc);
1248 	else
1249 		verify_osdc_locked(osdc);
1250 
1251 	if (o != CEPH_HOMELESS_OSD)
1252 		osd = lookup_osd(&osdc->osds, o);
1253 	else
1254 		osd = &osdc->homeless_osd;
1255 	if (!osd) {
1256 		if (!wrlocked)
1257 			return ERR_PTR(-EAGAIN);
1258 
1259 		osd = create_osd(osdc, o);
1260 		insert_osd(&osdc->osds, osd);
1261 		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1262 			      &osdc->osdmap->osd_addr[osd->o_osd]);
1263 	}
1264 
1265 	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1266 	return osd;
1267 }
1268 
1269 /*
1270  * Create request <-> OSD session relation.
1271  *
1272  * @req has to be assigned a tid, @osd may be homeless.
1273  */
1274 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1275 {
1276 	verify_osd_locked(osd);
1277 	WARN_ON(!req->r_tid || req->r_osd);
1278 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1279 	     req, req->r_tid);
1280 
1281 	if (!osd_homeless(osd))
1282 		__remove_osd_from_lru(osd);
1283 	else
1284 		atomic_inc(&osd->o_osdc->num_homeless);
1285 
1286 	get_osd(osd);
1287 	insert_request(&osd->o_requests, req);
1288 	req->r_osd = osd;
1289 }
1290 
1291 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1292 {
1293 	verify_osd_locked(osd);
1294 	WARN_ON(req->r_osd != osd);
1295 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1296 	     req, req->r_tid);
1297 
1298 	req->r_osd = NULL;
1299 	erase_request(&osd->o_requests, req);
1300 	put_osd(osd);
1301 
1302 	if (!osd_homeless(osd))
1303 		maybe_move_osd_to_lru(osd);
1304 	else
1305 		atomic_dec(&osd->o_osdc->num_homeless);
1306 }
1307 
1308 static bool __pool_full(struct ceph_pg_pool_info *pi)
1309 {
1310 	return pi->flags & CEPH_POOL_FLAG_FULL;
1311 }
1312 
1313 static bool have_pool_full(struct ceph_osd_client *osdc)
1314 {
1315 	struct rb_node *n;
1316 
1317 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1318 		struct ceph_pg_pool_info *pi =
1319 		    rb_entry(n, struct ceph_pg_pool_info, node);
1320 
1321 		if (__pool_full(pi))
1322 			return true;
1323 	}
1324 
1325 	return false;
1326 }
1327 
1328 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1329 {
1330 	struct ceph_pg_pool_info *pi;
1331 
1332 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1333 	if (!pi)
1334 		return false;
1335 
1336 	return __pool_full(pi);
1337 }
1338 
1339 /*
1340  * Returns whether a request should be blocked from being sent
1341  * based on the current osdmap and osd_client settings.
1342  */
1343 static bool target_should_be_paused(struct ceph_osd_client *osdc,
1344 				    const struct ceph_osd_request_target *t,
1345 				    struct ceph_pg_pool_info *pi)
1346 {
1347 	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1348 	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1349 		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1350 		       __pool_full(pi);
1351 
1352 	WARN_ON(pi->id != t->target_oloc.pool);
1353 	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1354 	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1355 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
1356 }
1357 
/* Outcome of calc_target(). */
enum calc_target_result {
	/* nothing that requires a resend was detected */
	CALC_TARGET_NO_ACTION = 0,
	/* target was unpaused, remapped, split or a resend was forced */
	CALC_TARGET_NEED_RESEND,
	/* a pool lookup or the object -> PG mapping failed */
	CALC_TARGET_POOL_DNE,
};
1363 
/*
 * Recompute the target of @t against the current osdmap.
 *
 * Records the map epoch in @t, applies read/write tier overlays (unless
 * CEPH_OSD_FLAG_IGNORE_OVERLAY is set), maps the target object to a PG
 * and its up/acting sets, and refreshes the mapping cached in @t when
 * the PG or acting set changed, a resend is forced or the PG has split.
 *
 * @con: if not NULL, its peer_features decide whether a split alone
 *       warrants a resend (RESEND_ON_SPLIT)
 * @any_change: passed through to ceph_osds_changed(); also gates the
 *              ceph_is_new_interval() check
 *
 * Returns CALC_TARGET_POOL_DNE (with t->osd set to CEPH_HOMELESS_OSD) if
 * a pool lookup or the object -> PG mapping fails,
 * CALC_TARGET_NEED_RESEND if the request needs to be (re)sent and
 * CALC_TARGET_NO_ACTION otherwise.
 */
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
					   struct ceph_osd_request_target *t,
					   struct ceph_connection *con,
					   bool any_change)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_pg pgid, last_pgid;
	struct ceph_osds up, acting;
	bool force_resend = false;
	bool unpaused = false;
	bool legacy_change;
	bool split = false;
	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
	bool recovery_deletes = ceph_osdmap_flag(osdc,
						 CEPH_OSDMAP_RECOVERY_DELETES);
	enum calc_target_result ct_res;
	int ret;

	t->epoch = osdc->osdmap->epoch;
	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
	if (!pi) {
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}

	/* the pool asks for a forced resend as of exactly this epoch */
	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
		if (t->last_force_resend < pi->last_force_request_resend) {
			t->last_force_resend = pi->last_force_request_resend;
			force_resend = true;
		} else if (t->last_force_resend == 0) {
			force_resend = true;
		}
	}

	/* apply tiering */
	ceph_oid_copy(&t->target_oid, &t->base_oid);
	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		/* write_tier is applied last and so wins if both flags are set */
		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
			t->target_oloc.pool = pi->read_tier;
		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
			t->target_oloc.pool = pi->write_tier;

		/* from here on @pi is the (possibly redirected) target pool */
		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
		if (!pi) {
			t->osd = CEPH_HOMELESS_OSD;
			ct_res = CALC_TARGET_POOL_DNE;
			goto out;
		}
	}

	ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
					  &pgid);
	if (ret) {
		WARN_ON(ret != -ENOENT);
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}
	/* the PG this object fell into under the previously cached pg_num */
	last_pgid.pool = pgid.pool;
	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
	/* compare the cached (old) state in @t with the freshly computed one */
	if (any_change &&
	    ceph_is_new_interval(&t->acting,
				 &acting,
				 &t->up,
				 &up,
				 t->size,
				 pi->size,
				 t->min_size,
				 pi->min_size,
				 t->pg_num,
				 pi->pg_num,
				 t->sort_bitwise,
				 sort_bitwise,
				 t->recovery_deletes,
				 recovery_deletes,
				 &last_pgid))
		force_resend = true;

	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
		t->paused = false;
		unpaused = true;
	}
	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
			ceph_osds_changed(&t->acting, &acting, any_change);
	/* a split can only be detected once a pg_num has been cached */
	if (t->pg_num)
		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

	if (legacy_change || force_resend || split) {
		/* refresh the mapping cached in @t */
		t->pgid = pgid; /* struct */
		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
		ceph_osds_copy(&t->acting, &acting);
		ceph_osds_copy(&t->up, &up);
		t->size = pi->size;
		t->min_size = pi->min_size;
		t->pg_num = pi->pg_num;
		t->pg_num_mask = pi->pg_num_mask;
		t->sort_bitwise = sort_bitwise;
		t->recovery_deletes = recovery_deletes;

		t->osd = acting.primary;
	}

	/* a split alone counts only if the peer has RESEND_ON_SPLIT */
	if (unpaused || legacy_change || force_resend ||
	    (split && con && CEPH_HAVE_FEATURE(con->peer_features,
					       RESEND_ON_SPLIT)))
		ct_res = CALC_TARGET_NEED_RESEND;
	else
		ct_res = CALC_TARGET_NO_ACTION;

out:
	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
	return ct_res;
}
1481 
1482 static struct ceph_spg_mapping *alloc_spg_mapping(void)
1483 {
1484 	struct ceph_spg_mapping *spg;
1485 
1486 	spg = kmalloc(sizeof(*spg), GFP_NOIO);
1487 	if (!spg)
1488 		return NULL;
1489 
1490 	RB_CLEAR_NODE(&spg->node);
1491 	spg->backoffs = RB_ROOT;
1492 	return spg;
1493 }
1494 
/*
 * Free @spg.  It is expected to be unlinked from its tree and to have no
 * backoffs left -- warn otherwise.
 */
static void free_spg_mapping(struct ceph_spg_mapping *spg)
{
	WARN_ON(!RB_EMPTY_NODE(&spg->node));
	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));

	kfree(spg);
}
1502 
/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 *
 * Nodes are keyed by ->spgid, compared with ceph_spg_compare(), and
 * linked through ->node.
 */
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
		 RB_BYPTR, const struct ceph_spg *, node)
1511 
1512 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1513 {
1514 	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1515 }
1516 
1517 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1518 				   void **pkey, size_t *pkey_len)
1519 {
1520 	if (hoid->key_len) {
1521 		*pkey = hoid->key;
1522 		*pkey_len = hoid->key_len;
1523 	} else {
1524 		*pkey = hoid->oid;
1525 		*pkey_len = hoid->oid_len;
1526 	}
1527 }
1528 
/*
 * Compare two length-delimited names.  The common prefix is compared
 * with memcmp(); if it is equal, the shorter name sorts first.
 *
 * Returns a negative value, 0 or a positive value if name1 sorts before,
 * equal to or after name2.
 */
static int compare_names(const void *name1, size_t name1_len,
			 const void *name2, size_t name2_len)
{
	int ret = memcmp(name1, name2, min(name1_len, name2_len));

	if (ret)
		return ret;

	/* equal up to the shorter length */
	if (name1_len < name2_len)
		return -1;
	if (name1_len > name2_len)
		return 1;

	return 0;
}
1543 
1544 static int hoid_compare(const struct ceph_hobject_id *lhs,
1545 			const struct ceph_hobject_id *rhs)
1546 {
1547 	void *effective_key1, *effective_key2;
1548 	size_t effective_key1_len, effective_key2_len;
1549 	int ret;
1550 
1551 	if (lhs->is_max < rhs->is_max)
1552 		return -1;
1553 	if (lhs->is_max > rhs->is_max)
1554 		return 1;
1555 
1556 	if (lhs->pool < rhs->pool)
1557 		return -1;
1558 	if (lhs->pool > rhs->pool)
1559 		return 1;
1560 
1561 	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1562 		return -1;
1563 	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1564 		return 1;
1565 
1566 	ret = compare_names(lhs->nspace, lhs->nspace_len,
1567 			    rhs->nspace, rhs->nspace_len);
1568 	if (ret)
1569 		return ret;
1570 
1571 	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1572 	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1573 	ret = compare_names(effective_key1, effective_key1_len,
1574 			    effective_key2, effective_key2_len);
1575 	if (ret)
1576 		return ret;
1577 
1578 	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1579 	if (ret)
1580 		return ret;
1581 
1582 	if (lhs->snapid < rhs->snapid)
1583 		return -1;
1584 	if (lhs->snapid > rhs->snapid)
1585 		return 1;
1586 
1587 	return 0;
1588 }
1589 
/*
 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 *
 * Returns 0 on success, -EINVAL if struct_v is older than 4 or one of
 * the ceph_decode_*_safe() macros bails out to e_inval, or the error
 * from ceph_start_decoding() / ceph_extract_encoded_string().  Strings
 * extracted before a failure are left in @hoid (a failed one is reset
 * to NULL).
 * NOTE(review): presumably the caller releases them, e.g. through
 * free_hoid() -- confirm at the call sites.
 */
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	if (struct_v < 4) {
		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
		goto e_inval;
	}

	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
						GFP_NOIO);
	if (IS_ERR(hoid->key)) {
		ret = PTR_ERR(hoid->key);
		/* don't leave an ERR_PTR behind in @hoid */
		hoid->key = NULL;
		return ret;
	}

	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
						GFP_NOIO);
	if (IS_ERR(hoid->oid)) {
		ret = PTR_ERR(hoid->oid);
		hoid->oid = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
						   GFP_NOIO);
	if (IS_ERR(hoid->nspace)) {
		ret = PTR_ERR(hoid->nspace);
		hoid->nspace = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->pool, e_inval);

	/* done only once all the fields have been decoded */
	ceph_hoid_build_hash_cache(hoid);
	return 0;

e_inval:
	return -EINVAL;
}
1648 
1649 static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1650 {
1651 	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1652 	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1653 }
1654 
1655 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1656 {
1657 	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1658 	ceph_encode_string(p, end, hoid->key, hoid->key_len);
1659 	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1660 	ceph_encode_64(p, hoid->snapid);
1661 	ceph_encode_32(p, hoid->hash);
1662 	ceph_encode_8(p, hoid->is_max);
1663 	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1664 	ceph_encode_64(p, hoid->pool);
1665 }
1666 
1667 static void free_hoid(struct ceph_hobject_id *hoid)
1668 {
1669 	if (hoid) {
1670 		kfree(hoid->key);
1671 		kfree(hoid->oid);
1672 		kfree(hoid->nspace);
1673 		kfree(hoid);
1674 	}
1675 }
1676 
1677 static struct ceph_osd_backoff *alloc_backoff(void)
1678 {
1679 	struct ceph_osd_backoff *backoff;
1680 
1681 	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1682 	if (!backoff)
1683 		return NULL;
1684 
1685 	RB_CLEAR_NODE(&backoff->spg_node);
1686 	RB_CLEAR_NODE(&backoff->id_node);
1687 	return backoff;
1688 }
1689 
/*
 * Free @backoff along with its ->begin and ->end hoids.  It is expected
 * to be unlinked from both trees -- warn otherwise.
 */
static void free_backoff(struct ceph_osd_backoff *backoff)
{
	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));

	free_hoid(backoff->begin);
	free_hoid(backoff->end);
	kfree(backoff);
}
1699 
/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 *
 * Nodes are compared with hoid_compare() and linked through ->spg_node.
 * No lookup helper is generated here -- lookup_containing_backoff() is
 * used instead.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);
1705 
1706 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1707 					    const struct ceph_hobject_id *hoid)
1708 {
1709 	struct rb_node *n = root->rb_node;
1710 
1711 	while (n) {
1712 		struct ceph_osd_backoff *cur =
1713 		    rb_entry(n, struct ceph_osd_backoff, spg_node);
1714 		int cmp;
1715 
1716 		cmp = hoid_compare(hoid, cur->begin);
1717 		if (cmp < 0) {
1718 			n = n->rb_left;
1719 		} else if (cmp > 0) {
1720 			if (hoid_compare(hoid, cur->end) < 0)
1721 				return cur;
1722 
1723 			n = n->rb_right;
1724 		} else {
1725 			return cur;
1726 		}
1727 	}
1728 
1729 	return NULL;
1730 }
1731 
/*
 * Each backoff has a unique id within its OSD session.
 *
 * Nodes are keyed by ->id and linked through ->id_node.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1736 
1737 static void clear_backoffs(struct ceph_osd *osd)
1738 {
1739 	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1740 		struct ceph_spg_mapping *spg =
1741 		    rb_entry(rb_first(&osd->o_backoff_mappings),
1742 			     struct ceph_spg_mapping, node);
1743 
1744 		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1745 			struct ceph_osd_backoff *backoff =
1746 			    rb_entry(rb_first(&spg->backoffs),
1747 				     struct ceph_osd_backoff, spg_node);
1748 
1749 			erase_backoff(&spg->backoffs, backoff);
1750 			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1751 			free_backoff(backoff);
1752 		}
1753 		erase_spg_mapping(&osd->o_backoff_mappings, spg);
1754 		free_spg_mapping(spg);
1755 	}
1756 }
1757 
1758 /*
1759  * Set up a temporary, non-owning view into @t.
1760  */
1761 static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1762 				  const struct ceph_osd_request_target *t)
1763 {
1764 	hoid->key = NULL;
1765 	hoid->key_len = 0;
1766 	hoid->oid = t->target_oid.name;
1767 	hoid->oid_len = t->target_oid.name_len;
1768 	hoid->snapid = CEPH_NOSNAP;
1769 	hoid->hash = t->pgid.seed;
1770 	hoid->is_max = false;
1771 	if (t->target_oloc.pool_ns) {
1772 		hoid->nspace = t->target_oloc.pool_ns->str;
1773 		hoid->nspace_len = t->target_oloc.pool_ns->len;
1774 	} else {
1775 		hoid->nspace = NULL;
1776 		hoid->nspace_len = 0;
1777 	}
1778 	hoid->pool = t->target_oloc.pool;
1779 	ceph_hoid_build_hash_cache(hoid);
1780 }
1781 
1782 static bool should_plug_request(struct ceph_osd_request *req)
1783 {
1784 	struct ceph_osd *osd = req->r_osd;
1785 	struct ceph_spg_mapping *spg;
1786 	struct ceph_osd_backoff *backoff;
1787 	struct ceph_hobject_id hoid;
1788 
1789 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1790 	if (!spg)
1791 		return false;
1792 
1793 	hoid_fill_from_target(&hoid, &req->r_t);
1794 	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1795 	if (!backoff)
1796 		return false;
1797 
1798 	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1799 	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1800 	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1801 	return true;
1802 }
1803 
/*
 * Attach the data items of @req's ops to the messages: outgoing data
 * goes to @msg, buffers for incoming data go to req->r_reply.
 *
 * Does nothing if @msg already has data items attached.  Ops that aren't
 * listed in the switch contribute no data items.
 */
static void setup_request_data(struct ceph_osd_request *req,
			       struct ceph_msg *msg)
{
	u32 data_len = 0;
	int i;

	/* data items are already in place */
	if (!list_empty(&msg->data))
		return;

	WARN_ON(msg->data_length);
	for (i = 0; i < req->r_num_ops; i++) {
		struct ceph_osd_req_op *op = &req->r_ops[i];

		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
			WARN_ON(op->indata_len != op->extent.length);
			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
			WARN_ON(op->indata_len != op->xattr.name_len +
						  op->xattr.value_len);
			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
			break;
		case CEPH_OSD_OP_NOTIFY_ACK:
			ceph_osdc_msg_data_add(msg,
					       &op->notify_ack.request_data);
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->raw_data_in);
			break;
		case CEPH_OSD_OP_READ:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_LIST_WATCHERS:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->list_watchers.response_data);
			break;

		/* both */
		case CEPH_OSD_OP_CALL:
			WARN_ON(op->indata_len != op->cls.class_len +
						  op->cls.method_len +
						  op->cls.indata_len);
			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->cls.response_data);
			break;
		case CEPH_OSD_OP_NOTIFY:
			ceph_osdc_msg_data_add(msg,
					       &op->notify.request_data);
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->notify.response_data);
			break;
		}

		/* accumulated for every op, listed above or not */
		data_len += op->indata_len;
	}

	/* what was attached to @msg must add up to the ops' indata_len */
	WARN_ON(data_len != msg->data_length);
}
1874 
/*
 * Encode @pgid at *@p, advancing *@p: a leading byte of 1, the 64-bit
 * pool, the 32-bit seed and a 32-bit "preferred" that is always -1.
 */
static void encode_pgid(void **p, const struct ceph_pg *pgid)
{
	ceph_encode_8(p, 1);
	ceph_encode_64(p, pgid->pool);
	ceph_encode_32(p, pgid->seed);
	ceph_encode_32(p, -1); /* preferred */
}
1882 
/*
 * Encode @spgid at *@p, advancing *@p: an encoding start block followed
 * by the pgid and the one-byte shard.
 */
static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
	/* payload is the encoded pgid plus the shard byte */
	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
	encode_pgid(p, &spgid->pgid);
	ceph_encode_8(p, spgid->shard);
}
1889 
1890 static void encode_oloc(void **p, void *end,
1891 			const struct ceph_object_locator *oloc)
1892 {
1893 	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
1894 	ceph_encode_64(p, oloc->pool);
1895 	ceph_encode_32(p, -1); /* preferred */
1896 	ceph_encode_32(p, 0);  /* key len */
1897 	if (oloc->pool_ns)
1898 		ceph_encode_string(p, end, oloc->pool_ns->str,
1899 				   oloc->pool_ns->len);
1900 	else
1901 		ceph_encode_32(p, 0);
1902 }
1903 
/*
 * Encode everything of @req but the trailing features field into the
 * front of @msg, in MOSDOp v8 layout, and fill in the data-related
 * header fields.  Room for the 8-byte features field is checked here;
 * the field itself is added by encode_request_finish().
 */
static void encode_request_partial(struct ceph_osd_request *req,
				   struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u32 data_len = 0;
	int i;

	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
		/* snapshots aren't writeable */
		WARN_ON(req->r_snapid != CEPH_NOSNAP);
	} else {
		/* these are not expected to be set without the WRITE flag */
		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
			req->r_data_offset || req->r_snapc);
	}

	setup_request_data(req, msg);

	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
	ceph_encode_32(&p, req->r_flags);

	/* reqid */
	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
	memset(p, 0, sizeof(struct ceph_osd_reqid));
	p += sizeof(struct ceph_osd_reqid);

	/* trace */
	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
	p += sizeof(struct ceph_blkin_trace_info);

	ceph_encode_32(&p, 0); /* client_inc, always 0 */
	/* takes p by value, so p is advanced by hand */
	ceph_encode_timespec(p, &req->r_mtime);
	p += sizeof(struct ceph_timespec);

	encode_oloc(&p, end, &req->r_t.target_oloc);
	ceph_encode_string(&p, end, req->r_t.target_oid.name,
			   req->r_t.target_oid.name_len);

	/* ops, can imply data */
	ceph_encode_16(&p, req->r_num_ops);
	for (i = 0; i < req->r_num_ops; i++) {
		data_len += osd_req_encode_op(p, &req->r_ops[i]);
		p += sizeof(struct ceph_osd_op);
	}

	ceph_encode_64(&p, req->r_snapid); /* snapid */
	if (req->r_snapc) {
		ceph_encode_64(&p, req->r_snapc->seq);
		ceph_encode_32(&p, req->r_snapc->num_snaps);
		for (i = 0; i < req->r_snapc->num_snaps; i++)
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
	} else {
		ceph_encode_64(&p, 0); /* snap_seq */
		ceph_encode_32(&p, 0); /* snaps len */
	}

	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
	BUG_ON(p > end - 8); /* space for features */

	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
	/* front_len is finalized in encode_request_finish() */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	msg->hdr.data_len = cpu_to_le32(data_len);
	/*
	 * The header "data_off" is a hint to the receiver allowing it
	 * to align received data into its buffers such that there's no
	 * need to re-copy it before writing it to disk (direct I/O).
	 */
	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
}
1980 
/*
 * Finish a message prepared by encode_request_partial(), based on the
 * features of the peer on msg->con: either append the features field
 * (RESEND_ON_SPLIT peers) or reencode the front in place as MOSDOp v4.
 * Either way the front length is finalized here.
 */
static void encode_request_finish(struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const partial_end = p + msg->front.iov_len;
	void *const end = p + msg->front_alloc_len;

	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
		/* luminous OSD -- encode features and be done */
		p = partial_end;
		ceph_encode_64(&p, msg->con->peer_features);
	} else {
		/* the fixed-size fields at the start of the v8 encoding */
		struct {
			char spgid[CEPH_ENCODING_START_BLK_LEN +
				   CEPH_PGID_ENCODING_LEN + 1];
			__le32 hash;
			__le32 epoch;
			__le32 flags;
			char reqid[CEPH_ENCODING_START_BLK_LEN +
				   sizeof(struct ceph_osd_reqid)];
			char trace[sizeof(struct ceph_blkin_trace_info)];
			__le32 client_inc;
			struct ceph_timespec mtime;
		} __packed head;
		struct ceph_pg pgid;
		void *oloc, *oid, *tail;
		int oloc_len, oid_len, tail_len;
		int len;

		/*
		 * Pre-luminous OSD -- reencode v8 into v4 using @head
		 * as a temporary buffer.  Encode the raw PG; the rest
		 * is just a matter of moving oloc, oid and tail blobs
		 * around.
		 */
		memcpy(&head, p, sizeof(head));
		p += sizeof(head);

		/* locate the oloc blob, picking up the pool on the way */
		oloc = p;
		p += CEPH_ENCODING_START_BLK_LEN;
		pgid.pool = ceph_decode_64(&p);
		p += 4 + 4; /* preferred, key len */
		len = ceph_decode_32(&p);
		p += len;   /* nspace */
		oloc_len = p - oloc;

		/* locate the oid blob: length plus name */
		oid = p;
		len = ceph_decode_32(&p);
		p += len;
		oid_len = p - oid;

		tail = p;
		tail_len = partial_end - p;

		/* start over from the beginning of the front */
		p = msg->front.iov_base;
		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

		/* reassert_version */
		memset(p, 0, sizeof(struct ceph_eversion));
		p += sizeof(struct ceph_eversion);

		/* blobs only ever move towards the start of the buffer */
		BUG_ON(p >= oloc);
		memmove(p, oloc, oloc_len);
		p += oloc_len;

		pgid.seed = le32_to_cpu(head.hash);
		encode_pgid(&p, &pgid); /* raw pg */

		BUG_ON(p >= oid);
		memmove(p, oid, oid_len);
		p += oid_len;

		/* tail -- ops, snapid, snapc, retry_attempt */
		BUG_ON(p >= tail);
		memmove(p, tail, tail_len);
		p += tail_len;

		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
	     le16_to_cpu(msg->hdr.version));
}
2072 
2073 /*
2074  * @req has to be assigned a tid and registered.
2075  */
2076 static void send_request(struct ceph_osd_request *req)
2077 {
2078 	struct ceph_osd *osd = req->r_osd;
2079 
2080 	verify_osd_locked(osd);
2081 	WARN_ON(osd->o_osd != req->r_t.osd);
2082 
2083 	/* backoff? */
2084 	if (should_plug_request(req))
2085 		return;
2086 
2087 	/*
2088 	 * We may have a previously queued request message hanging
2089 	 * around.  Cancel it to avoid corrupting the msgr.
2090 	 */
2091 	if (req->r_sent)
2092 		ceph_msg_revoke(req->r_request);
2093 
2094 	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2095 	if (req->r_attempts)
2096 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
2097 	else
2098 		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2099 
2100 	encode_request_partial(req, req->r_request);
2101 
2102 	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2103 	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2104 	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2105 	     req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2106 	     req->r_attempts);
2107 
2108 	req->r_t.paused = false;
2109 	req->r_stamp = jiffies;
2110 	req->r_attempts++;
2111 
2112 	req->r_sent = osd->o_incarnation;
2113 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2114 	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2115 }
2116 
2117 static void maybe_request_map(struct ceph_osd_client *osdc)
2118 {
2119 	bool continuous = false;
2120 
2121 	verify_osdc_locked(osdc);
2122 	WARN_ON(!osdc->osdmap->epoch);
2123 
2124 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2125 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2126 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2127 		dout("%s osdc %p continuous\n", __func__, osdc);
2128 		continuous = true;
2129 	} else {
2130 		dout("%s osdc %p onetime\n", __func__, osdc);
2131 	}
2132 
2133 	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2134 			       osdc->osdmap->epoch + 1, continuous))
2135 		ceph_monc_renew_subs(&osdc->client->monc);
2136 }
2137 
2138 static void complete_request(struct ceph_osd_request *req, int err);
2139 static void send_map_check(struct ceph_osd_request *req);
2140 
2141 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2142 {
2143 	struct ceph_osd_client *osdc = req->r_osdc;
2144 	struct ceph_osd *osd;
2145 	enum calc_target_result ct_res;
2146 	bool need_send = false;
2147 	bool promoted = false;
2148 	bool need_abort = false;
2149 
2150 	WARN_ON(req->r_tid);
2151 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2152 
2153 again:
2154 	ct_res = calc_target(osdc, &req->r_t, NULL, false);
2155 	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2156 		goto promote;
2157 
2158 	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2159 	if (IS_ERR(osd)) {
2160 		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2161 		goto promote;
2162 	}
2163 
2164 	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2165 		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2166 		     osdc->epoch_barrier);
2167 		req->r_t.paused = true;
2168 		maybe_request_map(osdc);
2169 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2170 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2171 		dout("req %p pausewr\n", req);
2172 		req->r_t.paused = true;
2173 		maybe_request_map(osdc);
2174 	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2175 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2176 		dout("req %p pauserd\n", req);
2177 		req->r_t.paused = true;
2178 		maybe_request_map(osdc);
2179 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2180 		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2181 				     CEPH_OSD_FLAG_FULL_FORCE)) &&
2182 		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2183 		    pool_full(osdc, req->r_t.base_oloc.pool))) {
2184 		dout("req %p full/pool_full\n", req);
2185 		pr_warn_ratelimited("FULL or reached pool quota\n");
2186 		req->r_t.paused = true;
2187 		maybe_request_map(osdc);
2188 		if (req->r_abort_on_full)
2189 			need_abort = true;
2190 	} else if (!osd_homeless(osd)) {
2191 		need_send = true;
2192 	} else {
2193 		maybe_request_map(osdc);
2194 	}
2195 
2196 	mutex_lock(&osd->lock);
2197 	/*
2198 	 * Assign the tid atomically with send_request() to protect
2199 	 * multiple writes to the same object from racing with each
2200 	 * other, resulting in out of order ops on the OSDs.
2201 	 */
2202 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2203 	link_request(osd, req);
2204 	if (need_send)
2205 		send_request(req);
2206 	else if (need_abort)
2207 		complete_request(req, -ENOSPC);
2208 	mutex_unlock(&osd->lock);
2209 
2210 	if (ct_res == CALC_TARGET_POOL_DNE)
2211 		send_map_check(req);
2212 
2213 	if (promoted)
2214 		downgrade_write(&osdc->lock);
2215 	return;
2216 
2217 promote:
2218 	up_read(&osdc->lock);
2219 	down_write(&osdc->lock);
2220 	wrlocked = true;
2221 	promoted = true;
2222 	goto again;
2223 }
2224 
2225 static void account_request(struct ceph_osd_request *req)
2226 {
2227 	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2228 	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2229 
2230 	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2231 	atomic_inc(&req->r_osdc->num_requests);
2232 
2233 	req->r_start_stamp = jiffies;
2234 }
2235 
2236 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2237 {
2238 	ceph_osdc_get_request(req);
2239 	account_request(req);
2240 	__submit_request(req, wrlocked);
2241 }
2242 
2243 static void finish_request(struct ceph_osd_request *req)
2244 {
2245 	struct ceph_osd_client *osdc = req->r_osdc;
2246 
2247 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2248 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2249 
2250 	if (req->r_osd)
2251 		unlink_request(req->r_osd, req);
2252 	atomic_dec(&osdc->num_requests);
2253 
2254 	/*
2255 	 * If an OSD has failed or returned and a request has been sent
2256 	 * twice, it's possible to get a reply and end up here while the
2257 	 * request message is queued for delivery.  We will ignore the
2258 	 * reply, so not a big deal, but better to try and catch it.
2259 	 */
2260 	ceph_msg_revoke(req->r_request);
2261 	ceph_msg_revoke_incoming(req->r_reply);
2262 }
2263 
2264 static void __complete_request(struct ceph_osd_request *req)
2265 {
2266 	if (req->r_callback) {
2267 		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
2268 		     req->r_tid, req->r_callback, req->r_result);
2269 		req->r_callback(req);
2270 	}
2271 }
2272 
2273 /*
2274  * This is open-coded in handle_reply().
2275  */
2276 static void complete_request(struct ceph_osd_request *req, int err)
2277 {
2278 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2279 
2280 	req->r_result = err;
2281 	finish_request(req);
2282 	__complete_request(req);
2283 	complete_all(&req->r_completion);
2284 	ceph_osdc_put_request(req);
2285 }
2286 
2287 static void cancel_map_check(struct ceph_osd_request *req)
2288 {
2289 	struct ceph_osd_client *osdc = req->r_osdc;
2290 	struct ceph_osd_request *lookup_req;
2291 
2292 	verify_osdc_wrlocked(osdc);
2293 
2294 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2295 	if (!lookup_req)
2296 		return;
2297 
2298 	WARN_ON(lookup_req != req);
2299 	erase_request_mc(&osdc->map_checks, req);
2300 	ceph_osdc_put_request(req);
2301 }
2302 
2303 static void cancel_request(struct ceph_osd_request *req)
2304 {
2305 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2306 
2307 	cancel_map_check(req);
2308 	finish_request(req);
2309 	complete_all(&req->r_completion);
2310 	ceph_osdc_put_request(req);
2311 }
2312 
2313 static void abort_request(struct ceph_osd_request *req, int err)
2314 {
2315 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2316 
2317 	cancel_map_check(req);
2318 	complete_request(req, err);
2319 }
2320 
2321 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2322 {
2323 	if (likely(eb > osdc->epoch_barrier)) {
2324 		dout("updating epoch_barrier from %u to %u\n",
2325 				osdc->epoch_barrier, eb);
2326 		osdc->epoch_barrier = eb;
2327 		/* Request map if we're not to the barrier yet */
2328 		if (eb > osdc->osdmap->epoch)
2329 			maybe_request_map(osdc);
2330 	}
2331 }
2332 
2333 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2334 {
2335 	down_read(&osdc->lock);
2336 	if (unlikely(eb > osdc->epoch_barrier)) {
2337 		up_read(&osdc->lock);
2338 		down_write(&osdc->lock);
2339 		update_epoch_barrier(osdc, eb);
2340 		up_write(&osdc->lock);
2341 	} else {
2342 		up_read(&osdc->lock);
2343 	}
2344 }
2345 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2346 
2347 /*
2348  * Drop all pending requests that are stalled waiting on a full condition to
2349  * clear, and complete them with ENOSPC as the return code. Set the
2350  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2351  * cancelled.
2352  */
2353 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2354 {
2355 	struct rb_node *n;
2356 	bool victims = false;
2357 
2358 	dout("enter abort_on_full\n");
2359 
2360 	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
2361 		goto out;
2362 
2363 	/* Scan list and see if there is anything to abort */
2364 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2365 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2366 		struct rb_node *m;
2367 
2368 		m = rb_first(&osd->o_requests);
2369 		while (m) {
2370 			struct ceph_osd_request *req = rb_entry(m,
2371 					struct ceph_osd_request, r_node);
2372 			m = rb_next(m);
2373 
2374 			if (req->r_abort_on_full) {
2375 				victims = true;
2376 				break;
2377 			}
2378 		}
2379 		if (victims)
2380 			break;
2381 	}
2382 
2383 	if (!victims)
2384 		goto out;
2385 
2386 	/*
2387 	 * Update the barrier to current epoch if it's behind that point,
2388 	 * since we know we have some calls to be aborted in the tree.
2389 	 */
2390 	update_epoch_barrier(osdc, osdc->osdmap->epoch);
2391 
2392 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2393 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2394 		struct rb_node *m;
2395 
2396 		m = rb_first(&osd->o_requests);
2397 		while (m) {
2398 			struct ceph_osd_request *req = rb_entry(m,
2399 					struct ceph_osd_request, r_node);
2400 			m = rb_next(m);
2401 
2402 			if (req->r_abort_on_full &&
2403 			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2404 			     pool_full(osdc, req->r_t.target_oloc.pool)))
2405 				abort_request(req, -ENOSPC);
2406 		}
2407 	}
2408 out:
2409 	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
2410 }
2411 
2412 static void check_pool_dne(struct ceph_osd_request *req)
2413 {
2414 	struct ceph_osd_client *osdc = req->r_osdc;
2415 	struct ceph_osdmap *map = osdc->osdmap;
2416 
2417 	verify_osdc_wrlocked(osdc);
2418 	WARN_ON(!map->epoch);
2419 
2420 	if (req->r_attempts) {
2421 		/*
2422 		 * We sent a request earlier, which means that
2423 		 * previously the pool existed, and now it does not
2424 		 * (i.e., it was deleted).
2425 		 */
2426 		req->r_map_dne_bound = map->epoch;
2427 		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2428 		     req->r_tid);
2429 	} else {
2430 		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2431 		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2432 	}
2433 
2434 	if (req->r_map_dne_bound) {
2435 		if (map->epoch >= req->r_map_dne_bound) {
2436 			/* we had a new enough map */
2437 			pr_info_ratelimited("tid %llu pool does not exist\n",
2438 					    req->r_tid);
2439 			complete_request(req, -ENOENT);
2440 		}
2441 	} else {
2442 		send_map_check(req);
2443 	}
2444 }
2445 
2446 static void map_check_cb(struct ceph_mon_generic_request *greq)
2447 {
2448 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2449 	struct ceph_osd_request *req;
2450 	u64 tid = greq->private_data;
2451 
2452 	WARN_ON(greq->result || !greq->u.newest);
2453 
2454 	down_write(&osdc->lock);
2455 	req = lookup_request_mc(&osdc->map_checks, tid);
2456 	if (!req) {
2457 		dout("%s tid %llu dne\n", __func__, tid);
2458 		goto out_unlock;
2459 	}
2460 
2461 	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2462 	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2463 	if (!req->r_map_dne_bound)
2464 		req->r_map_dne_bound = greq->u.newest;
2465 	erase_request_mc(&osdc->map_checks, req);
2466 	check_pool_dne(req);
2467 
2468 	ceph_osdc_put_request(req);
2469 out_unlock:
2470 	up_write(&osdc->lock);
2471 }
2472 
2473 static void send_map_check(struct ceph_osd_request *req)
2474 {
2475 	struct ceph_osd_client *osdc = req->r_osdc;
2476 	struct ceph_osd_request *lookup_req;
2477 	int ret;
2478 
2479 	verify_osdc_wrlocked(osdc);
2480 
2481 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2482 	if (lookup_req) {
2483 		WARN_ON(lookup_req != req);
2484 		return;
2485 	}
2486 
2487 	ceph_osdc_get_request(req);
2488 	insert_request_mc(&osdc->map_checks, req);
2489 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2490 					  map_check_cb, req->r_tid);
2491 	WARN_ON(ret);
2492 }
2493 
2494 /*
2495  * lingering requests, watch/notify v2 infrastructure
2496  */
2497 static void linger_release(struct kref *kref)
2498 {
2499 	struct ceph_osd_linger_request *lreq =
2500 	    container_of(kref, struct ceph_osd_linger_request, kref);
2501 
2502 	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2503 	     lreq->reg_req, lreq->ping_req);
2504 	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2505 	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2506 	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2507 	WARN_ON(!list_empty(&lreq->scan_item));
2508 	WARN_ON(!list_empty(&lreq->pending_lworks));
2509 	WARN_ON(lreq->osd);
2510 
2511 	if (lreq->reg_req)
2512 		ceph_osdc_put_request(lreq->reg_req);
2513 	if (lreq->ping_req)
2514 		ceph_osdc_put_request(lreq->ping_req);
2515 	target_destroy(&lreq->t);
2516 	kfree(lreq);
2517 }
2518 
2519 static void linger_put(struct ceph_osd_linger_request *lreq)
2520 {
2521 	if (lreq)
2522 		kref_put(&lreq->kref, linger_release);
2523 }
2524 
2525 static struct ceph_osd_linger_request *
2526 linger_get(struct ceph_osd_linger_request *lreq)
2527 {
2528 	kref_get(&lreq->kref);
2529 	return lreq;
2530 }
2531 
2532 static struct ceph_osd_linger_request *
2533 linger_alloc(struct ceph_osd_client *osdc)
2534 {
2535 	struct ceph_osd_linger_request *lreq;
2536 
2537 	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2538 	if (!lreq)
2539 		return NULL;
2540 
2541 	kref_init(&lreq->kref);
2542 	mutex_init(&lreq->lock);
2543 	RB_CLEAR_NODE(&lreq->node);
2544 	RB_CLEAR_NODE(&lreq->osdc_node);
2545 	RB_CLEAR_NODE(&lreq->mc_node);
2546 	INIT_LIST_HEAD(&lreq->scan_item);
2547 	INIT_LIST_HEAD(&lreq->pending_lworks);
2548 	init_completion(&lreq->reg_commit_wait);
2549 	init_completion(&lreq->notify_finish_wait);
2550 
2551 	lreq->osdc = osdc;
2552 	target_init(&lreq->t);
2553 
2554 	dout("%s lreq %p\n", __func__, lreq);
2555 	return lreq;
2556 }
2557 
/*
 * rbtree helpers for linger requests, all keyed by linger_id.  Three
 * trees are involved, one per rb_node embedded in the request:
 *   - node:      per-OSD-session tree (insert_linger()/erase_linger())
 *   - osdc_node: per-client tree of registered lingers
 *                (insert_linger_osdc()/erase_linger_osdc())
 *   - mc_node:   tree of lingers with a map check in flight
 *                (insert_linger_mc()/erase_linger_mc()/lookup_linger_mc())
 */
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2561 
2562 /*
2563  * Create linger request <-> OSD session relation.
2564  *
2565  * @lreq has to be registered, @osd may be homeless.
2566  */
2567 static void link_linger(struct ceph_osd *osd,
2568 			struct ceph_osd_linger_request *lreq)
2569 {
2570 	verify_osd_locked(osd);
2571 	WARN_ON(!lreq->linger_id || lreq->osd);
2572 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2573 	     osd->o_osd, lreq, lreq->linger_id);
2574 
2575 	if (!osd_homeless(osd))
2576 		__remove_osd_from_lru(osd);
2577 	else
2578 		atomic_inc(&osd->o_osdc->num_homeless);
2579 
2580 	get_osd(osd);
2581 	insert_linger(&osd->o_linger_requests, lreq);
2582 	lreq->osd = osd;
2583 }
2584 
2585 static void unlink_linger(struct ceph_osd *osd,
2586 			  struct ceph_osd_linger_request *lreq)
2587 {
2588 	verify_osd_locked(osd);
2589 	WARN_ON(lreq->osd != osd);
2590 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2591 	     osd->o_osd, lreq, lreq->linger_id);
2592 
2593 	lreq->osd = NULL;
2594 	erase_linger(&osd->o_linger_requests, lreq);
2595 	put_osd(osd);
2596 
2597 	if (!osd_homeless(osd))
2598 		maybe_move_osd_to_lru(osd);
2599 	else
2600 		atomic_dec(&osd->o_osdc->num_homeless);
2601 }
2602 
2603 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2604 {
2605 	verify_osdc_locked(lreq->osdc);
2606 
2607 	return !RB_EMPTY_NODE(&lreq->osdc_node);
2608 }
2609 
2610 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2611 {
2612 	struct ceph_osd_client *osdc = lreq->osdc;
2613 	bool registered;
2614 
2615 	down_read(&osdc->lock);
2616 	registered = __linger_registered(lreq);
2617 	up_read(&osdc->lock);
2618 
2619 	return registered;
2620 }
2621 
2622 static void linger_register(struct ceph_osd_linger_request *lreq)
2623 {
2624 	struct ceph_osd_client *osdc = lreq->osdc;
2625 
2626 	verify_osdc_wrlocked(osdc);
2627 	WARN_ON(lreq->linger_id);
2628 
2629 	linger_get(lreq);
2630 	lreq->linger_id = ++osdc->last_linger_id;
2631 	insert_linger_osdc(&osdc->linger_requests, lreq);
2632 }
2633 
2634 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2635 {
2636 	struct ceph_osd_client *osdc = lreq->osdc;
2637 
2638 	verify_osdc_wrlocked(osdc);
2639 
2640 	erase_linger_osdc(&osdc->linger_requests, lreq);
2641 	linger_put(lreq);
2642 }
2643 
2644 static void cancel_linger_request(struct ceph_osd_request *req)
2645 {
2646 	struct ceph_osd_linger_request *lreq = req->r_priv;
2647 
2648 	WARN_ON(!req->r_linger);
2649 	cancel_request(req);
2650 	linger_put(lreq);
2651 }
2652 
/*
 * A unit of deferred work for a linger request: either delivery of a
 * notify to the watch callback or delivery of a watch error.  Which
 * member of the union is valid depends on the work function the item
 * was allocated with (see do_watch_notify() and do_watch_error()).
 */
struct linger_work {
	struct work_struct work;
	struct ceph_osd_linger_request *lreq;	/* holds a reference, see lwork_alloc() */
	struct list_head pending_item;		/* entry on lreq->pending_lworks */
	unsigned long queued_stamp;		/* jiffies at lwork_queue() time */

	union {
		struct {
			u64 notify_id;
			u64 notifier_id;
			void *payload; /* points into @msg front */
			size_t payload_len;

			struct ceph_msg *msg; /* for ceph_msg_put() */
		} notify;
		struct {
			int err;	/* copied from lreq->last_error */
		} error;
	};
};
2673 
2674 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2675 				       work_func_t workfn)
2676 {
2677 	struct linger_work *lwork;
2678 
2679 	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2680 	if (!lwork)
2681 		return NULL;
2682 
2683 	INIT_WORK(&lwork->work, workfn);
2684 	INIT_LIST_HEAD(&lwork->pending_item);
2685 	lwork->lreq = linger_get(lreq);
2686 
2687 	return lwork;
2688 }
2689 
2690 static void lwork_free(struct linger_work *lwork)
2691 {
2692 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2693 
2694 	mutex_lock(&lreq->lock);
2695 	list_del(&lwork->pending_item);
2696 	mutex_unlock(&lreq->lock);
2697 
2698 	linger_put(lreq);
2699 	kfree(lwork);
2700 }
2701 
2702 static void lwork_queue(struct linger_work *lwork)
2703 {
2704 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2705 	struct ceph_osd_client *osdc = lreq->osdc;
2706 
2707 	verify_lreq_locked(lreq);
2708 	WARN_ON(!list_empty(&lwork->pending_item));
2709 
2710 	lwork->queued_stamp = jiffies;
2711 	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2712 	queue_work(osdc->notify_wq, &lwork->work);
2713 }
2714 
2715 static void do_watch_notify(struct work_struct *w)
2716 {
2717 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2718 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2719 
2720 	if (!linger_registered(lreq)) {
2721 		dout("%s lreq %p not registered\n", __func__, lreq);
2722 		goto out;
2723 	}
2724 
2725 	WARN_ON(!lreq->is_watch);
2726 	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2727 	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2728 	     lwork->notify.payload_len);
2729 	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2730 		  lwork->notify.notifier_id, lwork->notify.payload,
2731 		  lwork->notify.payload_len);
2732 
2733 out:
2734 	ceph_msg_put(lwork->notify.msg);
2735 	lwork_free(lwork);
2736 }
2737 
2738 static void do_watch_error(struct work_struct *w)
2739 {
2740 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2741 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2742 
2743 	if (!linger_registered(lreq)) {
2744 		dout("%s lreq %p not registered\n", __func__, lreq);
2745 		goto out;
2746 	}
2747 
2748 	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2749 	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2750 
2751 out:
2752 	lwork_free(lwork);
2753 }
2754 
2755 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2756 {
2757 	struct linger_work *lwork;
2758 
2759 	lwork = lwork_alloc(lreq, do_watch_error);
2760 	if (!lwork) {
2761 		pr_err("failed to allocate error-lwork\n");
2762 		return;
2763 	}
2764 
2765 	lwork->error.err = lreq->last_error;
2766 	lwork_queue(lwork);
2767 }
2768 
2769 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2770 				       int result)
2771 {
2772 	if (!completion_done(&lreq->reg_commit_wait)) {
2773 		lreq->reg_commit_error = (result <= 0 ? result : 0);
2774 		complete_all(&lreq->reg_commit_wait);
2775 	}
2776 }
2777 
2778 static void linger_commit_cb(struct ceph_osd_request *req)
2779 {
2780 	struct ceph_osd_linger_request *lreq = req->r_priv;
2781 
2782 	mutex_lock(&lreq->lock);
2783 	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2784 	     lreq->linger_id, req->r_result);
2785 	linger_reg_commit_complete(lreq, req->r_result);
2786 	lreq->committed = true;
2787 
2788 	if (!lreq->is_watch) {
2789 		struct ceph_osd_data *osd_data =
2790 		    osd_req_op_data(req, 0, notify, response_data);
2791 		void *p = page_address(osd_data->pages[0]);
2792 
2793 		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
2794 			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
2795 
2796 		/* make note of the notify_id */
2797 		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
2798 			lreq->notify_id = ceph_decode_64(&p);
2799 			dout("lreq %p notify_id %llu\n", lreq,
2800 			     lreq->notify_id);
2801 		} else {
2802 			dout("lreq %p no notify_id\n", lreq);
2803 		}
2804 	}
2805 
2806 	mutex_unlock(&lreq->lock);
2807 	linger_put(lreq);
2808 }
2809 
/*
 * Map a watch error to what is reported to the user.
 *
 * -ENOENT becomes -ENOTCONN so that a delete->disconnection
 * notification and a failure to reconnect because we raced with the
 * delete appear the same to the user.  Any other value is returned
 * unchanged.
 */
static int normalize_watch_error(int err)
{
	return err == -ENOENT ? -ENOTCONN : err;
}
2822 
2823 static void linger_reconnect_cb(struct ceph_osd_request *req)
2824 {
2825 	struct ceph_osd_linger_request *lreq = req->r_priv;
2826 
2827 	mutex_lock(&lreq->lock);
2828 	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2829 	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
2830 	if (req->r_result < 0) {
2831 		if (!lreq->last_error) {
2832 			lreq->last_error = normalize_watch_error(req->r_result);
2833 			queue_watch_error(lreq);
2834 		}
2835 	}
2836 
2837 	mutex_unlock(&lreq->lock);
2838 	linger_put(lreq);
2839 }
2840 
2841 static void send_linger(struct ceph_osd_linger_request *lreq)
2842 {
2843 	struct ceph_osd_request *req = lreq->reg_req;
2844 	struct ceph_osd_req_op *op = &req->r_ops[0];
2845 
2846 	verify_osdc_wrlocked(req->r_osdc);
2847 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2848 
2849 	if (req->r_osd)
2850 		cancel_linger_request(req);
2851 
2852 	request_reinit(req);
2853 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2854 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2855 	req->r_flags = lreq->t.flags;
2856 	req->r_mtime = lreq->mtime;
2857 
2858 	mutex_lock(&lreq->lock);
2859 	if (lreq->is_watch && lreq->committed) {
2860 		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2861 			op->watch.cookie != lreq->linger_id);
2862 		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2863 		op->watch.gen = ++lreq->register_gen;
2864 		dout("lreq %p reconnect register_gen %u\n", lreq,
2865 		     op->watch.gen);
2866 		req->r_callback = linger_reconnect_cb;
2867 	} else {
2868 		if (!lreq->is_watch)
2869 			lreq->notify_id = 0;
2870 		else
2871 			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
2872 		dout("lreq %p register\n", lreq);
2873 		req->r_callback = linger_commit_cb;
2874 	}
2875 	mutex_unlock(&lreq->lock);
2876 
2877 	req->r_priv = linger_get(lreq);
2878 	req->r_linger = true;
2879 
2880 	submit_request(req, true);
2881 }
2882 
2883 static void linger_ping_cb(struct ceph_osd_request *req)
2884 {
2885 	struct ceph_osd_linger_request *lreq = req->r_priv;
2886 
2887 	mutex_lock(&lreq->lock);
2888 	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2889 	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2890 	     lreq->last_error);
2891 	if (lreq->register_gen == req->r_ops[0].watch.gen) {
2892 		if (!req->r_result) {
2893 			lreq->watch_valid_thru = lreq->ping_sent;
2894 		} else if (!lreq->last_error) {
2895 			lreq->last_error = normalize_watch_error(req->r_result);
2896 			queue_watch_error(lreq);
2897 		}
2898 	} else {
2899 		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2900 		     lreq->register_gen, req->r_ops[0].watch.gen);
2901 	}
2902 
2903 	mutex_unlock(&lreq->lock);
2904 	linger_put(lreq);
2905 }
2906 
2907 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2908 {
2909 	struct ceph_osd_client *osdc = lreq->osdc;
2910 	struct ceph_osd_request *req = lreq->ping_req;
2911 	struct ceph_osd_req_op *op = &req->r_ops[0];
2912 
2913 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2914 		dout("%s PAUSERD\n", __func__);
2915 		return;
2916 	}
2917 
2918 	lreq->ping_sent = jiffies;
2919 	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2920 	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
2921 	     lreq->register_gen);
2922 
2923 	if (req->r_osd)
2924 		cancel_linger_request(req);
2925 
2926 	request_reinit(req);
2927 	target_copy(&req->r_t, &lreq->t);
2928 
2929 	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2930 		op->watch.cookie != lreq->linger_id ||
2931 		op->watch.op != CEPH_OSD_WATCH_OP_PING);
2932 	op->watch.gen = lreq->register_gen;
2933 	req->r_callback = linger_ping_cb;
2934 	req->r_priv = linger_get(lreq);
2935 	req->r_linger = true;
2936 
2937 	ceph_osdc_get_request(req);
2938 	account_request(req);
2939 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2940 	link_request(lreq->osd, req);
2941 	send_request(req);
2942 }
2943 
2944 static void linger_submit(struct ceph_osd_linger_request *lreq)
2945 {
2946 	struct ceph_osd_client *osdc = lreq->osdc;
2947 	struct ceph_osd *osd;
2948 
2949 	calc_target(osdc, &lreq->t, NULL, false);
2950 	osd = lookup_create_osd(osdc, lreq->t.osd, true);
2951 	link_linger(osd, lreq);
2952 
2953 	send_linger(lreq);
2954 }
2955 
2956 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
2957 {
2958 	struct ceph_osd_client *osdc = lreq->osdc;
2959 	struct ceph_osd_linger_request *lookup_lreq;
2960 
2961 	verify_osdc_wrlocked(osdc);
2962 
2963 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
2964 				       lreq->linger_id);
2965 	if (!lookup_lreq)
2966 		return;
2967 
2968 	WARN_ON(lookup_lreq != lreq);
2969 	erase_linger_mc(&osdc->linger_map_checks, lreq);
2970 	linger_put(lreq);
2971 }
2972 
2973 /*
2974  * @lreq has to be both registered and linked.
2975  */
2976 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2977 {
2978 	if (lreq->is_watch && lreq->ping_req->r_osd)
2979 		cancel_linger_request(lreq->ping_req);
2980 	if (lreq->reg_req->r_osd)
2981 		cancel_linger_request(lreq->reg_req);
2982 	cancel_linger_map_check(lreq);
2983 	unlink_linger(lreq->osd, lreq);
2984 	linger_unregister(lreq);
2985 }
2986 
2987 static void linger_cancel(struct ceph_osd_linger_request *lreq)
2988 {
2989 	struct ceph_osd_client *osdc = lreq->osdc;
2990 
2991 	down_write(&osdc->lock);
2992 	if (__linger_registered(lreq))
2993 		__linger_cancel(lreq);
2994 	up_write(&osdc->lock);
2995 }
2996 
2997 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
2998 
2999 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3000 {
3001 	struct ceph_osd_client *osdc = lreq->osdc;
3002 	struct ceph_osdmap *map = osdc->osdmap;
3003 
3004 	verify_osdc_wrlocked(osdc);
3005 	WARN_ON(!map->epoch);
3006 
3007 	if (lreq->register_gen) {
3008 		lreq->map_dne_bound = map->epoch;
3009 		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3010 		     lreq, lreq->linger_id);
3011 	} else {
3012 		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3013 		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3014 		     map->epoch);
3015 	}
3016 
3017 	if (lreq->map_dne_bound) {
3018 		if (map->epoch >= lreq->map_dne_bound) {
3019 			/* we had a new enough map */
3020 			pr_info("linger_id %llu pool does not exist\n",
3021 				lreq->linger_id);
3022 			linger_reg_commit_complete(lreq, -ENOENT);
3023 			__linger_cancel(lreq);
3024 		}
3025 	} else {
3026 		send_linger_map_check(lreq);
3027 	}
3028 }
3029 
3030 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3031 {
3032 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3033 	struct ceph_osd_linger_request *lreq;
3034 	u64 linger_id = greq->private_data;
3035 
3036 	WARN_ON(greq->result || !greq->u.newest);
3037 
3038 	down_write(&osdc->lock);
3039 	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3040 	if (!lreq) {
3041 		dout("%s linger_id %llu dne\n", __func__, linger_id);
3042 		goto out_unlock;
3043 	}
3044 
3045 	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3046 	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3047 	     greq->u.newest);
3048 	if (!lreq->map_dne_bound)
3049 		lreq->map_dne_bound = greq->u.newest;
3050 	erase_linger_mc(&osdc->linger_map_checks, lreq);
3051 	check_linger_pool_dne(lreq);
3052 
3053 	linger_put(lreq);
3054 out_unlock:
3055 	up_write(&osdc->lock);
3056 }
3057 
3058 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3059 {
3060 	struct ceph_osd_client *osdc = lreq->osdc;
3061 	struct ceph_osd_linger_request *lookup_lreq;
3062 	int ret;
3063 
3064 	verify_osdc_wrlocked(osdc);
3065 
3066 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3067 				       lreq->linger_id);
3068 	if (lookup_lreq) {
3069 		WARN_ON(lookup_lreq != lreq);
3070 		return;
3071 	}
3072 
3073 	linger_get(lreq);
3074 	insert_linger_mc(&osdc->linger_map_checks, lreq);
3075 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3076 					  linger_map_check_cb, lreq->linger_id);
3077 	WARN_ON(ret);
3078 }
3079 
3080 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3081 {
3082 	int ret;
3083 
3084 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3085 	ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3086 	return ret ?: lreq->reg_commit_error;
3087 }
3088 
3089 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3090 {
3091 	int ret;
3092 
3093 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3094 	ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3095 	return ret ?: lreq->notify_finish_error;
3096 }
3097 
/*
 * Timeout callback, called every N seconds.  When one or more OSD
 * requests have been active for more than N seconds, we send a keepalive
 * (tag + timestamp) to their OSD to ensure any communications channel
 * reset is detected.
 *
 * In addition, when the osd_request_timeout option is non-zero, any
 * request (on a regular OSD session or on the homeless session) that
 * was started longer ago than that is aborted with -ETIMEDOUT.
 *
 * Runs under osdc->lock held for write and re-arms itself with a delay
 * of osd_keepalive_timeout before returning.
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_options *opts = osdc->client->options;
	/* requests stamped before these points count as laggy / expired */
	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
	LIST_HEAD(slow_osds);
	struct rb_node *n, *p;

	dout("%s osdc %p\n", __func__, osdc);
	down_write(&osdc->lock);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
		bool found = false;

		for (p = rb_first(&osd->o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			/*
			 * Advance first: presumably abort_request() below
			 * takes req out of this tree.
			 */
			p = rb_next(p); /* abort_request() */

			if (time_before(req->r_stamp, cutoff)) {
				dout(" req %p tid %llu on osd%d is laggy\n",
				     req, req->r_tid, osd->o_osd);
				found = true;
			}
			if (opts->osd_request_timeout &&
			    time_before(req->r_start_stamp, expiry_cutoff)) {
				pr_err_ratelimited("tid %llu on osd%d timeout\n",
				       req->r_tid, osd->o_osd);
				abort_request(req, -ETIMEDOUT);
			}
		}
		/*
		 * Any linger request at all marks the OSD for a keepalive;
		 * established, error-free watches additionally get a ping.
		 */
		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
			struct ceph_osd_linger_request *lreq =
			    rb_entry(p, struct ceph_osd_linger_request, node);

			dout(" lreq %p linger_id %llu is served by osd%d\n",
			     lreq, lreq->linger_id, osd->o_osd);
			found = true;

			mutex_lock(&lreq->lock);
			if (lreq->is_watch && lreq->committed && !lreq->last_error)
				send_linger_ping(lreq);
			mutex_unlock(&lreq->lock);
		}

		if (found)
			list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}

	/* homeless requests can only expire, there is no OSD to ping */
	if (opts->osd_request_timeout) {
		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			p = rb_next(p); /* abort_request() */

			if (time_before(req->r_start_stamp, expiry_cutoff)) {
				pr_err_ratelimited("tid %llu on osd%d timeout\n",
				       req->r_tid, osdc->homeless_osd.o_osd);
				abort_request(req, -ETIMEDOUT);
			}
		}
	}

	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
		maybe_request_map(osdc);

	/* drain the local list, sending one keepalive per slow OSD */
	while (!list_empty(&slow_osds)) {
		struct ceph_osd *osd = list_first_entry(&slow_osds,
							struct ceph_osd,
							o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	up_write(&osdc->lock);
	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
}
3192 
3193 static void handle_osds_timeout(struct work_struct *work)
3194 {
3195 	struct ceph_osd_client *osdc =
3196 		container_of(work, struct ceph_osd_client,
3197 			     osds_timeout_work.work);
3198 	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3199 	struct ceph_osd *osd, *nosd;
3200 
3201 	dout("%s osdc %p\n", __func__, osdc);
3202 	down_write(&osdc->lock);
3203 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3204 		if (time_before(jiffies, osd->lru_ttl))
3205 			break;
3206 
3207 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3208 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3209 		close_osd(osd);
3210 	}
3211 
3212 	up_write(&osdc->lock);
3213 	schedule_delayed_work(&osdc->osds_timeout_work,
3214 			      round_jiffies_relative(delay));
3215 }
3216 
3217 static int ceph_oloc_decode(void **p, void *end,
3218 			    struct ceph_object_locator *oloc)
3219 {
3220 	u8 struct_v, struct_cv;
3221 	u32 len;
3222 	void *struct_end;
3223 	int ret = 0;
3224 
3225 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3226 	struct_v = ceph_decode_8(p);
3227 	struct_cv = ceph_decode_8(p);
3228 	if (struct_v < 3) {
3229 		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3230 			struct_v, struct_cv);
3231 		goto e_inval;
3232 	}
3233 	if (struct_cv > 6) {
3234 		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3235 			struct_v, struct_cv);
3236 		goto e_inval;
3237 	}
3238 	len = ceph_decode_32(p);
3239 	ceph_decode_need(p, end, len, e_inval);
3240 	struct_end = *p + len;
3241 
3242 	oloc->pool = ceph_decode_64(p);
3243 	*p += 4; /* skip preferred */
3244 
3245 	len = ceph_decode_32(p);
3246 	if (len > 0) {
3247 		pr_warn("ceph_object_locator::key is set\n");
3248 		goto e_inval;
3249 	}
3250 
3251 	if (struct_v >= 5) {
3252 		bool changed = false;
3253 
3254 		len = ceph_decode_32(p);
3255 		if (len > 0) {
3256 			ceph_decode_need(p, end, len, e_inval);
3257 			if (!oloc->pool_ns ||
3258 			    ceph_compare_string(oloc->pool_ns, *p, len))
3259 				changed = true;
3260 			*p += len;
3261 		} else {
3262 			if (oloc->pool_ns)
3263 				changed = true;
3264 		}
3265 		if (changed) {
3266 			/* redirect changes namespace */
3267 			pr_warn("ceph_object_locator::nspace is changed\n");
3268 			goto e_inval;
3269 		}
3270 	}
3271 
3272 	if (struct_v >= 6) {
3273 		s64 hash = ceph_decode_64(p);
3274 		if (hash != -1) {
3275 			pr_warn("ceph_object_locator::hash is set\n");
3276 			goto e_inval;
3277 		}
3278 	}
3279 
3280 	/* skip the rest */
3281 	*p = struct_end;
3282 out:
3283 	return ret;
3284 
3285 e_inval:
3286 	ret = -EINVAL;
3287 	goto out;
3288 }
3289 
3290 static int ceph_redirect_decode(void **p, void *end,
3291 				struct ceph_request_redirect *redir)
3292 {
3293 	u8 struct_v, struct_cv;
3294 	u32 len;
3295 	void *struct_end;
3296 	int ret;
3297 
3298 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3299 	struct_v = ceph_decode_8(p);
3300 	struct_cv = ceph_decode_8(p);
3301 	if (struct_cv > 1) {
3302 		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3303 			struct_v, struct_cv);
3304 		goto e_inval;
3305 	}
3306 	len = ceph_decode_32(p);
3307 	ceph_decode_need(p, end, len, e_inval);
3308 	struct_end = *p + len;
3309 
3310 	ret = ceph_oloc_decode(p, end, &redir->oloc);
3311 	if (ret)
3312 		goto out;
3313 
3314 	len = ceph_decode_32(p);
3315 	if (len > 0) {
3316 		pr_warn("ceph_request_redirect::object_name is set\n");
3317 		goto e_inval;
3318 	}
3319 
3320 	len = ceph_decode_32(p);
3321 	*p += len; /* skip osd_instructions */
3322 
3323 	/* skip the rest */
3324 	*p = struct_end;
3325 out:
3326 	return ret;
3327 
3328 e_inval:
3329 	ret = -EINVAL;
3330 	goto out;
3331 }
3332 
/*
 * Decoded form of an MOSDOpReply message, as filled in by
 * decode_MOSDOpReply().
 */
struct MOSDOpReply {
	struct ceph_pg pgid;
	u64 flags;
	int result;
	u32 epoch;
	int num_ops;		/* rejected by the decoder if > CEPH_OSD_MAX_OPS */
	u32 outdata_len[CEPH_OSD_MAX_OPS];	/* per-op payload_len */
	s32 rval[CEPH_OSD_MAX_OPS];		/* per-op return value */
	int retry_attempt;
	struct ceph_eversion replay_version;	/* kept in wire (LE) form */
	u64 user_version;
	struct ceph_request_redirect redirect;	/* empty oloc if no redirect */
};
3346 
3347 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3348 {
3349 	void *p = msg->front.iov_base;
3350 	void *const end = p + msg->front.iov_len;
3351 	u16 version = le16_to_cpu(msg->hdr.version);
3352 	struct ceph_eversion bad_replay_version;
3353 	u8 decode_redir;
3354 	u32 len;
3355 	int ret;
3356 	int i;
3357 
3358 	ceph_decode_32_safe(&p, end, len, e_inval);
3359 	ceph_decode_need(&p, end, len, e_inval);
3360 	p += len; /* skip oid */
3361 
3362 	ret = ceph_decode_pgid(&p, end, &m->pgid);
3363 	if (ret)
3364 		return ret;
3365 
3366 	ceph_decode_64_safe(&p, end, m->flags, e_inval);
3367 	ceph_decode_32_safe(&p, end, m->result, e_inval);
3368 	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3369 	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3370 	p += sizeof(bad_replay_version);
3371 	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3372 
3373 	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3374 	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3375 		goto e_inval;
3376 
3377 	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3378 			 e_inval);
3379 	for (i = 0; i < m->num_ops; i++) {
3380 		struct ceph_osd_op *op = p;
3381 
3382 		m->outdata_len[i] = le32_to_cpu(op->payload_len);
3383 		p += sizeof(*op);
3384 	}
3385 
3386 	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3387 	for (i = 0; i < m->num_ops; i++)
3388 		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3389 
3390 	if (version >= 5) {
3391 		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3392 		memcpy(&m->replay_version, p, sizeof(m->replay_version));
3393 		p += sizeof(m->replay_version);
3394 		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3395 	} else {
3396 		m->replay_version = bad_replay_version; /* struct */
3397 		m->user_version = le64_to_cpu(m->replay_version.version);
3398 	}
3399 
3400 	if (version >= 6) {
3401 		if (version >= 7)
3402 			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3403 		else
3404 			decode_redir = 1;
3405 	} else {
3406 		decode_redir = 0;
3407 	}
3408 
3409 	if (decode_redir) {
3410 		ret = ceph_redirect_decode(&p, end, &m->redirect);
3411 		if (ret)
3412 			return ret;
3413 	} else {
3414 		ceph_oloc_init(&m->redirect.oloc);
3415 	}
3416 
3417 	return 0;
3418 
3419 e_inval:
3420 	return -EINVAL;
3421 }
3422 
/*
 * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
 * specified.
 *
 * Takes osdc->lock for read and osd->lock around the lookup and
 * bookkeeping; both are dropped before the request is completed.
 *
 * Outcomes:
 *  - unknown OSD, unknown tid or a reply to an earlier attempt: the
 *    message is ignored;
 *  - decode failure or a reply that is inconsistent with the request
 *    (op count, data length): the request is completed with -EIO;
 *  - redirect: the request is retargeted at the redirect pool and
 *    resubmitted;
 *  - otherwise: per-op results are stored, ->r_result is set to the
 *    reply's result or, if that is zero, to the total data length, and
 *    the request is finished and completed.
 */
static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_osd_request *req;
	struct MOSDOpReply m;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 data_len = 0;
	int ret;
	int i;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown\n", __func__, osd->o_osd);
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
		goto out_unlock_session;
	}

	/*
	 * Lend the request's namespace to the decoder for the duration
	 * of the call: ceph_oloc_decode() compares a decoded namespace
	 * against oloc->pool_ns.  The borrowed pointer is cleared again
	 * right away.
	 */
	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
	ret = decode_MOSDOpReply(msg, &m);
	m.redirect.oloc.pool_ns = NULL;
	if (ret) {
		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
		       req->r_tid, ret);
		ceph_msg_dump(msg);
		goto fail_request;
	}
	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
	     le64_to_cpu(m.replay_version.version), m.user_version);

	/* only a reply to the most recent attempt is acted upon */
	if (m.retry_attempt >= 0) {
		if (m.retry_attempt != req->r_attempts - 1) {
			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
			     req, req->r_tid, m.retry_attempt,
			     req->r_attempts - 1);
			goto out_unlock_session;
		}
	} else {
		WARN_ON(1); /* MOSDOpReply v4 is assumed */
	}

	if (!ceph_oloc_empty(&m.redirect.oloc)) {
		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
		     m.redirect.oloc.pool);
		unlink_request(osd, req);
		mutex_unlock(&osd->lock);

		/*
		 * Not ceph_oloc_copy() - changing pool_ns is not
		 * supported.
		 */
		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
		/* tid is reset before resubmitting under the new target */
		req->r_tid = 0;
		__submit_request(req, false);
		goto out_unlock_osdc;
	}

	if (m.num_ops != req->r_num_ops) {
		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
		       req->r_num_ops, req->r_tid);
		goto fail_request;
	}
	for (i = 0; i < req->r_num_ops; i++) {
		dout(" req %p tid %llu op %d rval %d len %u\n", req,
		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
		req->r_ops[i].rval = m.rval[i];
		req->r_ops[i].outdata_len = m.outdata_len[i];
		data_len += m.outdata_len[i];
	}
	/* the per-op lengths must add up to the message's data length */
	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
		goto fail_request;
	}
	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
	     req, req->r_tid, m.result, data_len);

	/*
	 * Since we only ever request ONDISK, we should only ever get
	 * one (type of) reply back.
	 */
	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
	req->r_result = m.result ?: data_len;
	finish_request(req);
	mutex_unlock(&osd->lock);
	up_read(&osdc->lock);

	/* both locks are released before completing the request */
	__complete_request(req);
	complete_all(&req->r_completion);
	ceph_osdc_put_request(req);
	return;

fail_request:
	complete_request(req, -EIO);
out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
}
3537 
3538 static void set_pool_was_full(struct ceph_osd_client *osdc)
3539 {
3540 	struct rb_node *n;
3541 
3542 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3543 		struct ceph_pg_pool_info *pi =
3544 		    rb_entry(n, struct ceph_pg_pool_info, node);
3545 
3546 		pi->was_full = __pool_full(pi);
3547 	}
3548 }
3549 
3550 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3551 {
3552 	struct ceph_pg_pool_info *pi;
3553 
3554 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3555 	if (!pi)
3556 		return false;
3557 
3558 	return pi->was_full && !__pool_full(pi);
3559 }
3560 
3561 static enum calc_target_result
3562 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3563 {
3564 	struct ceph_osd_client *osdc = lreq->osdc;
3565 	enum calc_target_result ct_res;
3566 
3567 	ct_res = calc_target(osdc, &lreq->t, NULL, true);
3568 	if (ct_res == CALC_TARGET_NEED_RESEND) {
3569 		struct ceph_osd *osd;
3570 
3571 		osd = lookup_create_osd(osdc, lreq->t.osd, true);
3572 		if (osd != lreq->osd) {
3573 			unlink_linger(lreq->osd, lreq);
3574 			link_linger(osd, lreq);
3575 		}
3576 	}
3577 
3578 	return ct_res;
3579 }
3580 
/*
 * Requeue requests whose mapping to an OSD has changed.
 *
 * @osd:                     session whose requests are examined
 * @force_resend:            resend everything, mapping change or not
 * @cleared_full:            the cluster-wide full condition went away
 * @check_pool_cleared_full: also consult the per-pool full state
 * @need_resend:             tree that collects regular requests to resend
 * @need_resend_linger:      list that collects linger requests to resend
 *
 * Regular requests that need a resend are unlinked from @osd and moved
 * to @need_resend.  Linger requests stay linked (recalc_linger_target()
 * may relink them to another session) and are queued on
 * @need_resend_linger through ->scan_item.  Requests whose pool no
 * longer exists are handed to the respective pool-dne check.
 */
static void scan_requests(struct ceph_osd *osd,
			  bool force_resend,
			  bool cleared_full,
			  bool check_pool_cleared_full,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;
	bool force_resend_writes;

	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);
		enum calc_target_result ct_res;

		/* advance first: lreq may be relinked to another osd below */
		n = rb_next(n); /* recalc_linger_target() */

		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
		     lreq->linger_id);
		ct_res = recalc_linger_target(lreq);
		switch (ct_res) {
		case CALC_TARGET_NO_ACTION:
			/*
			 * Unlike regular requests below, there is no
			 * CEPH_OSD_FLAG_WRITE test here: a cleared full
			 * condition always triggers a resend.
			 */
			force_resend_writes = cleared_full ||
			    (check_pool_cleared_full &&
			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
			if (!force_resend && !force_resend_writes)
				break;

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_linger_map_check(lreq);
			/*
			 * scan_requests() for the previous epoch(s)
			 * may have already added it to the list, since
			 * it's not unlinked here.
			 */
			if (list_empty(&lreq->scan_item))
				list_add_tail(&lreq->scan_item, need_resend_linger);
			break;
		case CALC_TARGET_POOL_DNE:
			/* take it off the resend list (if on it) first */
			list_del_init(&lreq->scan_item);
			check_linger_pool_dne(lreq);
			break;
		}
	}

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);
		enum calc_target_result ct_res;

		n = rb_next(n); /* unlink_request(), check_pool_dne() */

		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
		ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
				     false);
		switch (ct_res) {
		case CALC_TARGET_NO_ACTION:
			/*
			 * Without a forced resend, only writes are resent,
			 * and only if a full condition has cleared.
			 */
			force_resend_writes = cleared_full ||
			    (check_pool_cleared_full &&
			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
			if (!force_resend &&
			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
			     !force_resend_writes))
				break;

			/* fall through */
		case CALC_TARGET_NEED_RESEND:
			cancel_map_check(req);
			unlink_request(osd, req);
			insert_request(need_resend, req);
			break;
		case CALC_TARGET_POOL_DNE:
			check_pool_dne(req);
			break;
		}
	}
}
3663 
/*
 * Apply a single encoded osdmap (incremental or full) and work out
 * which requests are affected by it.
 *
 * @p, @end:            the encoded map
 * @incremental:        true if the encoding is an incremental update
 *                      to be applied on top of osdc->osdmap
 * @need_resend:        collects regular requests that must be resent
 * @need_resend_linger: collects linger requests that must be resent
 *
 * After the map has been installed, all sessions (the homeless one
 * first) are scanned, and sessions whose OSD is no longer up or whose
 * address changed are closed.
 *
 * Returns 0 on success or the error from decoding/applying the map, in
 * which case the current map is left installed.
 */
static int handle_one_map(struct ceph_osd_client *osdc,
			  void *p, void *end, bool incremental,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osdmap *newmap;
	struct rb_node *n;
	bool skipped_map = false;
	bool was_full;

	/* snapshot the full state (cluster-wide and per pool) of the old map */
	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
	set_pool_was_full(osdc);

	if (incremental)
		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
	else
		newmap = ceph_osdmap_decode(&p, end);
	if (IS_ERR(newmap))
		return PTR_ERR(newmap);

	/*
	 * NOTE(review): the check below implies the returned map can be
	 * the very object we already have (presumably an incremental
	 * applied in place) - confirm against osdmap_apply_incremental().
	 */
	if (newmap != osdc->osdmap) {
		/*
		 * Preserve ->was_full before destroying the old map.
		 * For pools that weren't in the old map, ->was_full
		 * should be false.
		 */
		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
			struct ceph_pg_pool_info *pi =
			    rb_entry(n, struct ceph_pg_pool_info, node);
			struct ceph_pg_pool_info *old_pi;

			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
			if (old_pi)
				pi->was_full = old_pi->was_full;
			else
				WARN_ON(pi->was_full);
		}

		/* an epoch jump of more than one forces a resend of everything */
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 < newmap->epoch) {
			WARN_ON(incremental);
			skipped_map = true;
		}

		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = newmap;
	}

	/* true only if the old map was full and the new one is not */
	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
		      need_resend, need_resend_linger);

	for (n = rb_first(&osdc->osds); n; ) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		n = rb_next(n); /* close_osd() */

		scan_requests(osd, skipped_map, was_full, true, need_resend,
			      need_resend_linger);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
			   sizeof(struct ceph_entity_addr)))
			close_osd(osd);
	}

	return 0;
}
3732 
3733 static void kick_requests(struct ceph_osd_client *osdc,
3734 			  struct rb_root *need_resend,
3735 			  struct list_head *need_resend_linger)
3736 {
3737 	struct ceph_osd_linger_request *lreq, *nlreq;
3738 	enum calc_target_result ct_res;
3739 	struct rb_node *n;
3740 
3741 	/* make sure need_resend targets reflect latest map */
3742 	for (n = rb_first(need_resend); n; ) {
3743 		struct ceph_osd_request *req =
3744 		    rb_entry(n, struct ceph_osd_request, r_node);
3745 
3746 		n = rb_next(n);
3747 
3748 		if (req->r_t.epoch < osdc->osdmap->epoch) {
3749 			ct_res = calc_target(osdc, &req->r_t, NULL, false);
3750 			if (ct_res == CALC_TARGET_POOL_DNE) {
3751 				erase_request(need_resend, req);
3752 				check_pool_dne(req);
3753 			}
3754 		}
3755 	}
3756 
3757 	for (n = rb_first(need_resend); n; ) {
3758 		struct ceph_osd_request *req =
3759 		    rb_entry(n, struct ceph_osd_request, r_node);
3760 		struct ceph_osd *osd;
3761 
3762 		n = rb_next(n);
3763 		erase_request(need_resend, req); /* before link_request() */
3764 
3765 		osd = lookup_create_osd(osdc, req->r_t.osd, true);
3766 		link_request(osd, req);
3767 		if (!req->r_linger) {
3768 			if (!osd_homeless(osd) && !req->r_t.paused)
3769 				send_request(req);
3770 		} else {
3771 			cancel_linger_request(req);
3772 		}
3773 	}
3774 
3775 	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
3776 		if (!osd_homeless(lreq->osd))
3777 			send_linger(lreq);
3778 
3779 		list_del_init(&lreq->scan_item);
3780 	}
3781 }
3782 
/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 *
 * Message layout as decoded here: fsid, a count of incremental maps
 * followed by (epoch, length, payload) entries, then a count of full
 * maps followed by entries of the same shape.
 *
 * Incremental maps are applied only if they are exactly one epoch ahead
 * of the current map.  Full maps are looked at only if no incremental
 * was applied; of those, only the last one in the message is taken,
 * and only if it is newer than what we have.
 *
 * The whole function runs under osdc->lock held for write.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_fsid fsid;
	struct rb_root need_resend = RB_ROOT;
	LIST_HEAD(need_resend_linger);
	bool handled_incremental = false;
	bool was_pauserd, was_pausewr;
	bool pauserd, pausewr;
	int err;

	/* NOTE(review): the epoch is read here before osdc->lock is taken */
	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
	down_write(&osdc->lock);

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		goto bad;

	/* pause state before any map in this message is applied */
	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		      have_pool_full(osdc);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		/* applicable only if it directly follows our epoch */
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, true,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
			handled_incremental = true;
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p += maplen;
		nr_maps--;
	}
	if (handled_incremental)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, false,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
		}
		p += maplen;
		nr_maps--;
	}

done:
	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		  have_pool_full(osdc);
	if (was_pauserd || was_pausewr || pauserd || pausewr ||
	    osdc->osdmap->epoch < osdc->epoch_barrier)
		maybe_request_map(osdc);

	kick_requests(osdc, &need_resend, &need_resend_linger);

	ceph_osdc_abort_on_full(osdc);
	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			  osdc->osdmap->epoch);
	up_write(&osdc->lock);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	/*
	 * NOTE(review): if an earlier map in this message was applied
	 * successfully, requests may already sit on need_resend /
	 * need_resend_linger; this path does not kick them - confirm
	 * whether that can strand requests.
	 */
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->lock);
}
3899 
3900 /*
3901  * Resubmit requests pending on the given osd.
3902  */
3903 static void kick_osd_requests(struct ceph_osd *osd)
3904 {
3905 	struct rb_node *n;
3906 
3907 	clear_backoffs(osd);
3908 
3909 	for (n = rb_first(&osd->o_requests); n; ) {
3910 		struct ceph_osd_request *req =
3911 		    rb_entry(n, struct ceph_osd_request, r_node);
3912 
3913 		n = rb_next(n); /* cancel_linger_request() */
3914 
3915 		if (!req->r_linger) {
3916 			if (!req->r_t.paused)
3917 				send_request(req);
3918 		} else {
3919 			cancel_linger_request(req);
3920 		}
3921 	}
3922 	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
3923 		struct ceph_osd_linger_request *lreq =
3924 		    rb_entry(n, struct ceph_osd_linger_request, node);
3925 
3926 		send_linger(lreq);
3927 	}
3928 }
3929 
3930 /*
3931  * If the osd connection drops, we need to resubmit all requests.
3932  */
3933 static void osd_fault(struct ceph_connection *con)
3934 {
3935 	struct ceph_osd *osd = con->private;
3936 	struct ceph_osd_client *osdc = osd->o_osdc;
3937 
3938 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
3939 
3940 	down_write(&osdc->lock);
3941 	if (!osd_registered(osd)) {
3942 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3943 		goto out_unlock;
3944 	}
3945 
3946 	if (!reopen_osd(osd))
3947 		kick_osd_requests(osd);
3948 	maybe_request_map(osdc);
3949 
3950 out_unlock:
3951 	up_write(&osdc->lock);
3952 }
3953 
/*
 * Decoded form of an MOSDBackoff message, as filled in by
 * decode_MOSDBackoff().
 */
struct MOSDBackoff {
	struct ceph_spg spgid;
	u32 map_epoch;
	u8 op;
	u64 id;
	/*
	 * Range bounds, heap-allocated by the decoder.  Either pointer
	 * may be taken over by the handler, which then resets it to NULL
	 * here.
	 */
	struct ceph_hobject_id *begin;
	struct ceph_hobject_id *end;
};
3962 
3963 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
3964 {
3965 	void *p = msg->front.iov_base;
3966 	void *const end = p + msg->front.iov_len;
3967 	u8 struct_v;
3968 	u32 struct_len;
3969 	int ret;
3970 
3971 	ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
3972 	if (ret)
3973 		return ret;
3974 
3975 	ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
3976 	if (ret)
3977 		return ret;
3978 
3979 	ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
3980 	ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
3981 	ceph_decode_8_safe(&p, end, m->op, e_inval);
3982 	ceph_decode_64_safe(&p, end, m->id, e_inval);
3983 
3984 	m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
3985 	if (!m->begin)
3986 		return -ENOMEM;
3987 
3988 	ret = decode_hoid(&p, end, m->begin);
3989 	if (ret) {
3990 		free_hoid(m->begin);
3991 		return ret;
3992 	}
3993 
3994 	m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
3995 	if (!m->end) {
3996 		free_hoid(m->begin);
3997 		return -ENOMEM;
3998 	}
3999 
4000 	ret = decode_hoid(&p, end, m->end);
4001 	if (ret) {
4002 		free_hoid(m->begin);
4003 		free_hoid(m->end);
4004 		return ret;
4005 	}
4006 
4007 	return 0;
4008 
4009 e_inval:
4010 	return -EINVAL;
4011 }
4012 
4013 static struct ceph_msg *create_backoff_message(
4014 				const struct ceph_osd_backoff *backoff,
4015 				u32 map_epoch)
4016 {
4017 	struct ceph_msg *msg;
4018 	void *p, *end;
4019 	int msg_size;
4020 
4021 	msg_size = CEPH_ENCODING_START_BLK_LEN +
4022 			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4023 	msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4024 	msg_size += CEPH_ENCODING_START_BLK_LEN +
4025 			hoid_encoding_size(backoff->begin);
4026 	msg_size += CEPH_ENCODING_START_BLK_LEN +
4027 			hoid_encoding_size(backoff->end);
4028 
4029 	msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4030 	if (!msg)
4031 		return NULL;
4032 
4033 	p = msg->front.iov_base;
4034 	end = p + msg->front_alloc_len;
4035 
4036 	encode_spgid(&p, &backoff->spgid);
4037 	ceph_encode_32(&p, map_epoch);
4038 	ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4039 	ceph_encode_64(&p, backoff->id);
4040 	encode_hoid(&p, end, backoff->begin);
4041 	encode_hoid(&p, end, backoff->end);
4042 	BUG_ON(p != end);
4043 
4044 	msg->front.iov_len = p - msg->front.iov_base;
4045 	msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4046 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4047 
4048 	return msg;
4049 }
4050 
4051 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4052 {
4053 	struct ceph_spg_mapping *spg;
4054 	struct ceph_osd_backoff *backoff;
4055 	struct ceph_msg *msg;
4056 
4057 	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4058 	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4059 
4060 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4061 	if (!spg) {
4062 		spg = alloc_spg_mapping();
4063 		if (!spg) {
4064 			pr_err("%s failed to allocate spg\n", __func__);
4065 			return;
4066 		}
4067 		spg->spgid = m->spgid; /* struct */
4068 		insert_spg_mapping(&osd->o_backoff_mappings, spg);
4069 	}
4070 
4071 	backoff = alloc_backoff();
4072 	if (!backoff) {
4073 		pr_err("%s failed to allocate backoff\n", __func__);
4074 		return;
4075 	}
4076 	backoff->spgid = m->spgid; /* struct */
4077 	backoff->id = m->id;
4078 	backoff->begin = m->begin;
4079 	m->begin = NULL; /* backoff now owns this */
4080 	backoff->end = m->end;
4081 	m->end = NULL;   /* ditto */
4082 
4083 	insert_backoff(&spg->backoffs, backoff);
4084 	insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4085 
4086 	/*
4087 	 * Ack with original backoff's epoch so that the OSD can
4088 	 * discard this if there was a PG split.
4089 	 */
4090 	msg = create_backoff_message(backoff, m->map_epoch);
4091 	if (!msg) {
4092 		pr_err("%s failed to allocate msg\n", __func__);
4093 		return;
4094 	}
4095 	ceph_con_send(&osd->o_con, msg);
4096 }
4097 
4098 static bool target_contained_by(const struct ceph_osd_request_target *t,
4099 				const struct ceph_hobject_id *begin,
4100 				const struct ceph_hobject_id *end)
4101 {
4102 	struct ceph_hobject_id hoid;
4103 	int cmp;
4104 
4105 	hoid_fill_from_target(&hoid, t);
4106 	cmp = hoid_compare(&hoid, begin);
4107 	return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4108 }
4109 
4110 static void handle_backoff_unblock(struct ceph_osd *osd,
4111 				   const struct MOSDBackoff *m)
4112 {
4113 	struct ceph_spg_mapping *spg;
4114 	struct ceph_osd_backoff *backoff;
4115 	struct rb_node *n;
4116 
4117 	dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4118 	     m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4119 
4120 	backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4121 	if (!backoff) {
4122 		pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4123 		       __func__, osd->o_osd, m->spgid.pgid.pool,
4124 		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4125 		return;
4126 	}
4127 
4128 	if (hoid_compare(backoff->begin, m->begin) &&
4129 	    hoid_compare(backoff->end, m->end)) {
4130 		pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4131 		       __func__, osd->o_osd, m->spgid.pgid.pool,
4132 		       m->spgid.pgid.seed, m->spgid.shard, m->id);
4133 		/* unblock it anyway... */
4134 	}
4135 
4136 	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4137 	BUG_ON(!spg);
4138 
4139 	erase_backoff(&spg->backoffs, backoff);
4140 	erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4141 	free_backoff(backoff);
4142 
4143 	if (RB_EMPTY_ROOT(&spg->backoffs)) {
4144 		erase_spg_mapping(&osd->o_backoff_mappings, spg);
4145 		free_spg_mapping(spg);
4146 	}
4147 
4148 	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4149 		struct ceph_osd_request *req =
4150 		    rb_entry(n, struct ceph_osd_request, r_node);
4151 
4152 		if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4153 			/*
4154 			 * Match against @m, not @backoff -- the PG may
4155 			 * have split on the OSD.
4156 			 */
4157 			if (target_contained_by(&req->r_t, m->begin, m->end)) {
4158 				/*
4159 				 * If no other installed backoff applies,
4160 				 * resend.
4161 				 */
4162 				send_request(req);
4163 			}
4164 		}
4165 	}
4166 }
4167 
4168 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4169 {
4170 	struct ceph_osd_client *osdc = osd->o_osdc;
4171 	struct MOSDBackoff m;
4172 	int ret;
4173 
4174 	down_read(&osdc->lock);
4175 	if (!osd_registered(osd)) {
4176 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
4177 		up_read(&osdc->lock);
4178 		return;
4179 	}
4180 	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4181 
4182 	mutex_lock(&osd->lock);
4183 	ret = decode_MOSDBackoff(msg, &m);
4184 	if (ret) {
4185 		pr_err("failed to decode MOSDBackoff: %d\n", ret);
4186 		ceph_msg_dump(msg);
4187 		goto out_unlock;
4188 	}
4189 
4190 	switch (m.op) {
4191 	case CEPH_OSD_BACKOFF_OP_BLOCK:
4192 		handle_backoff_block(osd, &m);
4193 		break;
4194 	case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4195 		handle_backoff_unblock(osd, &m);
4196 		break;
4197 	default:
4198 		pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4199 	}
4200 
4201 	free_hoid(m.begin);
4202 	free_hoid(m.end);
4203 
4204 out_unlock:
4205 	mutex_unlock(&osd->lock);
4206 	up_read(&osdc->lock);
4207 }
4208 
4209 /*
4210  * Process osd watch notifications
4211  */
4212 static void handle_watch_notify(struct ceph_osd_client *osdc,
4213 				struct ceph_msg *msg)
4214 {
4215 	void *p = msg->front.iov_base;
4216 	void *const end = p + msg->front.iov_len;
4217 	struct ceph_osd_linger_request *lreq;
4218 	struct linger_work *lwork;
4219 	u8 proto_ver, opcode;
4220 	u64 cookie, notify_id;
4221 	u64 notifier_id = 0;
4222 	s32 return_code = 0;
4223 	void *payload = NULL;
4224 	u32 payload_len = 0;
4225 
4226 	ceph_decode_8_safe(&p, end, proto_ver, bad);
4227 	ceph_decode_8_safe(&p, end, opcode, bad);
4228 	ceph_decode_64_safe(&p, end, cookie, bad);
4229 	p += 8; /* skip ver */
4230 	ceph_decode_64_safe(&p, end, notify_id, bad);
4231 
4232 	if (proto_ver >= 1) {
4233 		ceph_decode_32_safe(&p, end, payload_len, bad);
4234 		ceph_decode_need(&p, end, payload_len, bad);
4235 		payload = p;
4236 		p += payload_len;
4237 	}
4238 
4239 	if (le16_to_cpu(msg->hdr.version) >= 2)
4240 		ceph_decode_32_safe(&p, end, return_code, bad);
4241 
4242 	if (le16_to_cpu(msg->hdr.version) >= 3)
4243 		ceph_decode_64_safe(&p, end, notifier_id, bad);
4244 
4245 	down_read(&osdc->lock);
4246 	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4247 	if (!lreq) {
4248 		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4249 		     cookie);
4250 		goto out_unlock_osdc;
4251 	}
4252 
4253 	mutex_lock(&lreq->lock);
4254 	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4255 	     opcode, cookie, lreq, lreq->is_watch);
4256 	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4257 		if (!lreq->last_error) {
4258 			lreq->last_error = -ENOTCONN;
4259 			queue_watch_error(lreq);
4260 		}
4261 	} else if (!lreq->is_watch) {
4262 		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4263 		if (lreq->notify_id && lreq->notify_id != notify_id) {
4264 			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4265 			     lreq->notify_id, notify_id);
4266 		} else if (!completion_done(&lreq->notify_finish_wait)) {
4267 			struct ceph_msg_data *data =
4268 			    list_first_entry_or_null(&msg->data,
4269 						     struct ceph_msg_data,
4270 						     links);
4271 
4272 			if (data) {
4273 				if (lreq->preply_pages) {
4274 					WARN_ON(data->type !=
4275 							CEPH_MSG_DATA_PAGES);
4276 					*lreq->preply_pages = data->pages;
4277 					*lreq->preply_len = data->length;
4278 				} else {
4279 					ceph_release_page_vector(data->pages,
4280 					       calc_pages_for(0, data->length));
4281 				}
4282 			}
4283 			lreq->notify_finish_error = return_code;
4284 			complete_all(&lreq->notify_finish_wait);
4285 		}
4286 	} else {
4287 		/* CEPH_WATCH_EVENT_NOTIFY */
4288 		lwork = lwork_alloc(lreq, do_watch_notify);
4289 		if (!lwork) {
4290 			pr_err("failed to allocate notify-lwork\n");
4291 			goto out_unlock_lreq;
4292 		}
4293 
4294 		lwork->notify.notify_id = notify_id;
4295 		lwork->notify.notifier_id = notifier_id;
4296 		lwork->notify.payload = payload;
4297 		lwork->notify.payload_len = payload_len;
4298 		lwork->notify.msg = ceph_msg_get(msg);
4299 		lwork_queue(lwork);
4300 	}
4301 
4302 out_unlock_lreq:
4303 	mutex_unlock(&lreq->lock);
4304 out_unlock_osdc:
4305 	up_read(&osdc->lock);
4306 	return;
4307 
4308 bad:
4309 	pr_err("osdc handle_watch_notify corrupt msg\n");
4310 }
4311 
4312 /*
4313  * Register request, send initial attempt.
4314  */
4315 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
4316 			    struct ceph_osd_request *req,
4317 			    bool nofail)
4318 {
4319 	down_read(&osdc->lock);
4320 	submit_request(req, false);
4321 	up_read(&osdc->lock);
4322 
4323 	return 0;
4324 }
4325 EXPORT_SYMBOL(ceph_osdc_start_request);
4326 
4327 /*
4328  * Unregister a registered request.  The request is not completed:
4329  * ->r_result isn't set and __complete_request() isn't called.
4330  */
4331 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4332 {
4333 	struct ceph_osd_client *osdc = req->r_osdc;
4334 
4335 	down_write(&osdc->lock);
4336 	if (req->r_osd)
4337 		cancel_request(req);
4338 	up_write(&osdc->lock);
4339 }
4340 EXPORT_SYMBOL(ceph_osdc_cancel_request);
4341 
4342 /*
4343  * @timeout: in jiffies, 0 means "wait forever"
4344  */
4345 static int wait_request_timeout(struct ceph_osd_request *req,
4346 				unsigned long timeout)
4347 {
4348 	long left;
4349 
4350 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4351 	left = wait_for_completion_killable_timeout(&req->r_completion,
4352 						ceph_timeout_jiffies(timeout));
4353 	if (left <= 0) {
4354 		left = left ?: -ETIMEDOUT;
4355 		ceph_osdc_cancel_request(req);
4356 	} else {
4357 		left = req->r_result; /* completed */
4358 	}
4359 
4360 	return left;
4361 }
4362 
/*
 * Wait for @req to complete, with no timeout.  @osdc is not used.
 *
 * Returns whatever wait_request_timeout() returns for a timeout of 0
 * ("wait forever").
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	const unsigned long forever = 0;

	return wait_request_timeout(req, forever);
}
EXPORT_SYMBOL(ceph_osdc_wait_request);
4372 
4373 /*
4374  * sync - wait for all in-flight requests to flush.  avoid starvation.
4375  */
4376 void ceph_osdc_sync(struct ceph_osd_client *osdc)
4377 {
4378 	struct rb_node *n, *p;
4379 	u64 last_tid = atomic64_read(&osdc->last_tid);
4380 
4381 again:
4382 	down_read(&osdc->lock);
4383 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4384 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4385 
4386 		mutex_lock(&osd->lock);
4387 		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4388 			struct ceph_osd_request *req =
4389 			    rb_entry(p, struct ceph_osd_request, r_node);
4390 
4391 			if (req->r_tid > last_tid)
4392 				break;
4393 
4394 			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4395 				continue;
4396 
4397 			ceph_osdc_get_request(req);
4398 			mutex_unlock(&osd->lock);
4399 			up_read(&osdc->lock);
4400 			dout("%s waiting on req %p tid %llu last_tid %llu\n",
4401 			     __func__, req, req->r_tid, last_tid);
4402 			wait_for_completion(&req->r_completion);
4403 			ceph_osdc_put_request(req);
4404 			goto again;
4405 		}
4406 
4407 		mutex_unlock(&osd->lock);
4408 	}
4409 
4410 	up_read(&osdc->lock);
4411 	dout("%s done last_tid %llu\n", __func__, last_tid);
4412 }
4413 EXPORT_SYMBOL(ceph_osdc_sync);
4414 
4415 static struct ceph_osd_request *
4416 alloc_linger_request(struct ceph_osd_linger_request *lreq)
4417 {
4418 	struct ceph_osd_request *req;
4419 
4420 	req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
4421 	if (!req)
4422 		return NULL;
4423 
4424 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4425 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4426 
4427 	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4428 		ceph_osdc_put_request(req);
4429 		return NULL;
4430 	}
4431 
4432 	return req;
4433 }
4434 
4435 /*
4436  * Returns a handle, caller owns a ref.
4437  */
4438 struct ceph_osd_linger_request *
4439 ceph_osdc_watch(struct ceph_osd_client *osdc,
4440 		struct ceph_object_id *oid,
4441 		struct ceph_object_locator *oloc,
4442 		rados_watchcb2_t wcb,
4443 		rados_watcherrcb_t errcb,
4444 		void *data)
4445 {
4446 	struct ceph_osd_linger_request *lreq;
4447 	int ret;
4448 
4449 	lreq = linger_alloc(osdc);
4450 	if (!lreq)
4451 		return ERR_PTR(-ENOMEM);
4452 
4453 	lreq->is_watch = true;
4454 	lreq->wcb = wcb;
4455 	lreq->errcb = errcb;
4456 	lreq->data = data;
4457 	lreq->watch_valid_thru = jiffies;
4458 
4459 	ceph_oid_copy(&lreq->t.base_oid, oid);
4460 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4461 	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4462 	ktime_get_real_ts(&lreq->mtime);
4463 
4464 	lreq->reg_req = alloc_linger_request(lreq);
4465 	if (!lreq->reg_req) {
4466 		ret = -ENOMEM;
4467 		goto err_put_lreq;
4468 	}
4469 
4470 	lreq->ping_req = alloc_linger_request(lreq);
4471 	if (!lreq->ping_req) {
4472 		ret = -ENOMEM;
4473 		goto err_put_lreq;
4474 	}
4475 
4476 	down_write(&osdc->lock);
4477 	linger_register(lreq); /* before osd_req_op_* */
4478 	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4479 			      CEPH_OSD_WATCH_OP_WATCH);
4480 	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4481 			      CEPH_OSD_WATCH_OP_PING);
4482 	linger_submit(lreq);
4483 	up_write(&osdc->lock);
4484 
4485 	ret = linger_reg_commit_wait(lreq);
4486 	if (ret) {
4487 		linger_cancel(lreq);
4488 		goto err_put_lreq;
4489 	}
4490 
4491 	return lreq;
4492 
4493 err_put_lreq:
4494 	linger_put(lreq);
4495 	return ERR_PTR(ret);
4496 }
4497 EXPORT_SYMBOL(ceph_osdc_watch);
4498 
4499 /*
4500  * Releases a ref.
4501  *
4502  * Times out after mount_timeout to preserve rbd unmap behaviour
4503  * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4504  * with mount_timeout").
4505  */
4506 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4507 		      struct ceph_osd_linger_request *lreq)
4508 {
4509 	struct ceph_options *opts = osdc->client->options;
4510 	struct ceph_osd_request *req;
4511 	int ret;
4512 
4513 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4514 	if (!req)
4515 		return -ENOMEM;
4516 
4517 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4518 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4519 	req->r_flags = CEPH_OSD_FLAG_WRITE;
4520 	ktime_get_real_ts(&req->r_mtime);
4521 	osd_req_op_watch_init(req, 0, lreq->linger_id,
4522 			      CEPH_OSD_WATCH_OP_UNWATCH);
4523 
4524 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4525 	if (ret)
4526 		goto out_put_req;
4527 
4528 	ceph_osdc_start_request(osdc, req, false);
4529 	linger_cancel(lreq);
4530 	linger_put(lreq);
4531 	ret = wait_request_timeout(req, opts->mount_timeout);
4532 
4533 out_put_req:
4534 	ceph_osdc_put_request(req);
4535 	return ret;
4536 }
4537 EXPORT_SYMBOL(ceph_osdc_unwatch);
4538 
4539 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4540 				      u64 notify_id, u64 cookie, void *payload,
4541 				      size_t payload_len)
4542 {
4543 	struct ceph_osd_req_op *op;
4544 	struct ceph_pagelist *pl;
4545 	int ret;
4546 
4547 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4548 
4549 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
4550 	if (!pl)
4551 		return -ENOMEM;
4552 
4553 	ceph_pagelist_init(pl);
4554 	ret = ceph_pagelist_encode_64(pl, notify_id);
4555 	ret |= ceph_pagelist_encode_64(pl, cookie);
4556 	if (payload) {
4557 		ret |= ceph_pagelist_encode_32(pl, payload_len);
4558 		ret |= ceph_pagelist_append(pl, payload, payload_len);
4559 	} else {
4560 		ret |= ceph_pagelist_encode_32(pl, 0);
4561 	}
4562 	if (ret) {
4563 		ceph_pagelist_release(pl);
4564 		return -ENOMEM;
4565 	}
4566 
4567 	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4568 	op->indata_len = pl->length;
4569 	return 0;
4570 }
4571 
4572 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4573 			 struct ceph_object_id *oid,
4574 			 struct ceph_object_locator *oloc,
4575 			 u64 notify_id,
4576 			 u64 cookie,
4577 			 void *payload,
4578 			 size_t payload_len)
4579 {
4580 	struct ceph_osd_request *req;
4581 	int ret;
4582 
4583 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4584 	if (!req)
4585 		return -ENOMEM;
4586 
4587 	ceph_oid_copy(&req->r_base_oid, oid);
4588 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4589 	req->r_flags = CEPH_OSD_FLAG_READ;
4590 
4591 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4592 	if (ret)
4593 		goto out_put_req;
4594 
4595 	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4596 					 payload_len);
4597 	if (ret)
4598 		goto out_put_req;
4599 
4600 	ceph_osdc_start_request(osdc, req, false);
4601 	ret = ceph_osdc_wait_request(osdc, req);
4602 
4603 out_put_req:
4604 	ceph_osdc_put_request(req);
4605 	return ret;
4606 }
4607 EXPORT_SYMBOL(ceph_osdc_notify_ack);
4608 
4609 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4610 				  u64 cookie, u32 prot_ver, u32 timeout,
4611 				  void *payload, size_t payload_len)
4612 {
4613 	struct ceph_osd_req_op *op;
4614 	struct ceph_pagelist *pl;
4615 	int ret;
4616 
4617 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4618 	op->notify.cookie = cookie;
4619 
4620 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
4621 	if (!pl)
4622 		return -ENOMEM;
4623 
4624 	ceph_pagelist_init(pl);
4625 	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4626 	ret |= ceph_pagelist_encode_32(pl, timeout);
4627 	ret |= ceph_pagelist_encode_32(pl, payload_len);
4628 	ret |= ceph_pagelist_append(pl, payload, payload_len);
4629 	if (ret) {
4630 		ceph_pagelist_release(pl);
4631 		return -ENOMEM;
4632 	}
4633 
4634 	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
4635 	op->indata_len = pl->length;
4636 	return 0;
4637 }
4638 
4639 /*
4640  * @timeout: in seconds
4641  *
4642  * @preply_{pages,len} are initialized both on success and error.
4643  * The caller is responsible for:
4644  *
4645  *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4646  */
4647 int ceph_osdc_notify(struct ceph_osd_client *osdc,
4648 		     struct ceph_object_id *oid,
4649 		     struct ceph_object_locator *oloc,
4650 		     void *payload,
4651 		     size_t payload_len,
4652 		     u32 timeout,
4653 		     struct page ***preply_pages,
4654 		     size_t *preply_len)
4655 {
4656 	struct ceph_osd_linger_request *lreq;
4657 	struct page **pages;
4658 	int ret;
4659 
4660 	WARN_ON(!timeout);
4661 	if (preply_pages) {
4662 		*preply_pages = NULL;
4663 		*preply_len = 0;
4664 	}
4665 
4666 	lreq = linger_alloc(osdc);
4667 	if (!lreq)
4668 		return -ENOMEM;
4669 
4670 	lreq->preply_pages = preply_pages;
4671 	lreq->preply_len = preply_len;
4672 
4673 	ceph_oid_copy(&lreq->t.base_oid, oid);
4674 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4675 	lreq->t.flags = CEPH_OSD_FLAG_READ;
4676 
4677 	lreq->reg_req = alloc_linger_request(lreq);
4678 	if (!lreq->reg_req) {
4679 		ret = -ENOMEM;
4680 		goto out_put_lreq;
4681 	}
4682 
4683 	/* for notify_id */
4684 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4685 	if (IS_ERR(pages)) {
4686 		ret = PTR_ERR(pages);
4687 		goto out_put_lreq;
4688 	}
4689 
4690 	down_write(&osdc->lock);
4691 	linger_register(lreq); /* before osd_req_op_* */
4692 	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4693 				     timeout, payload, payload_len);
4694 	if (ret) {
4695 		linger_unregister(lreq);
4696 		up_write(&osdc->lock);
4697 		ceph_release_page_vector(pages, 1);
4698 		goto out_put_lreq;
4699 	}
4700 	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4701 						 response_data),
4702 				 pages, PAGE_SIZE, 0, false, true);
4703 	linger_submit(lreq);
4704 	up_write(&osdc->lock);
4705 
4706 	ret = linger_reg_commit_wait(lreq);
4707 	if (!ret)
4708 		ret = linger_notify_finish_wait(lreq);
4709 	else
4710 		dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4711 
4712 	linger_cancel(lreq);
4713 out_put_lreq:
4714 	linger_put(lreq);
4715 	return ret;
4716 }
4717 EXPORT_SYMBOL(ceph_osdc_notify);
4718 
4719 /*
4720  * Return the number of milliseconds since the watch was last
4721  * confirmed, or an error.  If there is an error, the watch is no
4722  * longer valid, and should be destroyed with ceph_osdc_unwatch().
4723  */
4724 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4725 			  struct ceph_osd_linger_request *lreq)
4726 {
4727 	unsigned long stamp, age;
4728 	int ret;
4729 
4730 	down_read(&osdc->lock);
4731 	mutex_lock(&lreq->lock);
4732 	stamp = lreq->watch_valid_thru;
4733 	if (!list_empty(&lreq->pending_lworks)) {
4734 		struct linger_work *lwork =
4735 		    list_first_entry(&lreq->pending_lworks,
4736 				     struct linger_work,
4737 				     pending_item);
4738 
4739 		if (time_before(lwork->queued_stamp, stamp))
4740 			stamp = lwork->queued_stamp;
4741 	}
4742 	age = jiffies - stamp;
4743 	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4744 	     lreq, lreq->linger_id, age, lreq->last_error);
4745 	/* we are truncating to msecs, so return a safe upper bound */
4746 	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4747 
4748 	mutex_unlock(&lreq->lock);
4749 	up_read(&osdc->lock);
4750 	return ret;
4751 }
4752 
4753 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4754 {
4755 	u8 struct_v;
4756 	u32 struct_len;
4757 	int ret;
4758 
4759 	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4760 				  &struct_v, &struct_len);
4761 	if (ret)
4762 		return ret;
4763 
4764 	ceph_decode_copy(p, &item->name, sizeof(item->name));
4765 	item->cookie = ceph_decode_64(p);
4766 	*p += 4; /* skip timeout_seconds */
4767 	if (struct_v >= 2) {
4768 		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
4769 		ceph_decode_addr(&item->addr);
4770 	}
4771 
4772 	dout("%s %s%llu cookie %llu addr %s\n", __func__,
4773 	     ENTITY_NAME(item->name), item->cookie,
4774 	     ceph_pr_addr(&item->addr.in_addr));
4775 	return 0;
4776 }
4777 
4778 static int decode_watchers(void **p, void *end,
4779 			   struct ceph_watch_item **watchers,
4780 			   u32 *num_watchers)
4781 {
4782 	u8 struct_v;
4783 	u32 struct_len;
4784 	int i;
4785 	int ret;
4786 
4787 	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
4788 				  &struct_v, &struct_len);
4789 	if (ret)
4790 		return ret;
4791 
4792 	*num_watchers = ceph_decode_32(p);
4793 	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
4794 	if (!*watchers)
4795 		return -ENOMEM;
4796 
4797 	for (i = 0; i < *num_watchers; i++) {
4798 		ret = decode_watcher(p, end, *watchers + i);
4799 		if (ret) {
4800 			kfree(*watchers);
4801 			return ret;
4802 		}
4803 	}
4804 
4805 	return 0;
4806 }
4807 
4808 /*
4809  * On success, the caller is responsible for:
4810  *
4811  *     kfree(watchers);
4812  */
4813 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4814 			    struct ceph_object_id *oid,
4815 			    struct ceph_object_locator *oloc,
4816 			    struct ceph_watch_item **watchers,
4817 			    u32 *num_watchers)
4818 {
4819 	struct ceph_osd_request *req;
4820 	struct page **pages;
4821 	int ret;
4822 
4823 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4824 	if (!req)
4825 		return -ENOMEM;
4826 
4827 	ceph_oid_copy(&req->r_base_oid, oid);
4828 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4829 	req->r_flags = CEPH_OSD_FLAG_READ;
4830 
4831 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4832 	if (ret)
4833 		goto out_put_req;
4834 
4835 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4836 	if (IS_ERR(pages)) {
4837 		ret = PTR_ERR(pages);
4838 		goto out_put_req;
4839 	}
4840 
4841 	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
4842 	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
4843 						 response_data),
4844 				 pages, PAGE_SIZE, 0, false, true);
4845 
4846 	ceph_osdc_start_request(osdc, req, false);
4847 	ret = ceph_osdc_wait_request(osdc, req);
4848 	if (ret >= 0) {
4849 		void *p = page_address(pages[0]);
4850 		void *const end = p + req->r_ops[0].outdata_len;
4851 
4852 		ret = decode_watchers(&p, end, watchers, num_watchers);
4853 	}
4854 
4855 out_put_req:
4856 	ceph_osdc_put_request(req);
4857 	return ret;
4858 }
4859 EXPORT_SYMBOL(ceph_osdc_list_watchers);
4860 
4861 /*
4862  * Call all pending notify callbacks - for use after a watch is
4863  * unregistered, to make sure no more callbacks for it will be invoked
4864  */
4865 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
4866 {
4867 	dout("%s osdc %p\n", __func__, osdc);
4868 	flush_workqueue(osdc->notify_wq);
4869 }
4870 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
4871 
4872 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
4873 {
4874 	down_read(&osdc->lock);
4875 	maybe_request_map(osdc);
4876 	up_read(&osdc->lock);
4877 }
4878 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
4879 
4880 /*
4881  * Execute an OSD class method on an object.
4882  *
4883  * @flags: CEPH_OSD_FLAG_*
4884  * @resp_len: in/out param for reply length
4885  */
4886 int ceph_osdc_call(struct ceph_osd_client *osdc,
4887 		   struct ceph_object_id *oid,
4888 		   struct ceph_object_locator *oloc,
4889 		   const char *class, const char *method,
4890 		   unsigned int flags,
4891 		   struct page *req_page, size_t req_len,
4892 		   struct page *resp_page, size_t *resp_len)
4893 {
4894 	struct ceph_osd_request *req;
4895 	int ret;
4896 
4897 	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
4898 		return -E2BIG;
4899 
4900 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4901 	if (!req)
4902 		return -ENOMEM;
4903 
4904 	ceph_oid_copy(&req->r_base_oid, oid);
4905 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4906 	req->r_flags = flags;
4907 
4908 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4909 	if (ret)
4910 		goto out_put_req;
4911 
4912 	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4913 	if (req_page)
4914 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4915 						  0, false, false);
4916 	if (resp_page)
4917 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4918 						   *resp_len, 0, false, false);
4919 
4920 	ceph_osdc_start_request(osdc, req, false);
4921 	ret = ceph_osdc_wait_request(osdc, req);
4922 	if (ret >= 0) {
4923 		ret = req->r_ops[0].rval;
4924 		if (resp_page)
4925 			*resp_len = req->r_ops[0].outdata_len;
4926 	}
4927 
4928 out_put_req:
4929 	ceph_osdc_put_request(req);
4930 	return ret;
4931 }
4932 EXPORT_SYMBOL(ceph_osdc_call);
4933 
4934 /*
4935  * init, shutdown
4936  */
4937 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
4938 {
4939 	int err;
4940 
4941 	dout("init\n");
4942 	osdc->client = client;
4943 	init_rwsem(&osdc->lock);
4944 	osdc->osds = RB_ROOT;
4945 	INIT_LIST_HEAD(&osdc->osd_lru);
4946 	spin_lock_init(&osdc->osd_lru_lock);
4947 	osd_init(&osdc->homeless_osd);
4948 	osdc->homeless_osd.o_osdc = osdc;
4949 	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
4950 	osdc->last_linger_id = CEPH_LINGER_ID_START;
4951 	osdc->linger_requests = RB_ROOT;
4952 	osdc->map_checks = RB_ROOT;
4953 	osdc->linger_map_checks = RB_ROOT;
4954 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
4955 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
4956 
4957 	err = -ENOMEM;
4958 	osdc->osdmap = ceph_osdmap_alloc();
4959 	if (!osdc->osdmap)
4960 		goto out;
4961 
4962 	osdc->req_mempool = mempool_create_slab_pool(10,
4963 						     ceph_osd_request_cache);
4964 	if (!osdc->req_mempool)
4965 		goto out_map;
4966 
4967 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
4968 				PAGE_SIZE, 10, true, "osd_op");
4969 	if (err < 0)
4970 		goto out_mempool;
4971 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
4972 				PAGE_SIZE, 10, true, "osd_op_reply");
4973 	if (err < 0)
4974 		goto out_msgpool;
4975 
4976 	err = -ENOMEM;
4977 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
4978 	if (!osdc->notify_wq)
4979 		goto out_msgpool_reply;
4980 
4981 	schedule_delayed_work(&osdc->timeout_work,
4982 			      osdc->client->options->osd_keepalive_timeout);
4983 	schedule_delayed_work(&osdc->osds_timeout_work,
4984 	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
4985 
4986 	return 0;
4987 
4988 out_msgpool_reply:
4989 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
4990 out_msgpool:
4991 	ceph_msgpool_destroy(&osdc->msgpool_op);
4992 out_mempool:
4993 	mempool_destroy(osdc->req_mempool);
4994 out_map:
4995 	ceph_osdmap_destroy(osdc->osdmap);
4996 out:
4997 	return err;
4998 }
4999 
5000 void ceph_osdc_stop(struct ceph_osd_client *osdc)
5001 {
5002 	flush_workqueue(osdc->notify_wq);
5003 	destroy_workqueue(osdc->notify_wq);
5004 	cancel_delayed_work_sync(&osdc->timeout_work);
5005 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
5006 
5007 	down_write(&osdc->lock);
5008 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
5009 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5010 						struct ceph_osd, o_node);
5011 		close_osd(osd);
5012 	}
5013 	up_write(&osdc->lock);
5014 	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5015 	osd_cleanup(&osdc->homeless_osd);
5016 
5017 	WARN_ON(!list_empty(&osdc->osd_lru));
5018 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5019 	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5020 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5021 	WARN_ON(atomic_read(&osdc->num_requests));
5022 	WARN_ON(atomic_read(&osdc->num_homeless));
5023 
5024 	ceph_osdmap_destroy(osdc->osdmap);
5025 	mempool_destroy(osdc->req_mempool);
5026 	ceph_msgpool_destroy(&osdc->msgpool_op);
5027 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5028 }
5029 
5030 /*
5031  * Read some contiguous pages.  If we cross a stripe boundary, shorten
5032  * *plen.  Return number of bytes read, or error.
5033  */
5034 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
5035 			struct ceph_vino vino, struct ceph_file_layout *layout,
5036 			u64 off, u64 *plen,
5037 			u32 truncate_seq, u64 truncate_size,
5038 			struct page **pages, int num_pages, int page_align)
5039 {
5040 	struct ceph_osd_request *req;
5041 	int rc = 0;
5042 
5043 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
5044 	     vino.snap, off, *plen);
5045 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
5046 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
5047 				    NULL, truncate_seq, truncate_size,
5048 				    false);
5049 	if (IS_ERR(req))
5050 		return PTR_ERR(req);
5051 
5052 	/* it may be a short read due to an object boundary */
5053 	osd_req_op_extent_osd_data_pages(req, 0,
5054 				pages, *plen, page_align, false, false);
5055 
5056 	dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
5057 	     off, *plen, *plen, page_align);
5058 
5059 	rc = ceph_osdc_start_request(osdc, req, false);
5060 	if (!rc)
5061 		rc = ceph_osdc_wait_request(osdc, req);
5062 
5063 	ceph_osdc_put_request(req);
5064 	dout("readpages result %d\n", rc);
5065 	return rc;
5066 }
5067 EXPORT_SYMBOL(ceph_osdc_readpages);
5068 
5069 /*
5070  * do a synchronous write on N pages
5071  */
5072 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5073 			 struct ceph_file_layout *layout,
5074 			 struct ceph_snap_context *snapc,
5075 			 u64 off, u64 len,
5076 			 u32 truncate_seq, u64 truncate_size,
5077 			 struct timespec *mtime,
5078 			 struct page **pages, int num_pages)
5079 {
5080 	struct ceph_osd_request *req;
5081 	int rc = 0;
5082 	int page_align = off & ~PAGE_MASK;
5083 
5084 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
5085 				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
5086 				    snapc, truncate_seq, truncate_size,
5087 				    true);
5088 	if (IS_ERR(req))
5089 		return PTR_ERR(req);
5090 
5091 	/* it may be a short write due to an object boundary */
5092 	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
5093 				false, false);
5094 	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
5095 
5096 	req->r_mtime = *mtime;
5097 	rc = ceph_osdc_start_request(osdc, req, true);
5098 	if (!rc)
5099 		rc = ceph_osdc_wait_request(osdc, req);
5100 
5101 	ceph_osdc_put_request(req);
5102 	if (rc == 0)
5103 		rc = len;
5104 	dout("writepages result %d\n", rc);
5105 	return rc;
5106 }
5107 EXPORT_SYMBOL(ceph_osdc_writepages);
5108 
5109 int __init ceph_osdc_setup(void)
5110 {
5111 	size_t size = sizeof(struct ceph_osd_request) +
5112 	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5113 
5114 	BUG_ON(ceph_osd_request_cache);
5115 	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5116 						   0, 0, NULL);
5117 
5118 	return ceph_osd_request_cache ? 0 : -ENOMEM;
5119 }
5120 
5121 void ceph_osdc_cleanup(void)
5122 {
5123 	BUG_ON(!ceph_osd_request_cache);
5124 	kmem_cache_destroy(ceph_osd_request_cache);
5125 	ceph_osd_request_cache = NULL;
5126 }
5127 
5128 /*
5129  * handle incoming message
5130  */
5131 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5132 {
5133 	struct ceph_osd *osd = con->private;
5134 	struct ceph_osd_client *osdc = osd->o_osdc;
5135 	int type = le16_to_cpu(msg->hdr.type);
5136 
5137 	switch (type) {
5138 	case CEPH_MSG_OSD_MAP:
5139 		ceph_osdc_handle_map(osdc, msg);
5140 		break;
5141 	case CEPH_MSG_OSD_OPREPLY:
5142 		handle_reply(osd, msg);
5143 		break;
5144 	case CEPH_MSG_OSD_BACKOFF:
5145 		handle_backoff(osd, msg);
5146 		break;
5147 	case CEPH_MSG_WATCH_NOTIFY:
5148 		handle_watch_notify(osdc, msg);
5149 		break;
5150 
5151 	default:
5152 		pr_err("received unknown message type %d %s\n", type,
5153 		       ceph_msg_type_name(type));
5154 	}
5155 
5156 	ceph_msg_put(msg);
5157 }
5158 
5159 /*
5160  * Lookup and return message for incoming reply.  Don't try to do
5161  * anything about a larger than preallocated data portion of the
5162  * message at the moment - for now, just skip the message.
5163  */
5164 static struct ceph_msg *get_reply(struct ceph_connection *con,
5165 				  struct ceph_msg_header *hdr,
5166 				  int *skip)
5167 {
5168 	struct ceph_osd *osd = con->private;
5169 	struct ceph_osd_client *osdc = osd->o_osdc;
5170 	struct ceph_msg *m = NULL;
5171 	struct ceph_osd_request *req;
5172 	int front_len = le32_to_cpu(hdr->front_len);
5173 	int data_len = le32_to_cpu(hdr->data_len);
5174 	u64 tid = le64_to_cpu(hdr->tid);
5175 
5176 	down_read(&osdc->lock);
5177 	if (!osd_registered(osd)) {
5178 		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5179 		*skip = 1;
5180 		goto out_unlock_osdc;
5181 	}
5182 	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5183 
5184 	mutex_lock(&osd->lock);
5185 	req = lookup_request(&osd->o_requests, tid);
5186 	if (!req) {
5187 		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5188 		     osd->o_osd, tid);
5189 		*skip = 1;
5190 		goto out_unlock_session;
5191 	}
5192 
5193 	ceph_msg_revoke_incoming(req->r_reply);
5194 
5195 	if (front_len > req->r_reply->front_alloc_len) {
5196 		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5197 			__func__, osd->o_osd, req->r_tid, front_len,
5198 			req->r_reply->front_alloc_len);
5199 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5200 				 false);
5201 		if (!m)
5202 			goto out_unlock_session;
5203 		ceph_msg_put(req->r_reply);
5204 		req->r_reply = m;
5205 	}
5206 
5207 	if (data_len > req->r_reply->data_length) {
5208 		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5209 			__func__, osd->o_osd, req->r_tid, data_len,
5210 			req->r_reply->data_length);
5211 		m = NULL;
5212 		*skip = 1;
5213 		goto out_unlock_session;
5214 	}
5215 
5216 	m = ceph_msg_get(req->r_reply);
5217 	dout("get_reply tid %lld %p\n", tid, m);
5218 
5219 out_unlock_session:
5220 	mutex_unlock(&osd->lock);
5221 out_unlock_osdc:
5222 	up_read(&osdc->lock);
5223 	return m;
5224 }
5225 
5226 /*
5227  * TODO: switch to a msg-owned pagelist
5228  */
5229 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5230 {
5231 	struct ceph_msg *m;
5232 	int type = le16_to_cpu(hdr->type);
5233 	u32 front_len = le32_to_cpu(hdr->front_len);
5234 	u32 data_len = le32_to_cpu(hdr->data_len);
5235 
5236 	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
5237 	if (!m)
5238 		return NULL;
5239 
5240 	if (data_len) {
5241 		struct page **pages;
5242 		struct ceph_osd_data osd_data;
5243 
5244 		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5245 					       GFP_NOIO);
5246 		if (IS_ERR(pages)) {
5247 			ceph_msg_put(m);
5248 			return NULL;
5249 		}
5250 
5251 		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
5252 					 false);
5253 		ceph_osdc_msg_data_add(m, &osd_data);
5254 	}
5255 
5256 	return m;
5257 }
5258 
5259 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
5260 				  struct ceph_msg_header *hdr,
5261 				  int *skip)
5262 {
5263 	struct ceph_osd *osd = con->private;
5264 	int type = le16_to_cpu(hdr->type);
5265 
5266 	*skip = 0;
5267 	switch (type) {
5268 	case CEPH_MSG_OSD_MAP:
5269 	case CEPH_MSG_OSD_BACKOFF:
5270 	case CEPH_MSG_WATCH_NOTIFY:
5271 		return alloc_msg_with_page_vector(hdr);
5272 	case CEPH_MSG_OSD_OPREPLY:
5273 		return get_reply(con, hdr, skip);
5274 	default:
5275 		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5276 			osd->o_osd, type);
5277 		*skip = 1;
5278 		return NULL;
5279 	}
5280 }
5281 
5282 /*
5283  * Wrappers to refcount containing ceph_osd struct
5284  */
5285 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
5286 {
5287 	struct ceph_osd *osd = con->private;
5288 	if (get_osd(osd))
5289 		return con;
5290 	return NULL;
5291 }
5292 
5293 static void put_osd_con(struct ceph_connection *con)
5294 {
5295 	struct ceph_osd *osd = con->private;
5296 	put_osd(osd);
5297 }
5298 
5299 /*
5300  * authentication
5301  */
5302 /*
5303  * Note: returned pointer is the address of a structure that's
5304  * managed separately.  Caller must *not* attempt to free it.
5305  */
5306 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5307 					int *proto, int force_new)
5308 {
5309 	struct ceph_osd *o = con->private;
5310 	struct ceph_osd_client *osdc = o->o_osdc;
5311 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5312 	struct ceph_auth_handshake *auth = &o->o_auth;
5313 
5314 	if (force_new && auth->authorizer) {
5315 		ceph_auth_destroy_authorizer(auth->authorizer);
5316 		auth->authorizer = NULL;
5317 	}
5318 	if (!auth->authorizer) {
5319 		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5320 						      auth);
5321 		if (ret)
5322 			return ERR_PTR(ret);
5323 	} else {
5324 		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5325 						     auth);
5326 		if (ret)
5327 			return ERR_PTR(ret);
5328 	}
5329 	*proto = ac->protocol;
5330 
5331 	return auth;
5332 }
5333 
5334 
5335 static int verify_authorizer_reply(struct ceph_connection *con)
5336 {
5337 	struct ceph_osd *o = con->private;
5338 	struct ceph_osd_client *osdc = o->o_osdc;
5339 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5340 
5341 	return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
5342 }
5343 
5344 static int invalidate_authorizer(struct ceph_connection *con)
5345 {
5346 	struct ceph_osd *o = con->private;
5347 	struct ceph_osd_client *osdc = o->o_osdc;
5348 	struct ceph_auth_client *ac = osdc->client->monc.auth;
5349 
5350 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5351 	return ceph_monc_validate_auth(&osdc->client->monc);
5352 }
5353 
5354 static void osd_reencode_message(struct ceph_msg *msg)
5355 {
5356 	int type = le16_to_cpu(msg->hdr.type);
5357 
5358 	if (type == CEPH_MSG_OSD_OP)
5359 		encode_request_finish(msg);
5360 }
5361 
5362 static int osd_sign_message(struct ceph_msg *msg)
5363 {
5364 	struct ceph_osd *o = msg->con->private;
5365 	struct ceph_auth_handshake *auth = &o->o_auth;
5366 
5367 	return ceph_auth_sign_message(auth, msg);
5368 }
5369 
5370 static int osd_check_message_signature(struct ceph_msg *msg)
5371 {
5372 	struct ceph_osd *o = msg->con->private;
5373 	struct ceph_auth_handshake *auth = &o->o_auth;
5374 
5375 	return ceph_auth_check_message_signature(auth, msg);
5376 }
5377 
5378 static const struct ceph_connection_operations osd_con_ops = {
5379 	.get = get_osd_con,
5380 	.put = put_osd_con,
5381 	.dispatch = dispatch,
5382 	.get_authorizer = get_authorizer,
5383 	.verify_authorizer_reply = verify_authorizer_reply,
5384 	.invalidate_authorizer = invalidate_authorizer,
5385 	.alloc_msg = alloc_msg,
5386 	.reencode_message = osd_reencode_message,
5387 	.sign_message = osd_sign_message,
5388 	.check_message_signature = osd_check_message_signature,
5389 	.fault = osd_fault,
5390 };
5391