xref: /openbmc/linux/net/ceph/osd_client.c (revision a10bcb19)
1 
2 #include <linux/ceph/ceph_debug.h>
3 
4 #include <linux/module.h>
5 #include <linux/err.h>
6 #include <linux/highmem.h>
7 #include <linux/mm.h>
8 #include <linux/pagemap.h>
9 #include <linux/slab.h>
10 #include <linux/uaccess.h>
11 #ifdef CONFIG_BLOCK
12 #include <linux/bio.h>
13 #endif
14 
15 #include <linux/ceph/ceph_features.h>
16 #include <linux/ceph/libceph.h>
17 #include <linux/ceph/osd_client.h>
18 #include <linux/ceph/messenger.h>
19 #include <linux/ceph/decode.h>
20 #include <linux/ceph/auth.h>
21 #include <linux/ceph/pagelist.h>
22 
23 #define OSD_OPREPLY_FRONT_LEN	512
24 
25 static struct kmem_cache	*ceph_osd_request_cache;
26 
27 static const struct ceph_connection_operations osd_con_ops;
28 
29 /*
30  * Implement client access to distributed object storage cluster.
31  *
32  * All data objects are stored within a cluster/cloud of OSDs, or
33  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
34  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
35  * remote daemons serving up and coordinating consistent and safe
36  * access to storage.
37  *
38  * Cluster membership and the mapping of data objects onto storage devices
39  * are described by the osd map.
40  *
41  * We keep track of pending OSD requests (read, write), resubmit
42  * requests to different OSDs when the cluster topology/data layout
 43  * changes, or retry the affected requests when the communications
44  * channel with an OSD is reset.
45  */
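/*
 * Rough caller-side sketch (illustrative only, not part of this file):
 * a single-op read into a page vector, assuming ceph_osdc_start_request()
 * and ceph_osdc_wait_request() from later in this file and placeholder
 * values for off, len, pages, pool_id and obj_name:
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	req->r_flags = CEPH_OSD_FLAG_READ;
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, off, len, 0, 0);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *	req->r_base_oloc.pool = pool_id;
 *	ceph_oid_printf(&req->r_base_oid, "%s", obj_name);
 *	ceph_osdc_alloc_messages(req, GFP_NOFS);
 *	ceph_osdc_start_request(osdc, req, false);
 *	ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */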
46 
47 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
48 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
49 static void link_linger(struct ceph_osd *osd,
50 			struct ceph_osd_linger_request *lreq);
51 static void unlink_linger(struct ceph_osd *osd,
52 			  struct ceph_osd_linger_request *lreq);
53 
54 #if 1
55 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
56 {
57 	bool wrlocked = true;
58 
59 	if (unlikely(down_read_trylock(sem))) {
60 		wrlocked = false;
61 		up_read(sem);
62 	}
63 
64 	return wrlocked;
65 }
66 static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
67 {
68 	WARN_ON(!rwsem_is_locked(&osdc->lock));
69 }
70 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
71 {
72 	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
73 }
74 static inline void verify_osd_locked(struct ceph_osd *osd)
75 {
76 	struct ceph_osd_client *osdc = osd->o_osdc;
77 
78 	WARN_ON(!(mutex_is_locked(&osd->lock) &&
79 		  rwsem_is_locked(&osdc->lock)) &&
80 		!rwsem_is_wrlocked(&osdc->lock));
81 }
82 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
83 {
84 	WARN_ON(!mutex_is_locked(&lreq->lock));
85 }
86 #else
87 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
88 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
89 static inline void verify_osd_locked(struct ceph_osd *osd) { }
90 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
91 #endif
92 
93 /*
94  * calculate the mapping of a file extent onto an object, and fill out the
95  * request accordingly.  shorten extent as necessary if it crosses an
96  * object boundary.
97  *
98  * fill osd op in request message.
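 *
 * worked example (illustrative, assuming the default non-striped 4 MB
 * object size): off=6M, *plen=4M maps to objnum=1, objoff=2M; the extent
 * is shortened at the object boundary, so objlen = *plen = 2M.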
99  */
100 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
101 			u64 *objnum, u64 *objoff, u64 *objlen)
102 {
103 	u64 orig_len = *plen;
104 	int r;
105 
106 	/* object extent? */
107 	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
108 					  objoff, objlen);
109 	if (r < 0)
110 		return r;
111 	if (*objlen < orig_len) {
112 		*plen = *objlen;
113 		dout(" skipping last %llu, final file extent %llu~%llu\n",
114 		     orig_len - *plen, off, *plen);
115 	}
116 
117 	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
118 
119 	return 0;
120 }
121 
122 static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
123 {
124 	memset(osd_data, 0, sizeof (*osd_data));
125 	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
126 }
127 
128 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
129 			struct page **pages, u64 length, u32 alignment,
130 			bool pages_from_pool, bool own_pages)
131 {
132 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
133 	osd_data->pages = pages;
134 	osd_data->length = length;
135 	osd_data->alignment = alignment;
136 	osd_data->pages_from_pool = pages_from_pool;
137 	osd_data->own_pages = own_pages;
138 }
139 
140 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
141 			struct ceph_pagelist *pagelist)
142 {
143 	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
144 	osd_data->pagelist = pagelist;
145 }
146 
147 #ifdef CONFIG_BLOCK
148 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
149 			struct bio *bio, size_t bio_length)
150 {
151 	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
152 	osd_data->bio = bio;
153 	osd_data->bio_length = bio_length;
154 }
155 #endif /* CONFIG_BLOCK */
156 
157 #define osd_req_op_data(oreq, whch, typ, fld)				\
158 ({									\
159 	struct ceph_osd_request *__oreq = (oreq);			\
160 	unsigned int __whch = (whch);					\
161 	BUG_ON(__whch >= __oreq->r_num_ops);				\
162 	&__oreq->r_ops[__whch].typ.fld;					\
163 })
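/*
 * For reference, osd_req_op_data(req, 0, extent, osd_data) evaluates to
 * &req->r_ops[0].extent.osd_data, after bounds-checking the op index.
 */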
164 
165 static struct ceph_osd_data *
166 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
167 {
168 	BUG_ON(which >= osd_req->r_num_ops);
169 
170 	return &osd_req->r_ops[which].raw_data_in;
171 }
172 
173 struct ceph_osd_data *
174 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
175 			unsigned int which)
176 {
177 	return osd_req_op_data(osd_req, which, extent, osd_data);
178 }
179 EXPORT_SYMBOL(osd_req_op_extent_osd_data);
180 
181 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
182 			unsigned int which, struct page **pages,
183 			u64 length, u32 alignment,
184 			bool pages_from_pool, bool own_pages)
185 {
186 	struct ceph_osd_data *osd_data;
187 
188 	osd_data = osd_req_op_raw_data_in(osd_req, which);
189 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
190 				pages_from_pool, own_pages);
191 }
192 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);
193 
194 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
195 			unsigned int which, struct page **pages,
196 			u64 length, u32 alignment,
197 			bool pages_from_pool, bool own_pages)
198 {
199 	struct ceph_osd_data *osd_data;
200 
201 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
202 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
203 				pages_from_pool, own_pages);
204 }
205 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);
206 
207 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
208 			unsigned int which, struct ceph_pagelist *pagelist)
209 {
210 	struct ceph_osd_data *osd_data;
211 
212 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
213 	ceph_osd_data_pagelist_init(osd_data, pagelist);
214 }
215 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);
216 
217 #ifdef CONFIG_BLOCK
218 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
219 			unsigned int which, struct bio *bio, size_t bio_length)
220 {
221 	struct ceph_osd_data *osd_data;
222 
223 	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
224 	ceph_osd_data_bio_init(osd_data, bio, bio_length);
225 }
226 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
227 #endif /* CONFIG_BLOCK */
228 
229 static void osd_req_op_cls_request_info_pagelist(
230 			struct ceph_osd_request *osd_req,
231 			unsigned int which, struct ceph_pagelist *pagelist)
232 {
233 	struct ceph_osd_data *osd_data;
234 
235 	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
236 	ceph_osd_data_pagelist_init(osd_data, pagelist);
237 }
238 
239 void osd_req_op_cls_request_data_pagelist(
240 			struct ceph_osd_request *osd_req,
241 			unsigned int which, struct ceph_pagelist *pagelist)
242 {
243 	struct ceph_osd_data *osd_data;
244 
245 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
246 	ceph_osd_data_pagelist_init(osd_data, pagelist);
247 	osd_req->r_ops[which].cls.indata_len += pagelist->length;
248 	osd_req->r_ops[which].indata_len += pagelist->length;
249 }
250 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);
251 
252 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
253 			unsigned int which, struct page **pages, u64 length,
254 			u32 alignment, bool pages_from_pool, bool own_pages)
255 {
256 	struct ceph_osd_data *osd_data;
257 
258 	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
259 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
260 				pages_from_pool, own_pages);
261 	osd_req->r_ops[which].cls.indata_len += length;
262 	osd_req->r_ops[which].indata_len += length;
263 }
264 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
265 
266 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
267 			unsigned int which, struct page **pages, u64 length,
268 			u32 alignment, bool pages_from_pool, bool own_pages)
269 {
270 	struct ceph_osd_data *osd_data;
271 
272 	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
273 	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
274 				pages_from_pool, own_pages);
275 }
276 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
277 
278 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
279 {
280 	switch (osd_data->type) {
281 	case CEPH_OSD_DATA_TYPE_NONE:
282 		return 0;
283 	case CEPH_OSD_DATA_TYPE_PAGES:
284 		return osd_data->length;
285 	case CEPH_OSD_DATA_TYPE_PAGELIST:
286 		return (u64)osd_data->pagelist->length;
287 #ifdef CONFIG_BLOCK
288 	case CEPH_OSD_DATA_TYPE_BIO:
289 		return (u64)osd_data->bio_length;
290 #endif /* CONFIG_BLOCK */
291 	default:
292 		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
293 		return 0;
294 	}
295 }
296 
297 static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
298 {
299 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
300 		int num_pages;
301 
302 		num_pages = calc_pages_for((u64)osd_data->alignment,
303 						(u64)osd_data->length);
304 		ceph_release_page_vector(osd_data->pages, num_pages);
305 	}
306 	ceph_osd_data_init(osd_data);
307 }
308 
309 static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
310 			unsigned int which)
311 {
312 	struct ceph_osd_req_op *op;
313 
314 	BUG_ON(which >= osd_req->r_num_ops);
315 	op = &osd_req->r_ops[which];
316 
317 	switch (op->op) {
318 	case CEPH_OSD_OP_READ:
319 	case CEPH_OSD_OP_WRITE:
320 	case CEPH_OSD_OP_WRITEFULL:
321 		ceph_osd_data_release(&op->extent.osd_data);
322 		break;
323 	case CEPH_OSD_OP_CALL:
324 		ceph_osd_data_release(&op->cls.request_info);
325 		ceph_osd_data_release(&op->cls.request_data);
326 		ceph_osd_data_release(&op->cls.response_data);
327 		break;
328 	case CEPH_OSD_OP_SETXATTR:
329 	case CEPH_OSD_OP_CMPXATTR:
330 		ceph_osd_data_release(&op->xattr.osd_data);
331 		break;
332 	case CEPH_OSD_OP_STAT:
333 		ceph_osd_data_release(&op->raw_data_in);
334 		break;
335 	case CEPH_OSD_OP_NOTIFY_ACK:
336 		ceph_osd_data_release(&op->notify_ack.request_data);
337 		break;
338 	case CEPH_OSD_OP_NOTIFY:
339 		ceph_osd_data_release(&op->notify.request_data);
340 		ceph_osd_data_release(&op->notify.response_data);
341 		break;
342 	case CEPH_OSD_OP_LIST_WATCHERS:
343 		ceph_osd_data_release(&op->list_watchers.response_data);
344 		break;
345 	default:
346 		break;
347 	}
348 }
349 
350 /*
351  * Assumes @t is zero-initialized.
352  */
353 static void target_init(struct ceph_osd_request_target *t)
354 {
355 	ceph_oid_init(&t->base_oid);
356 	ceph_oloc_init(&t->base_oloc);
357 	ceph_oid_init(&t->target_oid);
358 	ceph_oloc_init(&t->target_oloc);
359 
360 	ceph_osds_init(&t->acting);
361 	ceph_osds_init(&t->up);
362 	t->size = -1;
363 	t->min_size = -1;
364 
365 	t->osd = CEPH_HOMELESS_OSD;
366 }
367 
368 static void target_copy(struct ceph_osd_request_target *dest,
369 			const struct ceph_osd_request_target *src)
370 {
371 	ceph_oid_copy(&dest->base_oid, &src->base_oid);
372 	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
373 	ceph_oid_copy(&dest->target_oid, &src->target_oid);
374 	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
375 
376 	dest->pgid = src->pgid; /* struct */
377 	dest->spgid = src->spgid; /* struct */
378 	dest->pg_num = src->pg_num;
379 	dest->pg_num_mask = src->pg_num_mask;
380 	ceph_osds_copy(&dest->acting, &src->acting);
381 	ceph_osds_copy(&dest->up, &src->up);
382 	dest->size = src->size;
383 	dest->min_size = src->min_size;
384 	dest->sort_bitwise = src->sort_bitwise;
385 
386 	dest->flags = src->flags;
387 	dest->paused = src->paused;
388 
389 	dest->last_force_resend = src->last_force_resend;
390 
391 	dest->osd = src->osd;
392 }
393 
394 static void target_destroy(struct ceph_osd_request_target *t)
395 {
396 	ceph_oid_destroy(&t->base_oid);
397 	ceph_oloc_destroy(&t->base_oloc);
398 	ceph_oid_destroy(&t->target_oid);
399 	ceph_oloc_destroy(&t->target_oloc);
400 }
401 
402 /*
403  * requests
404  */
405 static void request_release_checks(struct ceph_osd_request *req)
406 {
407 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
408 	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
409 	WARN_ON(!list_empty(&req->r_unsafe_item));
410 	WARN_ON(req->r_osd);
411 }
412 
413 static void ceph_osdc_release_request(struct kref *kref)
414 {
415 	struct ceph_osd_request *req = container_of(kref,
416 					    struct ceph_osd_request, r_kref);
417 	unsigned int which;
418 
419 	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
420 	     req->r_request, req->r_reply);
421 	request_release_checks(req);
422 
423 	if (req->r_request)
424 		ceph_msg_put(req->r_request);
425 	if (req->r_reply)
426 		ceph_msg_put(req->r_reply);
427 
428 	for (which = 0; which < req->r_num_ops; which++)
429 		osd_req_op_data_release(req, which);
430 
431 	target_destroy(&req->r_t);
432 	ceph_put_snap_context(req->r_snapc);
433 
434 	if (req->r_mempool)
435 		mempool_free(req, req->r_osdc->req_mempool);
436 	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
437 		kmem_cache_free(ceph_osd_request_cache, req);
438 	else
439 		kfree(req);
440 }
441 
442 void ceph_osdc_get_request(struct ceph_osd_request *req)
443 {
444 	dout("%s %p (was %d)\n", __func__, req,
445 	     kref_read(&req->r_kref));
446 	kref_get(&req->r_kref);
447 }
448 EXPORT_SYMBOL(ceph_osdc_get_request);
449 
450 void ceph_osdc_put_request(struct ceph_osd_request *req)
451 {
452 	if (req) {
453 		dout("%s %p (was %d)\n", __func__, req,
454 		     kref_read(&req->r_kref));
455 		kref_put(&req->r_kref, ceph_osdc_release_request);
456 	}
457 }
458 EXPORT_SYMBOL(ceph_osdc_put_request);
459 
460 static void request_init(struct ceph_osd_request *req)
461 {
462 	/* req only, each op is zeroed in _osd_req_op_init() */
463 	memset(req, 0, sizeof(*req));
464 
465 	kref_init(&req->r_kref);
466 	init_completion(&req->r_completion);
467 	RB_CLEAR_NODE(&req->r_node);
468 	RB_CLEAR_NODE(&req->r_mc_node);
469 	INIT_LIST_HEAD(&req->r_unsafe_item);
470 
471 	target_init(&req->r_t);
472 }
473 
474 /*
475  * This is ugly, but it allows us to reuse linger registration and ping
476  * requests, keeping the structure of the code around send_linger{_ping}()
477  * reasonable.  Setting up a min_nr=2 mempool for each linger request
478  * and dealing with copying ops (this blasts req only, watch op remains
479  * intact) isn't any better.
480  */
481 static void request_reinit(struct ceph_osd_request *req)
482 {
483 	struct ceph_osd_client *osdc = req->r_osdc;
484 	bool mempool = req->r_mempool;
485 	unsigned int num_ops = req->r_num_ops;
486 	u64 snapid = req->r_snapid;
487 	struct ceph_snap_context *snapc = req->r_snapc;
488 	bool linger = req->r_linger;
489 	struct ceph_msg *request_msg = req->r_request;
490 	struct ceph_msg *reply_msg = req->r_reply;
491 
492 	dout("%s req %p\n", __func__, req);
493 	WARN_ON(kref_read(&req->r_kref) != 1);
494 	request_release_checks(req);
495 
496 	WARN_ON(kref_read(&request_msg->kref) != 1);
497 	WARN_ON(kref_read(&reply_msg->kref) != 1);
498 	target_destroy(&req->r_t);
499 
500 	request_init(req);
501 	req->r_osdc = osdc;
502 	req->r_mempool = mempool;
503 	req->r_num_ops = num_ops;
504 	req->r_snapid = snapid;
505 	req->r_snapc = snapc;
506 	req->r_linger = linger;
507 	req->r_request = request_msg;
508 	req->r_reply = reply_msg;
509 }
510 
511 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
512 					       struct ceph_snap_context *snapc,
513 					       unsigned int num_ops,
514 					       bool use_mempool,
515 					       gfp_t gfp_flags)
516 {
517 	struct ceph_osd_request *req;
518 
519 	if (use_mempool) {
520 		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
521 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
522 	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
523 		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
524 	} else {
525 		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
526 		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
527 			      gfp_flags);
528 	}
529 	if (unlikely(!req))
530 		return NULL;
531 
532 	request_init(req);
533 	req->r_osdc = osdc;
534 	req->r_mempool = use_mempool;
535 	req->r_num_ops = num_ops;
536 	req->r_snapid = CEPH_NOSNAP;
537 	req->r_snapc = ceph_get_snap_context(snapc);
538 
539 	dout("%s req %p\n", __func__, req);
540 	return req;
541 }
542 EXPORT_SYMBOL(ceph_osdc_alloc_request);
543 
544 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
545 {
546 	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
547 }
548 
549 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
550 {
551 	struct ceph_osd_client *osdc = req->r_osdc;
552 	struct ceph_msg *msg;
553 	int msg_size;
554 
555 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
556 	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
557 
558 	/* create request message */
559 	msg_size = CEPH_ENCODING_START_BLK_LEN +
560 			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
561 	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
562 	msg_size += CEPH_ENCODING_START_BLK_LEN +
563 			sizeof(struct ceph_osd_reqid); /* reqid */
564 	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
565 	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
566 	msg_size += CEPH_ENCODING_START_BLK_LEN +
567 			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
568 	msg_size += 4 + req->r_base_oid.name_len; /* oid */
569 	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
570 	msg_size += 8; /* snapid */
571 	msg_size += 8; /* snap_seq */
572 	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
573 	msg_size += 4 + 8; /* retry_attempt, features */
574 
575 	if (req->r_mempool)
576 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
577 	else
578 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
579 	if (!msg)
580 		return -ENOMEM;
581 
582 	memset(msg->front.iov_base, 0, msg->front.iov_len);
583 	req->r_request = msg;
584 
585 	/* create reply message */
586 	msg_size = OSD_OPREPLY_FRONT_LEN;
587 	msg_size += req->r_base_oid.name_len;
588 	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);
589 
590 	if (req->r_mempool)
591 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
592 	else
593 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
594 	if (!msg)
595 		return -ENOMEM;
596 
597 	req->r_reply = msg;
598 
599 	return 0;
600 }
601 EXPORT_SYMBOL(ceph_osdc_alloc_messages);
602 
603 static bool osd_req_opcode_valid(u16 opcode)
604 {
605 	switch (opcode) {
606 #define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
607 __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
608 #undef GENERATE_CASE
609 	default:
610 		return false;
611 	}
612 }
613 
614 /*
615  * This is an osd op init function for opcodes that have no data or
616  * other information associated with them.  It also serves as a
617  * common init routine for all the other init functions, below.
618  */
619 static struct ceph_osd_req_op *
620 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
621 		 u16 opcode, u32 flags)
622 {
623 	struct ceph_osd_req_op *op;
624 
625 	BUG_ON(which >= osd_req->r_num_ops);
626 	BUG_ON(!osd_req_opcode_valid(opcode));
627 
628 	op = &osd_req->r_ops[which];
629 	memset(op, 0, sizeof (*op));
630 	op->op = opcode;
631 	op->flags = flags;
632 
633 	return op;
634 }
635 
636 void osd_req_op_init(struct ceph_osd_request *osd_req,
637 		     unsigned int which, u16 opcode, u32 flags)
638 {
639 	(void)_osd_req_op_init(osd_req, which, opcode, flags);
640 }
641 EXPORT_SYMBOL(osd_req_op_init);
642 
643 void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
644 				unsigned int which, u16 opcode,
645 				u64 offset, u64 length,
646 				u64 truncate_size, u32 truncate_seq)
647 {
648 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
649 						      opcode, 0);
650 	size_t payload_len = 0;
651 
652 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
653 	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
654 	       opcode != CEPH_OSD_OP_TRUNCATE);
655 
656 	op->extent.offset = offset;
657 	op->extent.length = length;
658 	op->extent.truncate_size = truncate_size;
659 	op->extent.truncate_seq = truncate_seq;
660 	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
661 		payload_len += length;
662 
663 	op->indata_len = payload_len;
664 }
665 EXPORT_SYMBOL(osd_req_op_extent_init);
666 
667 void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
668 				unsigned int which, u64 length)
669 {
670 	struct ceph_osd_req_op *op;
671 	u64 previous;
672 
673 	BUG_ON(which >= osd_req->r_num_ops);
674 	op = &osd_req->r_ops[which];
675 	previous = op->extent.length;
676 
677 	if (length == previous)
678 		return;		/* Nothing to do */
679 	BUG_ON(length > previous);
680 
681 	op->extent.length = length;
682 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
683 		op->indata_len -= previous - length;
684 }
685 EXPORT_SYMBOL(osd_req_op_extent_update);
686 
687 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
688 				unsigned int which, u64 offset_inc)
689 {
690 	struct ceph_osd_req_op *op, *prev_op;
691 
692 	BUG_ON(which + 1 >= osd_req->r_num_ops);
693 
694 	prev_op = &osd_req->r_ops[which];
695 	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
696 	/* dup previous one */
697 	op->indata_len = prev_op->indata_len;
698 	op->outdata_len = prev_op->outdata_len;
699 	op->extent = prev_op->extent;
700 	/* adjust offset */
701 	op->extent.offset += offset_inc;
702 	op->extent.length -= offset_inc;
703 
704 	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
705 		op->indata_len -= offset_inc;
706 }
707 EXPORT_SYMBOL(osd_req_op_extent_dup_last);
708 
709 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
710 			u16 opcode, const char *class, const char *method)
711 {
712 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
713 						      opcode, 0);
714 	struct ceph_pagelist *pagelist;
715 	size_t payload_len = 0;
716 	size_t size;
717 
718 	BUG_ON(opcode != CEPH_OSD_OP_CALL);
719 
720 	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
721 	BUG_ON(!pagelist);
722 	ceph_pagelist_init(pagelist);
723 
724 	op->cls.class_name = class;
725 	size = strlen(class);
726 	BUG_ON(size > (size_t) U8_MAX);
727 	op->cls.class_len = size;
728 	ceph_pagelist_append(pagelist, class, size);
729 	payload_len += size;
730 
731 	op->cls.method_name = method;
732 	size = strlen(method);
733 	BUG_ON(size > (size_t) U8_MAX);
734 	op->cls.method_len = size;
735 	ceph_pagelist_append(pagelist, method, size);
736 	payload_len += size;
737 
738 	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
739 
740 	op->indata_len = payload_len;
741 }
742 EXPORT_SYMBOL(osd_req_op_cls_init);
743 
744 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
745 			  u16 opcode, const char *name, const void *value,
746 			  size_t size, u8 cmp_op, u8 cmp_mode)
747 {
748 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
749 						      opcode, 0);
750 	struct ceph_pagelist *pagelist;
751 	size_t payload_len;
752 
753 	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);
754 
755 	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
756 	if (!pagelist)
757 		return -ENOMEM;
758 
759 	ceph_pagelist_init(pagelist);
760 
761 	payload_len = strlen(name);
762 	op->xattr.name_len = payload_len;
763 	ceph_pagelist_append(pagelist, name, payload_len);
764 
765 	op->xattr.value_len = size;
766 	ceph_pagelist_append(pagelist, value, size);
767 	payload_len += size;
768 
769 	op->xattr.cmp_op = cmp_op;
770 	op->xattr.cmp_mode = cmp_mode;
771 
772 	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
773 	op->indata_len = payload_len;
774 	return 0;
775 }
776 EXPORT_SYMBOL(osd_req_op_xattr_init);
777 
778 /*
779  * @watch_opcode: CEPH_OSD_WATCH_OP_*
780  */
781 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
782 				  u64 cookie, u8 watch_opcode)
783 {
784 	struct ceph_osd_req_op *op;
785 
786 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
787 	op->watch.cookie = cookie;
788 	op->watch.op = watch_opcode;
789 	op->watch.gen = 0;
790 }
791 
792 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
793 				unsigned int which,
794 				u64 expected_object_size,
795 				u64 expected_write_size)
796 {
797 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
798 						      CEPH_OSD_OP_SETALLOCHINT,
799 						      0);
800 
801 	op->alloc_hint.expected_object_size = expected_object_size;
802 	op->alloc_hint.expected_write_size = expected_write_size;
803 
804 	/*
805 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
806 	 * not worth a feature bit.  Set FAILOK per-op flag to make
807 	 * sure older osds don't trip over an unsupported opcode.
808 	 */
809 	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
810 }
811 EXPORT_SYMBOL(osd_req_op_alloc_hint_init);
812 
813 static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
814 				struct ceph_osd_data *osd_data)
815 {
816 	u64 length = ceph_osd_data_length(osd_data);
817 
818 	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
819 		BUG_ON(length > (u64) SIZE_MAX);
820 		if (length)
821 			ceph_msg_data_add_pages(msg, osd_data->pages,
822 					length, osd_data->alignment);
823 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
824 		BUG_ON(!length);
825 		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
826 #ifdef CONFIG_BLOCK
827 	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
828 		ceph_msg_data_add_bio(msg, osd_data->bio, length);
829 #endif
830 	} else {
831 		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
832 	}
833 }
834 
835 static u32 osd_req_encode_op(struct ceph_osd_op *dst,
836 			     const struct ceph_osd_req_op *src)
837 {
838 	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
839 		pr_err("unrecognized osd opcode %d\n", src->op);
840 
841 		return 0;
842 	}
843 
844 	switch (src->op) {
845 	case CEPH_OSD_OP_STAT:
846 		break;
847 	case CEPH_OSD_OP_READ:
848 	case CEPH_OSD_OP_WRITE:
849 	case CEPH_OSD_OP_WRITEFULL:
850 	case CEPH_OSD_OP_ZERO:
851 	case CEPH_OSD_OP_TRUNCATE:
852 		dst->extent.offset = cpu_to_le64(src->extent.offset);
853 		dst->extent.length = cpu_to_le64(src->extent.length);
854 		dst->extent.truncate_size =
855 			cpu_to_le64(src->extent.truncate_size);
856 		dst->extent.truncate_seq =
857 			cpu_to_le32(src->extent.truncate_seq);
858 		break;
859 	case CEPH_OSD_OP_CALL:
860 		dst->cls.class_len = src->cls.class_len;
861 		dst->cls.method_len = src->cls.method_len;
862 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
863 		break;
864 	case CEPH_OSD_OP_STARTSYNC:
865 		break;
866 	case CEPH_OSD_OP_WATCH:
867 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
868 		dst->watch.ver = cpu_to_le64(0);
869 		dst->watch.op = src->watch.op;
870 		dst->watch.gen = cpu_to_le32(src->watch.gen);
871 		break;
872 	case CEPH_OSD_OP_NOTIFY_ACK:
873 		break;
874 	case CEPH_OSD_OP_NOTIFY:
875 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
876 		break;
877 	case CEPH_OSD_OP_LIST_WATCHERS:
878 		break;
879 	case CEPH_OSD_OP_SETALLOCHINT:
880 		dst->alloc_hint.expected_object_size =
881 		    cpu_to_le64(src->alloc_hint.expected_object_size);
882 		dst->alloc_hint.expected_write_size =
883 		    cpu_to_le64(src->alloc_hint.expected_write_size);
884 		break;
885 	case CEPH_OSD_OP_SETXATTR:
886 	case CEPH_OSD_OP_CMPXATTR:
887 		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
888 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
889 		dst->xattr.cmp_op = src->xattr.cmp_op;
890 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
891 		break;
892 	case CEPH_OSD_OP_CREATE:
893 	case CEPH_OSD_OP_DELETE:
894 		break;
895 	default:
896 		pr_err("unsupported osd opcode %s\n",
897 			ceph_osd_op_name(src->op));
898 		WARN_ON(1);
899 
900 		return 0;
901 	}
902 
903 	dst->op = cpu_to_le16(src->op);
904 	dst->flags = cpu_to_le32(src->flags);
905 	dst->payload_len = cpu_to_le32(src->indata_len);
906 
907 	return src->indata_len;
908 }
909 
910 /*
911  * build new request AND message, calculate layout, and adjust file
912  * extent as needed.
913  *
914  * if the file was recently truncated, we include information about its
915  * old and new size so that the object can be updated appropriately.  (we
916  * avoid synchronously deleting truncated objects because it's slow.)
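 *
 * an illustrative call (placeholder caller-side values, default layout
 * assumed) that reads 4 MB starting at @off from the file behind @vino:
 *
 *	u64 len = 4 * 1024 * 1024;
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    0, 1, CEPH_OSD_OP_READ,
 *				    CEPH_OSD_FLAG_READ, NULL,
 *				    truncate_seq, truncate_size, false);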
920  */
921 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
922 					       struct ceph_file_layout *layout,
923 					       struct ceph_vino vino,
924 					       u64 off, u64 *plen,
925 					       unsigned int which, int num_ops,
926 					       int opcode, int flags,
927 					       struct ceph_snap_context *snapc,
928 					       u32 truncate_seq,
929 					       u64 truncate_size,
930 					       bool use_mempool)
931 {
932 	struct ceph_osd_request *req;
933 	u64 objnum = 0;
934 	u64 objoff = 0;
935 	u64 objlen = 0;
936 	int r;
937 
938 	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
939 	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
940 	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);
941 
942 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
943 					GFP_NOFS);
944 	if (!req) {
945 		r = -ENOMEM;
946 		goto fail;
947 	}
948 
949 	/* calculate max write size */
950 	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
951 	if (r)
952 		goto fail;
953 
954 	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
955 		osd_req_op_init(req, which, opcode, 0);
956 	} else {
957 		u32 object_size = layout->object_size;
958 		u32 object_base = off - objoff;
959 		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
960 			if (truncate_size <= object_base) {
961 				truncate_size = 0;
962 			} else {
963 				truncate_size -= object_base;
964 				if (truncate_size > object_size)
965 					truncate_size = object_size;
966 			}
967 		}
968 		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
969 				       truncate_size, truncate_seq);
970 	}
971 
972 	req->r_abort_on_full = true;
973 	req->r_flags = flags;
974 	req->r_base_oloc.pool = layout->pool_id;
975 	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
976 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
977 
978 	req->r_snapid = vino.snap;
979 	if (flags & CEPH_OSD_FLAG_WRITE)
980 		req->r_data_offset = off;
981 
982 	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
983 	if (r)
984 		goto fail;
985 
986 	return req;
987 
988 fail:
989 	ceph_osdc_put_request(req);
990 	return ERR_PTR(r);
991 }
992 EXPORT_SYMBOL(ceph_osdc_new_request);
993 
994 /*
995  * We keep osd requests in an rbtree, sorted by ->r_tid.
996  */
997 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
998 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
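/*
 * DEFINE_RB_FUNCS() generates the lookup_request()/insert_request()/
 * erase_request() helpers (and their _mc counterparts) used below, all
 * keyed by r_tid.
 */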
999 
1000 static bool osd_homeless(struct ceph_osd *osd)
1001 {
1002 	return osd->o_osd == CEPH_HOMELESS_OSD;
1003 }
1004 
1005 static bool osd_registered(struct ceph_osd *osd)
1006 {
1007 	verify_osdc_locked(osd->o_osdc);
1008 
1009 	return !RB_EMPTY_NODE(&osd->o_node);
1010 }
1011 
1012 /*
1013  * Assumes @osd is zero-initialized.
1014  */
1015 static void osd_init(struct ceph_osd *osd)
1016 {
1017 	refcount_set(&osd->o_ref, 1);
1018 	RB_CLEAR_NODE(&osd->o_node);
1019 	osd->o_requests = RB_ROOT;
1020 	osd->o_linger_requests = RB_ROOT;
1021 	INIT_LIST_HEAD(&osd->o_osd_lru);
1022 	INIT_LIST_HEAD(&osd->o_keepalive_item);
1023 	osd->o_incarnation = 1;
1024 	mutex_init(&osd->lock);
1025 }
1026 
1027 static void osd_cleanup(struct ceph_osd *osd)
1028 {
1029 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
1030 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
1031 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
1032 	WARN_ON(!list_empty(&osd->o_osd_lru));
1033 	WARN_ON(!list_empty(&osd->o_keepalive_item));
1034 
1035 	if (osd->o_auth.authorizer) {
1036 		WARN_ON(osd_homeless(osd));
1037 		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
1038 	}
1039 }
1040 
1041 /*
1042  * Track open sessions with osds.
1043  */
1044 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
1045 {
1046 	struct ceph_osd *osd;
1047 
1048 	WARN_ON(onum == CEPH_HOMELESS_OSD);
1049 
1050 	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
1051 	osd_init(osd);
1052 	osd->o_osdc = osdc;
1053 	osd->o_osd = onum;
1054 
1055 	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
1056 
1057 	return osd;
1058 }
1059 
1060 static struct ceph_osd *get_osd(struct ceph_osd *osd)
1061 {
1062 	if (refcount_inc_not_zero(&osd->o_ref)) {
1063 		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
1064 		     refcount_read(&osd->o_ref));
1065 		return osd;
1066 	} else {
1067 		dout("get_osd %p FAIL\n", osd);
1068 		return NULL;
1069 	}
1070 }
1071 
1072 static void put_osd(struct ceph_osd *osd)
1073 {
1074 	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
1075 	     refcount_read(&osd->o_ref) - 1);
1076 	if (refcount_dec_and_test(&osd->o_ref)) {
1077 		osd_cleanup(osd);
1078 		kfree(osd);
1079 	}
1080 }
1081 
1082 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
1083 
1084 static void __move_osd_to_lru(struct ceph_osd *osd)
1085 {
1086 	struct ceph_osd_client *osdc = osd->o_osdc;
1087 
1088 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1089 	BUG_ON(!list_empty(&osd->o_osd_lru));
1090 
1091 	spin_lock(&osdc->osd_lru_lock);
1092 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
1093 	spin_unlock(&osdc->osd_lru_lock);
1094 
1095 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
1096 }
1097 
1098 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
1099 {
1100 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1101 	    RB_EMPTY_ROOT(&osd->o_linger_requests))
1102 		__move_osd_to_lru(osd);
1103 }
1104 
1105 static void __remove_osd_from_lru(struct ceph_osd *osd)
1106 {
1107 	struct ceph_osd_client *osdc = osd->o_osdc;
1108 
1109 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1110 
1111 	spin_lock(&osdc->osd_lru_lock);
1112 	if (!list_empty(&osd->o_osd_lru))
1113 		list_del_init(&osd->o_osd_lru);
1114 	spin_unlock(&osdc->osd_lru_lock);
1115 }
1116 
1117 /*
1118  * Close the connection and assign any leftover requests to the
1119  * homeless session.
1120  */
1121 static void close_osd(struct ceph_osd *osd)
1122 {
1123 	struct ceph_osd_client *osdc = osd->o_osdc;
1124 	struct rb_node *n;
1125 
1126 	verify_osdc_wrlocked(osdc);
1127 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1128 
1129 	ceph_con_close(&osd->o_con);
1130 
1131 	for (n = rb_first(&osd->o_requests); n; ) {
1132 		struct ceph_osd_request *req =
1133 		    rb_entry(n, struct ceph_osd_request, r_node);
1134 
1135 		n = rb_next(n); /* unlink_request() */
1136 
1137 		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
1138 		unlink_request(osd, req);
1139 		link_request(&osdc->homeless_osd, req);
1140 	}
1141 	for (n = rb_first(&osd->o_linger_requests); n; ) {
1142 		struct ceph_osd_linger_request *lreq =
1143 		    rb_entry(n, struct ceph_osd_linger_request, node);
1144 
1145 		n = rb_next(n); /* unlink_linger() */
1146 
1147 		dout(" reassigning lreq %p linger_id %llu\n", lreq,
1148 		     lreq->linger_id);
1149 		unlink_linger(osd, lreq);
1150 		link_linger(&osdc->homeless_osd, lreq);
1151 	}
1152 
1153 	__remove_osd_from_lru(osd);
1154 	erase_osd(&osdc->osds, osd);
1155 	put_osd(osd);
1156 }
1157 
1158 /*
1159  * reset osd connect
1160  */
1161 static int reopen_osd(struct ceph_osd *osd)
1162 {
1163 	struct ceph_entity_addr *peer_addr;
1164 
1165 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
1166 
1167 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
1168 	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
1169 		close_osd(osd);
1170 		return -ENODEV;
1171 	}
1172 
1173 	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
1174 	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
1175 			!ceph_con_opened(&osd->o_con)) {
1176 		struct rb_node *n;
1177 
1178 		dout("osd addr hasn't changed and connection never opened, "
1179 		     "letting msgr retry\n");
1180 		/* touch each r_stamp for handle_timeout()'s benefit */
1181 		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
1182 			struct ceph_osd_request *req =
1183 			    rb_entry(n, struct ceph_osd_request, r_node);
1184 			req->r_stamp = jiffies;
1185 		}
1186 
1187 		return -EAGAIN;
1188 	}
1189 
1190 	ceph_con_close(&osd->o_con);
1191 	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
1192 	osd->o_incarnation++;
1193 
1194 	return 0;
1195 }
1196 
1197 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
1198 					  bool wrlocked)
1199 {
1200 	struct ceph_osd *osd;
1201 
1202 	if (wrlocked)
1203 		verify_osdc_wrlocked(osdc);
1204 	else
1205 		verify_osdc_locked(osdc);
1206 
1207 	if (o != CEPH_HOMELESS_OSD)
1208 		osd = lookup_osd(&osdc->osds, o);
1209 	else
1210 		osd = &osdc->homeless_osd;
1211 	if (!osd) {
1212 		if (!wrlocked)
1213 			return ERR_PTR(-EAGAIN);
1214 
1215 		osd = create_osd(osdc, o);
1216 		insert_osd(&osdc->osds, osd);
1217 		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
1218 			      &osdc->osdmap->osd_addr[osd->o_osd]);
1219 	}
1220 
1221 	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
1222 	return osd;
1223 }
1224 
1225 /*
1226  * Create request <-> OSD session relation.
1227  *
1228  * @req has to be assigned a tid, @osd may be homeless.
1229  */
1230 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1231 {
1232 	verify_osd_locked(osd);
1233 	WARN_ON(!req->r_tid || req->r_osd);
1234 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1235 	     req, req->r_tid);
1236 
1237 	if (!osd_homeless(osd))
1238 		__remove_osd_from_lru(osd);
1239 	else
1240 		atomic_inc(&osd->o_osdc->num_homeless);
1241 
1242 	get_osd(osd);
1243 	insert_request(&osd->o_requests, req);
1244 	req->r_osd = osd;
1245 }
1246 
1247 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
1248 {
1249 	verify_osd_locked(osd);
1250 	WARN_ON(req->r_osd != osd);
1251 	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
1252 	     req, req->r_tid);
1253 
1254 	req->r_osd = NULL;
1255 	erase_request(&osd->o_requests, req);
1256 	put_osd(osd);
1257 
1258 	if (!osd_homeless(osd))
1259 		maybe_move_osd_to_lru(osd);
1260 	else
1261 		atomic_dec(&osd->o_osdc->num_homeless);
1262 }
1263 
1264 static bool __pool_full(struct ceph_pg_pool_info *pi)
1265 {
1266 	return pi->flags & CEPH_POOL_FLAG_FULL;
1267 }
1268 
1269 static bool have_pool_full(struct ceph_osd_client *osdc)
1270 {
1271 	struct rb_node *n;
1272 
1273 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
1274 		struct ceph_pg_pool_info *pi =
1275 		    rb_entry(n, struct ceph_pg_pool_info, node);
1276 
1277 		if (__pool_full(pi))
1278 			return true;
1279 	}
1280 
1281 	return false;
1282 }
1283 
1284 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
1285 {
1286 	struct ceph_pg_pool_info *pi;
1287 
1288 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
1289 	if (!pi)
1290 		return false;
1291 
1292 	return __pool_full(pi);
1293 }
1294 
1295 /*
1296  * Returns whether a request should be blocked from being sent
1297  * based on the current osdmap and osd_client settings.
1298  */
1299 static bool target_should_be_paused(struct ceph_osd_client *osdc,
1300 				    const struct ceph_osd_request_target *t,
1301 				    struct ceph_pg_pool_info *pi)
1302 {
1303 	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
1304 	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
1305 		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1306 		       __pool_full(pi);
1307 
1308 	WARN_ON(pi->id != t->base_oloc.pool);
1309 	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
1310 	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
1311 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
1312 }
1313 
1314 enum calc_target_result {
1315 	CALC_TARGET_NO_ACTION = 0,
1316 	CALC_TARGET_NEED_RESEND,
1317 	CALC_TARGET_POOL_DNE,
1318 };
1319 
1320 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1321 					   struct ceph_osd_request_target *t,
1322 					   struct ceph_connection *con,
1323 					   bool any_change)
1324 {
1325 	struct ceph_pg_pool_info *pi;
1326 	struct ceph_pg pgid, last_pgid;
1327 	struct ceph_osds up, acting;
1328 	bool force_resend = false;
1329 	bool unpaused = false;
1330 	bool legacy_change;
1331 	bool split = false;
1332 	bool need_check_tiering = false;
1333 	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1334 	enum calc_target_result ct_res;
1335 	int ret;
1336 
1337 	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1338 	if (!pi) {
1339 		t->osd = CEPH_HOMELESS_OSD;
1340 		ct_res = CALC_TARGET_POOL_DNE;
1341 		goto out;
1342 	}
1343 
1344 	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1345 		if (t->last_force_resend < pi->last_force_request_resend) {
1346 			t->last_force_resend = pi->last_force_request_resend;
1347 			force_resend = true;
1348 		} else if (t->last_force_resend == 0) {
1349 			force_resend = true;
1350 		}
1351 	}
1352 	if (ceph_oid_empty(&t->target_oid) || force_resend) {
1353 		ceph_oid_copy(&t->target_oid, &t->base_oid);
1354 		need_check_tiering = true;
1355 	}
1356 	if (ceph_oloc_empty(&t->target_oloc) || force_resend) {
1357 		ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1358 		need_check_tiering = true;
1359 	}
1360 
1361 	if (need_check_tiering &&
1362 	    (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1363 		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
1364 			t->target_oloc.pool = pi->read_tier;
1365 		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
1366 			t->target_oloc.pool = pi->write_tier;
1367 	}
1368 
1369 	ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid,
1370 					&t->target_oloc, &pgid);
1371 	if (ret) {
1372 		WARN_ON(ret != -ENOENT);
1373 		t->osd = CEPH_HOMELESS_OSD;
1374 		ct_res = CALC_TARGET_POOL_DNE;
1375 		goto out;
1376 	}
1377 	last_pgid.pool = pgid.pool;
1378 	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1379 
1380 	ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting);
1381 	if (any_change &&
1382 	    ceph_is_new_interval(&t->acting,
1383 				 &acting,
1384 				 &t->up,
1385 				 &up,
1386 				 t->size,
1387 				 pi->size,
1388 				 t->min_size,
1389 				 pi->min_size,
1390 				 t->pg_num,
1391 				 pi->pg_num,
1392 				 t->sort_bitwise,
1393 				 sort_bitwise,
1394 				 &last_pgid))
1395 		force_resend = true;
1396 
1397 	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1398 		t->paused = false;
1399 		unpaused = true;
1400 	}
1401 	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1402 			ceph_osds_changed(&t->acting, &acting, any_change);
1403 	if (t->pg_num)
1404 		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1405 
1406 	if (legacy_change || force_resend || split) {
1407 		t->pgid = pgid; /* struct */
1408 		ceph_pg_to_primary_shard(osdc->osdmap, &pgid, &t->spgid);
1409 		ceph_osds_copy(&t->acting, &acting);
1410 		ceph_osds_copy(&t->up, &up);
1411 		t->size = pi->size;
1412 		t->min_size = pi->min_size;
1413 		t->pg_num = pi->pg_num;
1414 		t->pg_num_mask = pi->pg_num_mask;
1415 		t->sort_bitwise = sort_bitwise;
1416 
1417 		t->osd = acting.primary;
1418 	}
1419 
1420 	if (unpaused || legacy_change || force_resend ||
1421 	    (split && con && CEPH_HAVE_FEATURE(con->peer_features,
1422 					       RESEND_ON_SPLIT)))
1423 		ct_res = CALC_TARGET_NEED_RESEND;
1424 	else
1425 		ct_res = CALC_TARGET_NO_ACTION;
1426 
1427 out:
1428 	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
1429 	return ct_res;
1430 }
1431 
1432 static void setup_request_data(struct ceph_osd_request *req,
1433 			       struct ceph_msg *msg)
1434 {
1435 	u32 data_len = 0;
1436 	int i;
1437 
1438 	if (!list_empty(&msg->data))
1439 		return;
1440 
1441 	WARN_ON(msg->data_length);
1442 	for (i = 0; i < req->r_num_ops; i++) {
1443 		struct ceph_osd_req_op *op = &req->r_ops[i];
1444 
1445 		switch (op->op) {
1446 		/* request */
1447 		case CEPH_OSD_OP_WRITE:
1448 		case CEPH_OSD_OP_WRITEFULL:
1449 			WARN_ON(op->indata_len != op->extent.length);
1450 			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
1451 			break;
1452 		case CEPH_OSD_OP_SETXATTR:
1453 		case CEPH_OSD_OP_CMPXATTR:
1454 			WARN_ON(op->indata_len != op->xattr.name_len +
1455 						  op->xattr.value_len);
1456 			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
1457 			break;
1458 		case CEPH_OSD_OP_NOTIFY_ACK:
1459 			ceph_osdc_msg_data_add(msg,
1460 					       &op->notify_ack.request_data);
1461 			break;
1462 
1463 		/* reply */
1464 		case CEPH_OSD_OP_STAT:
1465 			ceph_osdc_msg_data_add(req->r_reply,
1466 					       &op->raw_data_in);
1467 			break;
1468 		case CEPH_OSD_OP_READ:
1469 			ceph_osdc_msg_data_add(req->r_reply,
1470 					       &op->extent.osd_data);
1471 			break;
1472 		case CEPH_OSD_OP_LIST_WATCHERS:
1473 			ceph_osdc_msg_data_add(req->r_reply,
1474 					       &op->list_watchers.response_data);
1475 			break;
1476 
1477 		/* both */
1478 		case CEPH_OSD_OP_CALL:
1479 			WARN_ON(op->indata_len != op->cls.class_len +
1480 						  op->cls.method_len +
1481 						  op->cls.indata_len);
1482 			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
1483 			/* optional, can be NONE */
1484 			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
1485 			/* optional, can be NONE */
1486 			ceph_osdc_msg_data_add(req->r_reply,
1487 					       &op->cls.response_data);
1488 			break;
1489 		case CEPH_OSD_OP_NOTIFY:
1490 			ceph_osdc_msg_data_add(msg,
1491 					       &op->notify.request_data);
1492 			ceph_osdc_msg_data_add(req->r_reply,
1493 					       &op->notify.response_data);
1494 			break;
1495 		}
1496 
1497 		data_len += op->indata_len;
1498 	}
1499 
1500 	WARN_ON(data_len != msg->data_length);
1501 }
1502 
1503 static void encode_pgid(void **p, const struct ceph_pg *pgid)
1504 {
1505 	ceph_encode_8(p, 1);
1506 	ceph_encode_64(p, pgid->pool);
1507 	ceph_encode_32(p, pgid->seed);
1508 	ceph_encode_32(p, -1); /* preferred */
1509 }
1510 
1511 static void encode_spgid(void **p, const struct ceph_spg *spgid)
1512 {
1513 	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
1514 	encode_pgid(p, &spgid->pgid);
1515 	ceph_encode_8(p, spgid->shard);
1516 }
1517 
1518 static void encode_oloc(void **p, void *end,
1519 			const struct ceph_object_locator *oloc)
1520 {
1521 	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
1522 	ceph_encode_64(p, oloc->pool);
1523 	ceph_encode_32(p, -1); /* preferred */
1524 	ceph_encode_32(p, 0);  /* key len */
1525 	if (oloc->pool_ns)
1526 		ceph_encode_string(p, end, oloc->pool_ns->str,
1527 				   oloc->pool_ns->len);
1528 	else
1529 		ceph_encode_32(p, 0);
1530 }
1531 
1532 static void encode_request_partial(struct ceph_osd_request *req,
1533 				   struct ceph_msg *msg)
1534 {
1535 	void *p = msg->front.iov_base;
1536 	void *const end = p + msg->front_alloc_len;
1537 	u32 data_len = 0;
1538 	int i;
1539 
1540 	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
1541 		/* snapshots aren't writeable */
1542 		WARN_ON(req->r_snapid != CEPH_NOSNAP);
1543 	} else {
1544 		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
1545 			req->r_data_offset || req->r_snapc);
1546 	}
1547 
1548 	setup_request_data(req, msg);
1549 
1550 	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1551 	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
1552 	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
1553 	ceph_encode_32(&p, req->r_flags);
1554 
1555 	/* reqid */
1556 	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
1557 	memset(p, 0, sizeof(struct ceph_osd_reqid));
1558 	p += sizeof(struct ceph_osd_reqid);
1559 
1560 	/* trace */
1561 	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
1562 	p += sizeof(struct ceph_blkin_trace_info);
1563 
1564 	ceph_encode_32(&p, 0); /* client_inc, always 0 */
1565 	ceph_encode_timespec(p, &req->r_mtime);
1566 	p += sizeof(struct ceph_timespec);
1567 
1568 	encode_oloc(&p, end, &req->r_t.target_oloc);
1569 	ceph_encode_string(&p, end, req->r_t.target_oid.name,
1570 			   req->r_t.target_oid.name_len);
1571 
1572 	/* ops, can imply data */
1573 	ceph_encode_16(&p, req->r_num_ops);
1574 	for (i = 0; i < req->r_num_ops; i++) {
1575 		data_len += osd_req_encode_op(p, &req->r_ops[i]);
1576 		p += sizeof(struct ceph_osd_op);
1577 	}
1578 
1579 	ceph_encode_64(&p, req->r_snapid); /* snapid */
1580 	if (req->r_snapc) {
1581 		ceph_encode_64(&p, req->r_snapc->seq);
1582 		ceph_encode_32(&p, req->r_snapc->num_snaps);
1583 		for (i = 0; i < req->r_snapc->num_snaps; i++)
1584 			ceph_encode_64(&p, req->r_snapc->snaps[i]);
1585 	} else {
1586 		ceph_encode_64(&p, 0); /* snap_seq */
1587 		ceph_encode_32(&p, 0); /* snaps len */
1588 	}
1589 
1590 	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
1591 	BUG_ON(p != end - 8); /* space for features */
1592 
1593 	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
1594 	/* front_len is finalized in encode_request_finish() */
1595 	msg->hdr.data_len = cpu_to_le32(data_len);
1596 	/*
1597 	 * The header "data_off" is a hint to the receiver allowing it
1598 	 * to align received data into its buffers such that there's no
1599 	 * need to re-copy it before writing it to disk (direct I/O).
1600 	 */
1601 	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
1602 
1603 	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
1604 	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
1605 }
1606 
1607 static void encode_request_finish(struct ceph_msg *msg)
1608 {
1609 	void *p = msg->front.iov_base;
1610 	void *const end = p + msg->front_alloc_len;
1611 
1612 	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
1613 		/* luminous OSD -- encode features and be done */
1614 		p = end - 8;
1615 		ceph_encode_64(&p, msg->con->peer_features);
1616 	} else {
1617 		struct {
1618 			char spgid[CEPH_ENCODING_START_BLK_LEN +
1619 				   CEPH_PGID_ENCODING_LEN + 1];
1620 			__le32 hash;
1621 			__le32 epoch;
1622 			__le32 flags;
1623 			char reqid[CEPH_ENCODING_START_BLK_LEN +
1624 				   sizeof(struct ceph_osd_reqid)];
1625 			char trace[sizeof(struct ceph_blkin_trace_info)];
1626 			__le32 client_inc;
1627 			struct ceph_timespec mtime;
1628 		} __packed head;
1629 		struct ceph_pg pgid;
1630 		void *oloc, *oid, *tail;
1631 		int oloc_len, oid_len, tail_len;
1632 		int len;
1633 
1634 		/*
1635 		 * Pre-luminous OSD -- reencode v8 into v4 using @head
1636 		 * as a temporary buffer.  Encode the raw PG; the rest
1637 		 * is just a matter of moving oloc, oid and tail blobs
1638 		 * around.
1639 		 */
1640 		memcpy(&head, p, sizeof(head));
1641 		p += sizeof(head);
1642 
1643 		oloc = p;
1644 		p += CEPH_ENCODING_START_BLK_LEN;
1645 		pgid.pool = ceph_decode_64(&p);
1646 		p += 4 + 4; /* preferred, key len */
1647 		len = ceph_decode_32(&p);
1648 		p += len;   /* nspace */
1649 		oloc_len = p - oloc;
1650 
1651 		oid = p;
1652 		len = ceph_decode_32(&p);
1653 		p += len;
1654 		oid_len = p - oid;
1655 
1656 		tail = p;
1657 		tail_len = (end - p) - 8;
1658 
1659 		p = msg->front.iov_base;
1660 		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
1661 		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
1662 		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
1663 		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
1664 
1665 		/* reassert_version */
1666 		memset(p, 0, sizeof(struct ceph_eversion));
1667 		p += sizeof(struct ceph_eversion);
1668 
1669 		BUG_ON(p >= oloc);
1670 		memmove(p, oloc, oloc_len);
1671 		p += oloc_len;
1672 
1673 		pgid.seed = le32_to_cpu(head.hash);
1674 		encode_pgid(&p, &pgid); /* raw pg */
1675 
1676 		BUG_ON(p >= oid);
1677 		memmove(p, oid, oid_len);
1678 		p += oid_len;
1679 
1680 		/* tail -- ops, snapid, snapc, retry_attempt */
1681 		BUG_ON(p >= tail);
1682 		memmove(p, tail, tail_len);
1683 		p += tail_len;
1684 
1685 		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
1686 	}
1687 
1688 	BUG_ON(p > end);
1689 	msg->front.iov_len = p - msg->front.iov_base;
1690 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1691 
1692 	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
1693 	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
1694 	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
1695 	     le16_to_cpu(msg->hdr.version));
1696 }
1697 
1698 /*
1699  * @req has to be assigned a tid and registered.
1700  */
1701 static void send_request(struct ceph_osd_request *req)
1702 {
1703 	struct ceph_osd *osd = req->r_osd;
1704 
1705 	verify_osd_locked(osd);
1706 	WARN_ON(osd->o_osd != req->r_t.osd);
1707 
1708 	/*
1709 	 * We may have a previously queued request message hanging
1710 	 * around.  Cancel it to avoid corrupting the msgr.
1711 	 */
1712 	if (req->r_sent)
1713 		ceph_msg_revoke(req->r_request);
1714 
1715 	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
1716 	if (req->r_attempts)
1717 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
1718 	else
1719 		WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
1720 
1721 	encode_request_partial(req, req->r_request);
1722 
1723 	dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d flags 0x%x attempt %d\n",
1724 	     __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
1725 	     req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
1726 	     req->r_t.spgid.shard, osd->o_osd, req->r_flags, req->r_attempts);
1727 
1728 	req->r_t.paused = false;
1729 	req->r_stamp = jiffies;
1730 	req->r_attempts++;
1731 
1732 	req->r_sent = osd->o_incarnation;
1733 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
1734 	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
1735 }
1736 
1737 static void maybe_request_map(struct ceph_osd_client *osdc)
1738 {
1739 	bool continuous = false;
1740 
1741 	verify_osdc_locked(osdc);
1742 	WARN_ON(!osdc->osdmap->epoch);
1743 
1744 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1745 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
1746 	    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
1747 		dout("%s osdc %p continuous\n", __func__, osdc);
1748 		continuous = true;
1749 	} else {
1750 		dout("%s osdc %p onetime\n", __func__, osdc);
1751 	}
1752 
1753 	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
1754 			       osdc->osdmap->epoch + 1, continuous))
1755 		ceph_monc_renew_subs(&osdc->client->monc);
1756 }
1757 
1758 static void complete_request(struct ceph_osd_request *req, int err);
1759 static void send_map_check(struct ceph_osd_request *req);
1760 
1761 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
1762 {
1763 	struct ceph_osd_client *osdc = req->r_osdc;
1764 	struct ceph_osd *osd;
1765 	enum calc_target_result ct_res;
1766 	bool need_send = false;
1767 	bool promoted = false;
1768 	bool need_abort = false;
1769 
1770 	WARN_ON(req->r_tid);
1771 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
1772 
1773 again:
1774 	ct_res = calc_target(osdc, &req->r_t, NULL, false);
1775 	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
1776 		goto promote;
1777 
1778 	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
1779 	if (IS_ERR(osd)) {
1780 		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
1781 		goto promote;
1782 	}
1783 
1784 	if (osdc->osdmap->epoch < osdc->epoch_barrier) {
1785 		dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
1786 		     osdc->epoch_barrier);
1787 		req->r_t.paused = true;
1788 		maybe_request_map(osdc);
1789 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1790 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
1791 		dout("req %p pausewr\n", req);
1792 		req->r_t.paused = true;
1793 		maybe_request_map(osdc);
1794 	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
1795 		   ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
1796 		dout("req %p pauserd\n", req);
1797 		req->r_t.paused = true;
1798 		maybe_request_map(osdc);
1799 	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
1800 		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
1801 				     CEPH_OSD_FLAG_FULL_FORCE)) &&
1802 		   (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
1803 		    pool_full(osdc, req->r_t.base_oloc.pool))) {
1804 		dout("req %p full/pool_full\n", req);
1805 		pr_warn_ratelimited("FULL or reached pool quota\n");
1806 		req->r_t.paused = true;
1807 		maybe_request_map(osdc);
1808 		if (req->r_abort_on_full)
1809 			need_abort = true;
1810 	} else if (!osd_homeless(osd)) {
1811 		need_send = true;
1812 	} else {
1813 		maybe_request_map(osdc);
1814 	}
1815 
1816 	mutex_lock(&osd->lock);
1817 	/*
1818 	 * Assign the tid atomically with send_request() to protect
1819 	 * multiple writes to the same object from racing with each
1820 	 * other, resulting in out of order ops on the OSDs.
1821 	 */
1822 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
1823 	link_request(osd, req);
1824 	if (need_send)
1825 		send_request(req);
1826 	else if (need_abort)
1827 		complete_request(req, -ENOSPC);
1828 	mutex_unlock(&osd->lock);
1829 
1830 	if (ct_res == CALC_TARGET_POOL_DNE)
1831 		send_map_check(req);
1832 
1833 	if (promoted)
1834 		downgrade_write(&osdc->lock);
1835 	return;
1836 
1837 promote:
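	/*
	 * The pool doesn't exist in our map, or a new OSD session needs
	 * to be created and we only hold the read lock: retake
	 * osdc->lock for write and recompute the target.
	 */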
1838 	up_read(&osdc->lock);
1839 	down_write(&osdc->lock);
1840 	wrlocked = true;
1841 	promoted = true;
1842 	goto again;
1843 }
1844 
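/*
 * Request commit-only (ONDISK) replies and bump the in-flight counter.
 * r_start_stamp is what the osd_request_timeout check in
 * handle_timeout() is measured against.
 */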
1845 static void account_request(struct ceph_osd_request *req)
1846 {
1847 	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
1848 	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
1849 
1850 	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
1851 	atomic_inc(&req->r_osdc->num_requests);
1852 
1853 	req->r_start_stamp = jiffies;
1854 }
1855 
1856 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
1857 {
1858 	ceph_osdc_get_request(req);
1859 	account_request(req);
1860 	__submit_request(req, wrlocked);
1861 }
1862 
1863 static void finish_request(struct ceph_osd_request *req)
1864 {
1865 	struct ceph_osd_client *osdc = req->r_osdc;
1866 	struct ceph_osd *osd = req->r_osd;
1867 
1868 	verify_osd_locked(osd);
1869 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
1870 
1871 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
1872 	unlink_request(osd, req);
1873 	atomic_dec(&osdc->num_requests);
1874 
1875 	/*
1876 	 * If an OSD has failed or come back up and a request has been sent
1877 	 * twice, it's possible to get a reply and end up here while the
1878 	 * request message is queued for delivery.  We will ignore the
1879 	 * reply, so not a big deal, but better to try and catch it.
1880 	 */
1881 	ceph_msg_revoke(req->r_request);
1882 	ceph_msg_revoke_incoming(req->r_reply);
1883 }
1884 
1885 static void __complete_request(struct ceph_osd_request *req)
1886 {
1887 	if (req->r_callback) {
1888 		dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
1889 		     req->r_tid, req->r_callback, req->r_result);
1890 		req->r_callback(req);
1891 	}
1892 }
1893 
1894 /*
1895  * This is open-coded in handle_reply().
1896  */
1897 static void complete_request(struct ceph_osd_request *req, int err)
1898 {
1899 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
1900 
1901 	req->r_result = err;
1902 	finish_request(req);
1903 	__complete_request(req);
1904 	complete_all(&req->r_completion);
1905 	ceph_osdc_put_request(req);
1906 }
1907 
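/*
 * If a "get latest osdmap version" check is pending for @req, drop it
 * along with its request reference.
 */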
1908 static void cancel_map_check(struct ceph_osd_request *req)
1909 {
1910 	struct ceph_osd_client *osdc = req->r_osdc;
1911 	struct ceph_osd_request *lookup_req;
1912 
1913 	verify_osdc_wrlocked(osdc);
1914 
1915 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
1916 	if (!lookup_req)
1917 		return;
1918 
1919 	WARN_ON(lookup_req != req);
1920 	erase_request_mc(&osdc->map_checks, req);
1921 	ceph_osdc_put_request(req);
1922 }
1923 
1924 static void cancel_request(struct ceph_osd_request *req)
1925 {
1926 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
1927 
1928 	cancel_map_check(req);
1929 	finish_request(req);
1930 	complete_all(&req->r_completion);
1931 	ceph_osdc_put_request(req);
1932 }
1933 
1934 static void abort_request(struct ceph_osd_request *req, int err)
1935 {
1936 	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
1937 
1938 	cancel_map_check(req);
1939 	complete_request(req, err);
1940 }
1941 
1942 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
1943 {
1944 	if (likely(eb > osdc->epoch_barrier)) {
1945 		dout("updating epoch_barrier from %u to %u\n",
1946 				osdc->epoch_barrier, eb);
1947 		osdc->epoch_barrier = eb;
1948 		/* Request map if we're not at the barrier yet */
1949 		if (eb > osdc->osdmap->epoch)
1950 			maybe_request_map(osdc);
1951 	}
1952 }
1953 
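/*
 * Raise the epoch barrier on behalf of the caller.  The common case of
 * an already sufficient barrier is handled under the read lock; the
 * write lock is taken only when an update is actually needed.
 */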
1954 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
1955 {
1956 	down_read(&osdc->lock);
1957 	if (unlikely(eb > osdc->epoch_barrier)) {
1958 		up_read(&osdc->lock);
1959 		down_write(&osdc->lock);
1960 		update_epoch_barrier(osdc, eb);
1961 		up_write(&osdc->lock);
1962 	} else {
1963 		up_read(&osdc->lock);
1964 	}
1965 }
1966 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
1967 
1968 /*
1969  * Drop all pending requests that are stalled waiting on a full condition to
1970  * clear, and complete them with ENOSPC as the return code. Set the
1971  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
1972  * cancelled.
1973  */
1974 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
1975 {
1976 	struct rb_node *n;
1977 	bool victims = false;
1978 
1979 	dout("enter abort_on_full\n");
1980 
1981 	if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
1982 		goto out;
1983 
1984 	/* Scan list and see if there is anything to abort */
1985 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
1986 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
1987 		struct rb_node *m;
1988 
1989 		m = rb_first(&osd->o_requests);
1990 		while (m) {
1991 			struct ceph_osd_request *req = rb_entry(m,
1992 					struct ceph_osd_request, r_node);
1993 			m = rb_next(m);
1994 
1995 			if (req->r_abort_on_full) {
1996 				victims = true;
1997 				break;
1998 			}
1999 		}
2000 		if (victims)
2001 			break;
2002 	}
2003 
2004 	if (!victims)
2005 		goto out;
2006 
2007 	/*
2008 	 * Update the barrier to current epoch if it's behind that point,
2009 	 * since we know we have some calls to be aborted in the tree.
2010 	 */
2011 	update_epoch_barrier(osdc, osdc->osdmap->epoch);
2012 
2013 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2014 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2015 		struct rb_node *m;
2016 
2017 		m = rb_first(&osd->o_requests);
2018 		while (m) {
2019 			struct ceph_osd_request *req = rb_entry(m,
2020 					struct ceph_osd_request, r_node);
2021 			m = rb_next(m);
2022 
2023 			if (req->r_abort_on_full &&
2024 			    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2025 			     pool_full(osdc, req->r_t.target_oloc.pool)))
2026 				abort_request(req, -ENOSPC);
2027 		}
2028 	}
2029 out:
2030 	dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier);
2031 }
2032 
2033 static void check_pool_dne(struct ceph_osd_request *req)
2034 {
2035 	struct ceph_osd_client *osdc = req->r_osdc;
2036 	struct ceph_osdmap *map = osdc->osdmap;
2037 
2038 	verify_osdc_wrlocked(osdc);
2039 	WARN_ON(!map->epoch);
2040 
2041 	if (req->r_attempts) {
2042 		/*
2043 		 * We sent a request earlier, which means that
2044 		 * previously the pool existed, and now it does not
2045 		 * (i.e., it was deleted).
2046 		 */
2047 		req->r_map_dne_bound = map->epoch;
2048 		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2049 		     req->r_tid);
2050 	} else {
2051 		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2052 		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
2053 	}
2054 
2055 	if (req->r_map_dne_bound) {
2056 		if (map->epoch >= req->r_map_dne_bound) {
2057 			/* we had a new enough map */
2058 			pr_info_ratelimited("tid %llu pool does not exist\n",
2059 					    req->r_tid);
2060 			complete_request(req, -ENOENT);
2061 		}
2062 	} else {
2063 		send_map_check(req);
2064 	}
2065 }
2066 
2067 static void map_check_cb(struct ceph_mon_generic_request *greq)
2068 {
2069 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2070 	struct ceph_osd_request *req;
2071 	u64 tid = greq->private_data;
2072 
2073 	WARN_ON(greq->result || !greq->u.newest);
2074 
2075 	down_write(&osdc->lock);
2076 	req = lookup_request_mc(&osdc->map_checks, tid);
2077 	if (!req) {
2078 		dout("%s tid %llu dne\n", __func__, tid);
2079 		goto out_unlock;
2080 	}
2081 
2082 	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2083 	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2084 	if (!req->r_map_dne_bound)
2085 		req->r_map_dne_bound = greq->u.newest;
2086 	erase_request_mc(&osdc->map_checks, req);
2087 	check_pool_dne(req);
2088 
2089 	ceph_osdc_put_request(req);
2090 out_unlock:
2091 	up_write(&osdc->lock);
2092 }
2093 
2094 static void send_map_check(struct ceph_osd_request *req)
2095 {
2096 	struct ceph_osd_client *osdc = req->r_osdc;
2097 	struct ceph_osd_request *lookup_req;
2098 	int ret;
2099 
2100 	verify_osdc_wrlocked(osdc);
2101 
2102 	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2103 	if (lookup_req) {
2104 		WARN_ON(lookup_req != req);
2105 		return;
2106 	}
2107 
2108 	ceph_osdc_get_request(req);
2109 	insert_request_mc(&osdc->map_checks, req);
2110 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2111 					  map_check_cb, req->r_tid);
2112 	WARN_ON(ret);
2113 }
2114 
2115 /*
2116  * lingering requests, watch/notify v2 infrastructure
2117  */
2118 static void linger_release(struct kref *kref)
2119 {
2120 	struct ceph_osd_linger_request *lreq =
2121 	    container_of(kref, struct ceph_osd_linger_request, kref);
2122 
2123 	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2124 	     lreq->reg_req, lreq->ping_req);
2125 	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2126 	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2127 	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2128 	WARN_ON(!list_empty(&lreq->scan_item));
2129 	WARN_ON(!list_empty(&lreq->pending_lworks));
2130 	WARN_ON(lreq->osd);
2131 
2132 	if (lreq->reg_req)
2133 		ceph_osdc_put_request(lreq->reg_req);
2134 	if (lreq->ping_req)
2135 		ceph_osdc_put_request(lreq->ping_req);
2136 	target_destroy(&lreq->t);
2137 	kfree(lreq);
2138 }
2139 
2140 static void linger_put(struct ceph_osd_linger_request *lreq)
2141 {
2142 	if (lreq)
2143 		kref_put(&lreq->kref, linger_release);
2144 }
2145 
2146 static struct ceph_osd_linger_request *
2147 linger_get(struct ceph_osd_linger_request *lreq)
2148 {
2149 	kref_get(&lreq->kref);
2150 	return lreq;
2151 }
2152 
2153 static struct ceph_osd_linger_request *
2154 linger_alloc(struct ceph_osd_client *osdc)
2155 {
2156 	struct ceph_osd_linger_request *lreq;
2157 
2158 	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2159 	if (!lreq)
2160 		return NULL;
2161 
2162 	kref_init(&lreq->kref);
2163 	mutex_init(&lreq->lock);
2164 	RB_CLEAR_NODE(&lreq->node);
2165 	RB_CLEAR_NODE(&lreq->osdc_node);
2166 	RB_CLEAR_NODE(&lreq->mc_node);
2167 	INIT_LIST_HEAD(&lreq->scan_item);
2168 	INIT_LIST_HEAD(&lreq->pending_lworks);
2169 	init_completion(&lreq->reg_commit_wait);
2170 	init_completion(&lreq->notify_finish_wait);
2171 
2172 	lreq->osdc = osdc;
2173 	target_init(&lreq->t);
2174 
2175 	dout("%s lreq %p\n", __func__, lreq);
2176 	return lreq;
2177 }
2178 
2179 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2180 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2181 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
2182 
2183 /*
2184  * Create linger request <-> OSD session relation.
2185  *
2186  * @lreq has to be registered, @osd may be homeless.
2187  */
2188 static void link_linger(struct ceph_osd *osd,
2189 			struct ceph_osd_linger_request *lreq)
2190 {
2191 	verify_osd_locked(osd);
2192 	WARN_ON(!lreq->linger_id || lreq->osd);
2193 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2194 	     osd->o_osd, lreq, lreq->linger_id);
2195 
2196 	if (!osd_homeless(osd))
2197 		__remove_osd_from_lru(osd);
2198 	else
2199 		atomic_inc(&osd->o_osdc->num_homeless);
2200 
2201 	get_osd(osd);
2202 	insert_linger(&osd->o_linger_requests, lreq);
2203 	lreq->osd = osd;
2204 }
2205 
2206 static void unlink_linger(struct ceph_osd *osd,
2207 			  struct ceph_osd_linger_request *lreq)
2208 {
2209 	verify_osd_locked(osd);
2210 	WARN_ON(lreq->osd != osd);
2211 	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2212 	     osd->o_osd, lreq, lreq->linger_id);
2213 
2214 	lreq->osd = NULL;
2215 	erase_linger(&osd->o_linger_requests, lreq);
2216 	put_osd(osd);
2217 
2218 	if (!osd_homeless(osd))
2219 		maybe_move_osd_to_lru(osd);
2220 	else
2221 		atomic_dec(&osd->o_osdc->num_homeless);
2222 }
2223 
2224 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2225 {
2226 	verify_osdc_locked(lreq->osdc);
2227 
2228 	return !RB_EMPTY_NODE(&lreq->osdc_node);
2229 }
2230 
2231 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2232 {
2233 	struct ceph_osd_client *osdc = lreq->osdc;
2234 	bool registered;
2235 
2236 	down_read(&osdc->lock);
2237 	registered = __linger_registered(lreq);
2238 	up_read(&osdc->lock);
2239 
2240 	return registered;
2241 }
2242 
2243 static void linger_register(struct ceph_osd_linger_request *lreq)
2244 {
2245 	struct ceph_osd_client *osdc = lreq->osdc;
2246 
2247 	verify_osdc_wrlocked(osdc);
2248 	WARN_ON(lreq->linger_id);
2249 
2250 	linger_get(lreq);
2251 	lreq->linger_id = ++osdc->last_linger_id;
2252 	insert_linger_osdc(&osdc->linger_requests, lreq);
2253 }
2254 
2255 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2256 {
2257 	struct ceph_osd_client *osdc = lreq->osdc;
2258 
2259 	verify_osdc_wrlocked(osdc);
2260 
2261 	erase_linger_osdc(&osdc->linger_requests, lreq);
2262 	linger_put(lreq);
2263 }
2264 
2265 static void cancel_linger_request(struct ceph_osd_request *req)
2266 {
2267 	struct ceph_osd_linger_request *lreq = req->r_priv;
2268 
2269 	WARN_ON(!req->r_linger);
2270 	cancel_request(req);
2271 	linger_put(lreq);
2272 }
2273 
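/*
 * Watch/notify events and watch errors are handed to the user
 * callbacks from osdc->notify_wq via struct linger_work, so that the
 * callbacks run without osdc or lreq locks held.
 */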
2274 struct linger_work {
2275 	struct work_struct work;
2276 	struct ceph_osd_linger_request *lreq;
2277 	struct list_head pending_item;
2278 	unsigned long queued_stamp;
2279 
2280 	union {
2281 		struct {
2282 			u64 notify_id;
2283 			u64 notifier_id;
2284 			void *payload; /* points into @msg front */
2285 			size_t payload_len;
2286 
2287 			struct ceph_msg *msg; /* for ceph_msg_put() */
2288 		} notify;
2289 		struct {
2290 			int err;
2291 		} error;
2292 	};
2293 };
2294 
2295 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2296 				       work_func_t workfn)
2297 {
2298 	struct linger_work *lwork;
2299 
2300 	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2301 	if (!lwork)
2302 		return NULL;
2303 
2304 	INIT_WORK(&lwork->work, workfn);
2305 	INIT_LIST_HEAD(&lwork->pending_item);
2306 	lwork->lreq = linger_get(lreq);
2307 
2308 	return lwork;
2309 }
2310 
2311 static void lwork_free(struct linger_work *lwork)
2312 {
2313 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2314 
2315 	mutex_lock(&lreq->lock);
2316 	list_del(&lwork->pending_item);
2317 	mutex_unlock(&lreq->lock);
2318 
2319 	linger_put(lreq);
2320 	kfree(lwork);
2321 }
2322 
2323 static void lwork_queue(struct linger_work *lwork)
2324 {
2325 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2326 	struct ceph_osd_client *osdc = lreq->osdc;
2327 
2328 	verify_lreq_locked(lreq);
2329 	WARN_ON(!list_empty(&lwork->pending_item));
2330 
2331 	lwork->queued_stamp = jiffies;
2332 	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2333 	queue_work(osdc->notify_wq, &lwork->work);
2334 }
2335 
2336 static void do_watch_notify(struct work_struct *w)
2337 {
2338 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2339 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2340 
2341 	if (!linger_registered(lreq)) {
2342 		dout("%s lreq %p not registered\n", __func__, lreq);
2343 		goto out;
2344 	}
2345 
2346 	WARN_ON(!lreq->is_watch);
2347 	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2348 	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2349 	     lwork->notify.payload_len);
2350 	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2351 		  lwork->notify.notifier_id, lwork->notify.payload,
2352 		  lwork->notify.payload_len);
2353 
2354 out:
2355 	ceph_msg_put(lwork->notify.msg);
2356 	lwork_free(lwork);
2357 }
2358 
2359 static void do_watch_error(struct work_struct *w)
2360 {
2361 	struct linger_work *lwork = container_of(w, struct linger_work, work);
2362 	struct ceph_osd_linger_request *lreq = lwork->lreq;
2363 
2364 	if (!linger_registered(lreq)) {
2365 		dout("%s lreq %p not registered\n", __func__, lreq);
2366 		goto out;
2367 	}
2368 
2369 	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2370 	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2371 
2372 out:
2373 	lwork_free(lwork);
2374 }
2375 
2376 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2377 {
2378 	struct linger_work *lwork;
2379 
2380 	lwork = lwork_alloc(lreq, do_watch_error);
2381 	if (!lwork) {
2382 		pr_err("failed to allocate error-lwork\n");
2383 		return;
2384 	}
2385 
2386 	lwork->error.err = lreq->last_error;
2387 	lwork_queue(lwork);
2388 }
2389 
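/*
 * Record the registration result and wake up anyone blocked in
 * linger_reg_commit_wait().  Only the first completion is recorded.
 */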
2390 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2391 				       int result)
2392 {
2393 	if (!completion_done(&lreq->reg_commit_wait)) {
2394 		lreq->reg_commit_error = (result <= 0 ? result : 0);
2395 		complete_all(&lreq->reg_commit_wait);
2396 	}
2397 }
2398 
2399 static void linger_commit_cb(struct ceph_osd_request *req)
2400 {
2401 	struct ceph_osd_linger_request *lreq = req->r_priv;
2402 
2403 	mutex_lock(&lreq->lock);
2404 	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2405 	     lreq->linger_id, req->r_result);
2406 	linger_reg_commit_complete(lreq, req->r_result);
2407 	lreq->committed = true;
2408 
2409 	if (!lreq->is_watch) {
2410 		struct ceph_osd_data *osd_data =
2411 		    osd_req_op_data(req, 0, notify, response_data);
2412 		void *p = page_address(osd_data->pages[0]);
2413 
2414 		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
2415 			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
2416 
2417 		/* make note of the notify_id */
2418 		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
2419 			lreq->notify_id = ceph_decode_64(&p);
2420 			dout("lreq %p notify_id %llu\n", lreq,
2421 			     lreq->notify_id);
2422 		} else {
2423 			dout("lreq %p no notify_id\n", lreq);
2424 		}
2425 	}
2426 
2427 	mutex_unlock(&lreq->lock);
2428 	linger_put(lreq);
2429 }
2430 
2431 static int normalize_watch_error(int err)
2432 {
2433 	/*
2434 	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
2435 	 * notification and a failure to reconnect because we raced with
2436 	 * the delete appear the same to the user.
2437 	 */
2438 	if (err == -ENOENT)
2439 		err = -ENOTCONN;
2440 
2441 	return err;
2442 }
2443 
2444 static void linger_reconnect_cb(struct ceph_osd_request *req)
2445 {
2446 	struct ceph_osd_linger_request *lreq = req->r_priv;
2447 
2448 	mutex_lock(&lreq->lock);
2449 	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2450 	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
2451 	if (req->r_result < 0) {
2452 		if (!lreq->last_error) {
2453 			lreq->last_error = normalize_watch_error(req->r_result);
2454 			queue_watch_error(lreq);
2455 		}
2456 	}
2457 
2458 	mutex_unlock(&lreq->lock);
2459 	linger_put(lreq);
2460 }
2461 
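/*
 * Reinitialize and (re)send the registration request for @lreq: a
 * WATCH reconnect if the watch has already been committed, otherwise
 * the initial WATCH or NOTIFY registration.
 */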
2462 static void send_linger(struct ceph_osd_linger_request *lreq)
2463 {
2464 	struct ceph_osd_request *req = lreq->reg_req;
2465 	struct ceph_osd_req_op *op = &req->r_ops[0];
2466 
2467 	verify_osdc_wrlocked(req->r_osdc);
2468 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2469 
2470 	if (req->r_osd)
2471 		cancel_linger_request(req);
2472 
2473 	request_reinit(req);
2474 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2475 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2476 	req->r_flags = lreq->t.flags;
2477 	req->r_mtime = lreq->mtime;
2478 
2479 	mutex_lock(&lreq->lock);
2480 	if (lreq->is_watch && lreq->committed) {
2481 		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2482 			op->watch.cookie != lreq->linger_id);
2483 		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2484 		op->watch.gen = ++lreq->register_gen;
2485 		dout("lreq %p reconnect register_gen %u\n", lreq,
2486 		     op->watch.gen);
2487 		req->r_callback = linger_reconnect_cb;
2488 	} else {
2489 		if (!lreq->is_watch)
2490 			lreq->notify_id = 0;
2491 		else
2492 			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
2493 		dout("lreq %p register\n", lreq);
2494 		req->r_callback = linger_commit_cb;
2495 	}
2496 	mutex_unlock(&lreq->lock);
2497 
2498 	req->r_priv = linger_get(lreq);
2499 	req->r_linger = true;
2500 
2501 	submit_request(req, true);
2502 }
2503 
2504 static void linger_ping_cb(struct ceph_osd_request *req)
2505 {
2506 	struct ceph_osd_linger_request *lreq = req->r_priv;
2507 
2508 	mutex_lock(&lreq->lock);
2509 	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2510 	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2511 	     lreq->last_error);
2512 	if (lreq->register_gen == req->r_ops[0].watch.gen) {
2513 		if (!req->r_result) {
2514 			lreq->watch_valid_thru = lreq->ping_sent;
2515 		} else if (!lreq->last_error) {
2516 			lreq->last_error = normalize_watch_error(req->r_result);
2517 			queue_watch_error(lreq);
2518 		}
2519 	} else {
2520 		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2521 		     lreq->register_gen, req->r_ops[0].watch.gen);
2522 	}
2523 
2524 	mutex_unlock(&lreq->lock);
2525 	linger_put(lreq);
2526 }
2527 
2528 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2529 {
2530 	struct ceph_osd_client *osdc = lreq->osdc;
2531 	struct ceph_osd_request *req = lreq->ping_req;
2532 	struct ceph_osd_req_op *op = &req->r_ops[0];
2533 
2534 	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2535 		dout("%s PAUSERD\n", __func__);
2536 		return;
2537 	}
2538 
2539 	lreq->ping_sent = jiffies;
2540 	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2541 	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
2542 	     lreq->register_gen);
2543 
2544 	if (req->r_osd)
2545 		cancel_linger_request(req);
2546 
2547 	request_reinit(req);
2548 	target_copy(&req->r_t, &lreq->t);
2549 
2550 	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2551 		op->watch.cookie != lreq->linger_id ||
2552 		op->watch.op != CEPH_OSD_WATCH_OP_PING);
2553 	op->watch.gen = lreq->register_gen;
2554 	req->r_callback = linger_ping_cb;
2555 	req->r_priv = linger_get(lreq);
2556 	req->r_linger = true;
2557 
2558 	ceph_osdc_get_request(req);
2559 	account_request(req);
2560 	req->r_tid = atomic64_inc_return(&osdc->last_tid);
2561 	link_request(lreq->osd, req);
2562 	send_request(req);
2563 }
2564 
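/*
 * Map @lreq to an OSD, link it into the session and send the
 * registration.  Called with osdc->lock held for write.
 */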
2565 static void linger_submit(struct ceph_osd_linger_request *lreq)
2566 {
2567 	struct ceph_osd_client *osdc = lreq->osdc;
2568 	struct ceph_osd *osd;
2569 
2570 	calc_target(osdc, &lreq->t, NULL, false);
2571 	osd = lookup_create_osd(osdc, lreq->t.osd, true);
2572 	link_linger(osd, lreq);
2573 
2574 	send_linger(lreq);
2575 }
2576 
2577 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
2578 {
2579 	struct ceph_osd_client *osdc = lreq->osdc;
2580 	struct ceph_osd_linger_request *lookup_lreq;
2581 
2582 	verify_osdc_wrlocked(osdc);
2583 
2584 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
2585 				       lreq->linger_id);
2586 	if (!lookup_lreq)
2587 		return;
2588 
2589 	WARN_ON(lookup_lreq != lreq);
2590 	erase_linger_mc(&osdc->linger_map_checks, lreq);
2591 	linger_put(lreq);
2592 }
2593 
2594 /*
2595  * @lreq has to be both registered and linked.
2596  */
2597 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2598 {
2599 	if (lreq->is_watch && lreq->ping_req->r_osd)
2600 		cancel_linger_request(lreq->ping_req);
2601 	if (lreq->reg_req->r_osd)
2602 		cancel_linger_request(lreq->reg_req);
2603 	cancel_linger_map_check(lreq);
2604 	unlink_linger(lreq->osd, lreq);
2605 	linger_unregister(lreq);
2606 }
2607 
2608 static void linger_cancel(struct ceph_osd_linger_request *lreq)
2609 {
2610 	struct ceph_osd_client *osdc = lreq->osdc;
2611 
2612 	down_write(&osdc->lock);
2613 	if (__linger_registered(lreq))
2614 		__linger_cancel(lreq);
2615 	up_write(&osdc->lock);
2616 }
2617 
2618 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
2619 
2620 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
2621 {
2622 	struct ceph_osd_client *osdc = lreq->osdc;
2623 	struct ceph_osdmap *map = osdc->osdmap;
2624 
2625 	verify_osdc_wrlocked(osdc);
2626 	WARN_ON(!map->epoch);
2627 
2628 	if (lreq->register_gen) {
2629 		lreq->map_dne_bound = map->epoch;
2630 		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
2631 		     lreq, lreq->linger_id);
2632 	} else {
2633 		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
2634 		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
2635 		     map->epoch);
2636 	}
2637 
2638 	if (lreq->map_dne_bound) {
2639 		if (map->epoch >= lreq->map_dne_bound) {
2640 			/* we had a new enough map */
2641 			pr_info("linger_id %llu pool does not exist\n",
2642 				lreq->linger_id);
2643 			linger_reg_commit_complete(lreq, -ENOENT);
2644 			__linger_cancel(lreq);
2645 		}
2646 	} else {
2647 		send_linger_map_check(lreq);
2648 	}
2649 }
2650 
2651 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
2652 {
2653 	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2654 	struct ceph_osd_linger_request *lreq;
2655 	u64 linger_id = greq->private_data;
2656 
2657 	WARN_ON(greq->result || !greq->u.newest);
2658 
2659 	down_write(&osdc->lock);
2660 	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
2661 	if (!lreq) {
2662 		dout("%s linger_id %llu dne\n", __func__, linger_id);
2663 		goto out_unlock;
2664 	}
2665 
2666 	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
2667 	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
2668 	     greq->u.newest);
2669 	if (!lreq->map_dne_bound)
2670 		lreq->map_dne_bound = greq->u.newest;
2671 	erase_linger_mc(&osdc->linger_map_checks, lreq);
2672 	check_linger_pool_dne(lreq);
2673 
2674 	linger_put(lreq);
2675 out_unlock:
2676 	up_write(&osdc->lock);
2677 }
2678 
2679 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
2680 {
2681 	struct ceph_osd_client *osdc = lreq->osdc;
2682 	struct ceph_osd_linger_request *lookup_lreq;
2683 	int ret;
2684 
2685 	verify_osdc_wrlocked(osdc);
2686 
2687 	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
2688 				       lreq->linger_id);
2689 	if (lookup_lreq) {
2690 		WARN_ON(lookup_lreq != lreq);
2691 		return;
2692 	}
2693 
2694 	linger_get(lreq);
2695 	insert_linger_mc(&osdc->linger_map_checks, lreq);
2696 	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2697 					  linger_map_check_cb, lreq->linger_id);
2698 	WARN_ON(ret);
2699 }
2700 
2701 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
2702 {
2703 	int ret;
2704 
2705 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2706 	ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
2707 	return ret ?: lreq->reg_commit_error;
2708 }
2709 
2710 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
2711 {
2712 	int ret;
2713 
2714 	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2715 	ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
2716 	return ret ?: lreq->notify_finish_error;
2717 }
2718 
2719 /*
2720  * Timeout callback, called every N seconds.  When one or more OSD
2721  * requests have been active for more than N seconds, we send a keepalive
2722  * (tag + timestamp) to the OSD to ensure any communications channel
2723  * reset is detected.
2724  */
2725 static void handle_timeout(struct work_struct *work)
2726 {
2727 	struct ceph_osd_client *osdc =
2728 		container_of(work, struct ceph_osd_client, timeout_work.work);
2729 	struct ceph_options *opts = osdc->client->options;
2730 	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
2731 	unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
2732 	LIST_HEAD(slow_osds);
2733 	struct rb_node *n, *p;
2734 
2735 	dout("%s osdc %p\n", __func__, osdc);
2736 	down_write(&osdc->lock);
2737 
2738 	/*
2739 	 * ping osds that are a bit slow.  this ensures that if there
2740 	 * is a break in the TCP connection we will notice, and reopen
2741 	 * a connection with that osd (from the fault callback).
2742 	 */
2743 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2744 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2745 		bool found = false;
2746 
2747 		for (p = rb_first(&osd->o_requests); p; ) {
2748 			struct ceph_osd_request *req =
2749 			    rb_entry(p, struct ceph_osd_request, r_node);
2750 
2751 			p = rb_next(p); /* abort_request() */
2752 
2753 			if (time_before(req->r_stamp, cutoff)) {
2754 				dout(" req %p tid %llu on osd%d is laggy\n",
2755 				     req, req->r_tid, osd->o_osd);
2756 				found = true;
2757 			}
2758 			if (opts->osd_request_timeout &&
2759 			    time_before(req->r_start_stamp, expiry_cutoff)) {
2760 				pr_err_ratelimited("tid %llu on osd%d timeout\n",
2761 				       req->r_tid, osd->o_osd);
2762 				abort_request(req, -ETIMEDOUT);
2763 			}
2764 		}
2765 		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
2766 			struct ceph_osd_linger_request *lreq =
2767 			    rb_entry(p, struct ceph_osd_linger_request, node);
2768 
2769 			dout(" lreq %p linger_id %llu is served by osd%d\n",
2770 			     lreq, lreq->linger_id, osd->o_osd);
2771 			found = true;
2772 
2773 			mutex_lock(&lreq->lock);
2774 			if (lreq->is_watch && lreq->committed && !lreq->last_error)
2775 				send_linger_ping(lreq);
2776 			mutex_unlock(&lreq->lock);
2777 		}
2778 
2779 		if (found)
2780 			list_move_tail(&osd->o_keepalive_item, &slow_osds);
2781 	}
2782 
2783 	if (opts->osd_request_timeout) {
2784 		for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
2785 			struct ceph_osd_request *req =
2786 			    rb_entry(p, struct ceph_osd_request, r_node);
2787 
2788 			p = rb_next(p); /* abort_request() */
2789 
2790 			if (time_before(req->r_start_stamp, expiry_cutoff)) {
2791 				pr_err_ratelimited("tid %llu on osd%d timeout\n",
2792 				       req->r_tid, osdc->homeless_osd.o_osd);
2793 				abort_request(req, -ETIMEDOUT);
2794 			}
2795 		}
2796 	}
2797 
2798 	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
2799 		maybe_request_map(osdc);
2800 
2801 	while (!list_empty(&slow_osds)) {
2802 		struct ceph_osd *osd = list_first_entry(&slow_osds,
2803 							struct ceph_osd,
2804 							o_keepalive_item);
2805 		list_del_init(&osd->o_keepalive_item);
2806 		ceph_con_keepalive(&osd->o_con);
2807 	}
2808 
2809 	up_write(&osdc->lock);
2810 	schedule_delayed_work(&osdc->timeout_work,
2811 			      osdc->client->options->osd_keepalive_timeout);
2812 }
2813 
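/*
 * Periodically close sessions to idle OSDs that have been on the LRU
 * list past their TTL.
 */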
2814 static void handle_osds_timeout(struct work_struct *work)
2815 {
2816 	struct ceph_osd_client *osdc =
2817 		container_of(work, struct ceph_osd_client,
2818 			     osds_timeout_work.work);
2819 	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
2820 	struct ceph_osd *osd, *nosd;
2821 
2822 	dout("%s osdc %p\n", __func__, osdc);
2823 	down_write(&osdc->lock);
2824 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
2825 		if (time_before(jiffies, osd->lru_ttl))
2826 			break;
2827 
2828 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
2829 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
2830 		close_osd(osd);
2831 	}
2832 
2833 	up_write(&osdc->lock);
2834 	schedule_delayed_work(&osdc->osds_timeout_work,
2835 			      round_jiffies_relative(delay));
2836 }
2837 
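/*
 * Decode ceph_object_locator.  Only the pool id is used -- a locator
 * with a key, a changed namespace or an explicit hash is rejected
 * with -EINVAL as unsupported.
 */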
2838 static int ceph_oloc_decode(void **p, void *end,
2839 			    struct ceph_object_locator *oloc)
2840 {
2841 	u8 struct_v, struct_cv;
2842 	u32 len;
2843 	void *struct_end;
2844 	int ret = 0;
2845 
2846 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
2847 	struct_v = ceph_decode_8(p);
2848 	struct_cv = ceph_decode_8(p);
2849 	if (struct_v < 3) {
2850 		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
2851 			struct_v, struct_cv);
2852 		goto e_inval;
2853 	}
2854 	if (struct_cv > 6) {
2855 		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
2856 			struct_v, struct_cv);
2857 		goto e_inval;
2858 	}
2859 	len = ceph_decode_32(p);
2860 	ceph_decode_need(p, end, len, e_inval);
2861 	struct_end = *p + len;
2862 
2863 	oloc->pool = ceph_decode_64(p);
2864 	*p += 4; /* skip preferred */
2865 
2866 	len = ceph_decode_32(p);
2867 	if (len > 0) {
2868 		pr_warn("ceph_object_locator::key is set\n");
2869 		goto e_inval;
2870 	}
2871 
2872 	if (struct_v >= 5) {
2873 		bool changed = false;
2874 
2875 		len = ceph_decode_32(p);
2876 		if (len > 0) {
2877 			ceph_decode_need(p, end, len, e_inval);
2878 			if (!oloc->pool_ns ||
2879 			    ceph_compare_string(oloc->pool_ns, *p, len))
2880 				changed = true;
2881 			*p += len;
2882 		} else {
2883 			if (oloc->pool_ns)
2884 				changed = true;
2885 		}
2886 		if (changed) {
2887 			/* redirect changes namespace */
2888 			pr_warn("ceph_object_locator::nspace is changed\n");
2889 			goto e_inval;
2890 		}
2891 	}
2892 
2893 	if (struct_v >= 6) {
2894 		s64 hash = ceph_decode_64(p);
2895 		if (hash != -1) {
2896 			pr_warn("ceph_object_locator::hash is set\n");
2897 			goto e_inval;
2898 		}
2899 	}
2900 
2901 	/* skip the rest */
2902 	*p = struct_end;
2903 out:
2904 	return ret;
2905 
2906 e_inval:
2907 	ret = -EINVAL;
2908 	goto out;
2909 }
2910 
2911 static int ceph_redirect_decode(void **p, void *end,
2912 				struct ceph_request_redirect *redir)
2913 {
2914 	u8 struct_v, struct_cv;
2915 	u32 len;
2916 	void *struct_end;
2917 	int ret;
2918 
2919 	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
2920 	struct_v = ceph_decode_8(p);
2921 	struct_cv = ceph_decode_8(p);
2922 	if (struct_cv > 1) {
2923 		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
2924 			struct_v, struct_cv);
2925 		goto e_inval;
2926 	}
2927 	len = ceph_decode_32(p);
2928 	ceph_decode_need(p, end, len, e_inval);
2929 	struct_end = *p + len;
2930 
2931 	ret = ceph_oloc_decode(p, end, &redir->oloc);
2932 	if (ret)
2933 		goto out;
2934 
2935 	len = ceph_decode_32(p);
2936 	if (len > 0) {
2937 		pr_warn("ceph_request_redirect::object_name is set\n");
2938 		goto e_inval;
2939 	}
2940 
2941 	len = ceph_decode_32(p);
2942 	*p += len; /* skip osd_instructions */
2943 
2944 	/* skip the rest */
2945 	*p = struct_end;
2946 out:
2947 	return ret;
2948 
2949 e_inval:
2950 	ret = -EINVAL;
2951 	goto out;
2952 }
2953 
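/*
 * Fields decoded from an MOSDOpReply message, filled in by
 * decode_MOSDOpReply() and consumed by handle_reply().
 */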
2954 struct MOSDOpReply {
2955 	struct ceph_pg pgid;
2956 	u64 flags;
2957 	int result;
2958 	u32 epoch;
2959 	int num_ops;
2960 	u32 outdata_len[CEPH_OSD_MAX_OPS];
2961 	s32 rval[CEPH_OSD_MAX_OPS];
2962 	int retry_attempt;
2963 	struct ceph_eversion replay_version;
2964 	u64 user_version;
2965 	struct ceph_request_redirect redirect;
2966 };
2967 
2968 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
2969 {
2970 	void *p = msg->front.iov_base;
2971 	void *const end = p + msg->front.iov_len;
2972 	u16 version = le16_to_cpu(msg->hdr.version);
2973 	struct ceph_eversion bad_replay_version;
2974 	u8 decode_redir;
2975 	u32 len;
2976 	int ret;
2977 	int i;
2978 
2979 	ceph_decode_32_safe(&p, end, len, e_inval);
2980 	ceph_decode_need(&p, end, len, e_inval);
2981 	p += len; /* skip oid */
2982 
2983 	ret = ceph_decode_pgid(&p, end, &m->pgid);
2984 	if (ret)
2985 		return ret;
2986 
2987 	ceph_decode_64_safe(&p, end, m->flags, e_inval);
2988 	ceph_decode_32_safe(&p, end, m->result, e_inval);
2989 	ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
2990 	memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
2991 	p += sizeof(bad_replay_version);
2992 	ceph_decode_32_safe(&p, end, m->epoch, e_inval);
2993 
2994 	ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
2995 	if (m->num_ops > ARRAY_SIZE(m->outdata_len))
2996 		goto e_inval;
2997 
2998 	ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
2999 			 e_inval);
3000 	for (i = 0; i < m->num_ops; i++) {
3001 		struct ceph_osd_op *op = p;
3002 
3003 		m->outdata_len[i] = le32_to_cpu(op->payload_len);
3004 		p += sizeof(*op);
3005 	}
3006 
3007 	ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3008 	for (i = 0; i < m->num_ops; i++)
3009 		ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3010 
3011 	if (version >= 5) {
3012 		ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3013 		memcpy(&m->replay_version, p, sizeof(m->replay_version));
3014 		p += sizeof(m->replay_version);
3015 		ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3016 	} else {
3017 		m->replay_version = bad_replay_version; /* struct */
3018 		m->user_version = le64_to_cpu(m->replay_version.version);
3019 	}
3020 
3021 	if (version >= 6) {
3022 		if (version >= 7)
3023 			ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3024 		else
3025 			decode_redir = 1;
3026 	} else {
3027 		decode_redir = 0;
3028 	}
3029 
3030 	if (decode_redir) {
3031 		ret = ceph_redirect_decode(&p, end, &m->redirect);
3032 		if (ret)
3033 			return ret;
3034 	} else {
3035 		ceph_oloc_init(&m->redirect.oloc);
3036 	}
3037 
3038 	return 0;
3039 
3040 e_inval:
3041 	return -EINVAL;
3042 }
3043 
3044 /*
3045  * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3046  * specified.
3047  */
3048 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3049 {
3050 	struct ceph_osd_client *osdc = osd->o_osdc;
3051 	struct ceph_osd_request *req;
3052 	struct MOSDOpReply m;
3053 	u64 tid = le64_to_cpu(msg->hdr.tid);
3054 	u32 data_len = 0;
3055 	int ret;
3056 	int i;
3057 
3058 	dout("%s msg %p tid %llu\n", __func__, msg, tid);
3059 
3060 	down_read(&osdc->lock);
3061 	if (!osd_registered(osd)) {
3062 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3063 		goto out_unlock_osdc;
3064 	}
3065 	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3066 
3067 	mutex_lock(&osd->lock);
3068 	req = lookup_request(&osd->o_requests, tid);
3069 	if (!req) {
3070 		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3071 		goto out_unlock_session;
3072 	}
3073 
3074 	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3075 	ret = decode_MOSDOpReply(msg, &m);
3076 	m.redirect.oloc.pool_ns = NULL;
3077 	if (ret) {
3078 		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3079 		       req->r_tid, ret);
3080 		ceph_msg_dump(msg);
3081 		goto fail_request;
3082 	}
3083 	dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3084 	     __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3085 	     m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3086 	     le64_to_cpu(m.replay_version.version), m.user_version);
3087 
3088 	if (m.retry_attempt >= 0) {
3089 		if (m.retry_attempt != req->r_attempts - 1) {
3090 			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3091 			     req, req->r_tid, m.retry_attempt,
3092 			     req->r_attempts - 1);
3093 			goto out_unlock_session;
3094 		}
3095 	} else {
3096 		WARN_ON(1); /* MOSDOpReply v4 is assumed */
3097 	}
3098 
3099 	if (!ceph_oloc_empty(&m.redirect.oloc)) {
3100 		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3101 		     m.redirect.oloc.pool);
3102 		unlink_request(osd, req);
3103 		mutex_unlock(&osd->lock);
3104 
3105 		/*
3106 		 * Not ceph_oloc_copy() - changing pool_ns is not
3107 		 * supported.
3108 		 */
3109 		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3110 		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
3111 		req->r_tid = 0;
3112 		__submit_request(req, false);
3113 		goto out_unlock_osdc;
3114 	}
3115 
3116 	if (m.num_ops != req->r_num_ops) {
3117 		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3118 		       req->r_num_ops, req->r_tid);
3119 		goto fail_request;
3120 	}
3121 	for (i = 0; i < req->r_num_ops; i++) {
3122 		dout(" req %p tid %llu op %d rval %d len %u\n", req,
3123 		     req->r_tid, i, m.rval[i], m.outdata_len[i]);
3124 		req->r_ops[i].rval = m.rval[i];
3125 		req->r_ops[i].outdata_len = m.outdata_len[i];
3126 		data_len += m.outdata_len[i];
3127 	}
3128 	if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3129 		pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3130 		       le32_to_cpu(msg->hdr.data_len), req->r_tid);
3131 		goto fail_request;
3132 	}
3133 	dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3134 	     req, req->r_tid, m.result, data_len);
3135 
3136 	/*
3137 	 * Since we only ever request ONDISK, we should only ever get
3138 	 * one (type of) reply back.
3139 	 */
3140 	WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3141 	req->r_result = m.result ?: data_len;
3142 	finish_request(req);
3143 	mutex_unlock(&osd->lock);
3144 	up_read(&osdc->lock);
3145 
3146 	__complete_request(req);
3147 	complete_all(&req->r_completion);
3148 	ceph_osdc_put_request(req);
3149 	return;
3150 
3151 fail_request:
3152 	complete_request(req, -EIO);
3153 out_unlock_session:
3154 	mutex_unlock(&osd->lock);
3155 out_unlock_osdc:
3156 	up_read(&osdc->lock);
3157 }
3158 
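/*
 * Snapshot the per-pool full flag before a new map is applied so that
 * pool_cleared_full() can tell whether a previously full pool has
 * room again.
 */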
3159 static void set_pool_was_full(struct ceph_osd_client *osdc)
3160 {
3161 	struct rb_node *n;
3162 
3163 	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3164 		struct ceph_pg_pool_info *pi =
3165 		    rb_entry(n, struct ceph_pg_pool_info, node);
3166 
3167 		pi->was_full = __pool_full(pi);
3168 	}
3169 }
3170 
3171 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3172 {
3173 	struct ceph_pg_pool_info *pi;
3174 
3175 	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3176 	if (!pi)
3177 		return false;
3178 
3179 	return pi->was_full && !__pool_full(pi);
3180 }
3181 
3182 static enum calc_target_result
3183 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3184 {
3185 	struct ceph_osd_client *osdc = lreq->osdc;
3186 	enum calc_target_result ct_res;
3187 
3188 	ct_res = calc_target(osdc, &lreq->t, NULL, true);
3189 	if (ct_res == CALC_TARGET_NEED_RESEND) {
3190 		struct ceph_osd *osd;
3191 
3192 		osd = lookup_create_osd(osdc, lreq->t.osd, true);
3193 		if (osd != lreq->osd) {
3194 			unlink_linger(lreq->osd, lreq);
3195 			link_linger(osd, lreq);
3196 		}
3197 	}
3198 
3199 	return ct_res;
3200 }
3201 
3202 /*
3203  * Requeue requests whose mapping to an OSD has changed.
3204  */
3205 static void scan_requests(struct ceph_osd *osd,
3206 			  bool force_resend,
3207 			  bool cleared_full,
3208 			  bool check_pool_cleared_full,
3209 			  struct rb_root *need_resend,
3210 			  struct list_head *need_resend_linger)
3211 {
3212 	struct ceph_osd_client *osdc = osd->o_osdc;
3213 	struct rb_node *n;
3214 	bool force_resend_writes;
3215 
3216 	for (n = rb_first(&osd->o_linger_requests); n; ) {
3217 		struct ceph_osd_linger_request *lreq =
3218 		    rb_entry(n, struct ceph_osd_linger_request, node);
3219 		enum calc_target_result ct_res;
3220 
3221 		n = rb_next(n); /* recalc_linger_target() */
3222 
3223 		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3224 		     lreq->linger_id);
3225 		ct_res = recalc_linger_target(lreq);
3226 		switch (ct_res) {
3227 		case CALC_TARGET_NO_ACTION:
3228 			force_resend_writes = cleared_full ||
3229 			    (check_pool_cleared_full &&
3230 			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3231 			if (!force_resend && !force_resend_writes)
3232 				break;
3233 
3234 			/* fall through */
3235 		case CALC_TARGET_NEED_RESEND:
3236 			cancel_linger_map_check(lreq);
3237 			/*
3238 			 * scan_requests() for the previous epoch(s)
3239 			 * may have already added it to the list, since
3240 			 * it's not unlinked here.
3241 			 */
3242 			if (list_empty(&lreq->scan_item))
3243 				list_add_tail(&lreq->scan_item, need_resend_linger);
3244 			break;
3245 		case CALC_TARGET_POOL_DNE:
3246 			list_del_init(&lreq->scan_item);
3247 			check_linger_pool_dne(lreq);
3248 			break;
3249 		}
3250 	}
3251 
3252 	for (n = rb_first(&osd->o_requests); n; ) {
3253 		struct ceph_osd_request *req =
3254 		    rb_entry(n, struct ceph_osd_request, r_node);
3255 		enum calc_target_result ct_res;
3256 
3257 		n = rb_next(n); /* unlink_request(), check_pool_dne() */
3258 
3259 		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3260 		ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
3261 				     false);
3262 		switch (ct_res) {
3263 		case CALC_TARGET_NO_ACTION:
3264 			force_resend_writes = cleared_full ||
3265 			    (check_pool_cleared_full &&
3266 			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3267 			if (!force_resend &&
3268 			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3269 			     !force_resend_writes))
3270 				break;
3271 
3272 			/* fall through */
3273 		case CALC_TARGET_NEED_RESEND:
3274 			cancel_map_check(req);
3275 			unlink_request(osd, req);
3276 			insert_request(need_resend, req);
3277 			break;
3278 		case CALC_TARGET_POOL_DNE:
3279 			check_pool_dne(req);
3280 			break;
3281 		}
3282 	}
3283 }
3284 
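/*
 * Apply a single incremental or full map: preserve per-pool was_full
 * state, collect requests and linger requests that need to be resent
 * in @need_resend / @need_resend_linger, and close sessions to OSDs
 * that went down or changed address.
 */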
3285 static int handle_one_map(struct ceph_osd_client *osdc,
3286 			  void *p, void *end, bool incremental,
3287 			  struct rb_root *need_resend,
3288 			  struct list_head *need_resend_linger)
3289 {
3290 	struct ceph_osdmap *newmap;
3291 	struct rb_node *n;
3292 	bool skipped_map = false;
3293 	bool was_full;
3294 
3295 	was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3296 	set_pool_was_full(osdc);
3297 
3298 	if (incremental)
3299 		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
3300 	else
3301 		newmap = ceph_osdmap_decode(&p, end);
3302 	if (IS_ERR(newmap))
3303 		return PTR_ERR(newmap);
3304 
3305 	if (newmap != osdc->osdmap) {
3306 		/*
3307 		 * Preserve ->was_full before destroying the old map.
3308 		 * For pools that weren't in the old map, ->was_full
3309 		 * should be false.
3310 		 */
3311 		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
3312 			struct ceph_pg_pool_info *pi =
3313 			    rb_entry(n, struct ceph_pg_pool_info, node);
3314 			struct ceph_pg_pool_info *old_pi;
3315 
3316 			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
3317 			if (old_pi)
3318 				pi->was_full = old_pi->was_full;
3319 			else
3320 				WARN_ON(pi->was_full);
3321 		}
3322 
3323 		if (osdc->osdmap->epoch &&
3324 		    osdc->osdmap->epoch + 1 < newmap->epoch) {
3325 			WARN_ON(incremental);
3326 			skipped_map = true;
3327 		}
3328 
3329 		ceph_osdmap_destroy(osdc->osdmap);
3330 		osdc->osdmap = newmap;
3331 	}
3332 
3333 	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3334 	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
3335 		      need_resend, need_resend_linger);
3336 
3337 	for (n = rb_first(&osdc->osds); n; ) {
3338 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3339 
3340 		n = rb_next(n); /* close_osd() */
3341 
3342 		scan_requests(osd, skipped_map, was_full, true, need_resend,
3343 			      need_resend_linger);
3344 		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3345 		    memcmp(&osd->o_con.peer_addr,
3346 			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
3347 			   sizeof(struct ceph_entity_addr)))
3348 			close_osd(osd);
3349 	}
3350 
3351 	return 0;
3352 }
3353 
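/*
 * Resend everything collected by scan_requests(): requests are
 * re-targeted, linked to their (possibly new) OSD and sent; linger
 * registrations are resent via send_linger().
 */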
3354 static void kick_requests(struct ceph_osd_client *osdc,
3355 			  struct rb_root *need_resend,
3356 			  struct list_head *need_resend_linger)
3357 {
3358 	struct ceph_osd_linger_request *lreq, *nlreq;
3359 	struct rb_node *n;
3360 
3361 	for (n = rb_first(need_resend); n; ) {
3362 		struct ceph_osd_request *req =
3363 		    rb_entry(n, struct ceph_osd_request, r_node);
3364 		struct ceph_osd *osd;
3365 
3366 		n = rb_next(n);
3367 		erase_request(need_resend, req); /* before link_request() */
3368 
3369 		WARN_ON(req->r_osd);
3370 		calc_target(osdc, &req->r_t, NULL, false);
3371 		osd = lookup_create_osd(osdc, req->r_t.osd, true);
3372 		link_request(osd, req);
3373 		if (!req->r_linger) {
3374 			if (!osd_homeless(osd) && !req->r_t.paused)
3375 				send_request(req);
3376 		} else {
3377 			cancel_linger_request(req);
3378 		}
3379 	}
3380 
3381 	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
3382 		if (!osd_homeless(lreq->osd))
3383 			send_linger(lreq);
3384 
3385 		list_del_init(&lreq->scan_item);
3386 	}
3387 }
3388 
3389 /*
3390  * Process updated osd map.
3391  *
3392  * The message contains any number of incremental and full maps, normally
3393  * indicating some sort of topology change in the cluster.  Kick requests
3394  * off to different OSDs as needed.
3395  */
3396 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
3397 {
3398 	void *p = msg->front.iov_base;
3399 	void *const end = p + msg->front.iov_len;
3400 	u32 nr_maps, maplen;
3401 	u32 epoch;
3402 	struct ceph_fsid fsid;
3403 	struct rb_root need_resend = RB_ROOT;
3404 	LIST_HEAD(need_resend_linger);
3405 	bool handled_incremental = false;
3406 	bool was_pauserd, was_pausewr;
3407 	bool pauserd, pausewr;
3408 	int err;
3409 
3410 	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
3411 	down_write(&osdc->lock);
3412 
3413 	/* verify fsid */
3414 	ceph_decode_need(&p, end, sizeof(fsid), bad);
3415 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
3416 	if (ceph_check_fsid(osdc->client, &fsid) < 0)
3417 		goto bad;
3418 
3419 	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3420 	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3421 		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3422 		      have_pool_full(osdc);
3423 
3424 	/* incremental maps */
3425 	ceph_decode_32_safe(&p, end, nr_maps, bad);
3426 	dout(" %d inc maps\n", nr_maps);
3427 	while (nr_maps > 0) {
3428 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3429 		epoch = ceph_decode_32(&p);
3430 		maplen = ceph_decode_32(&p);
3431 		ceph_decode_need(&p, end, maplen, bad);
3432 		if (osdc->osdmap->epoch &&
3433 		    osdc->osdmap->epoch + 1 == epoch) {
3434 			dout("applying incremental map %u len %d\n",
3435 			     epoch, maplen);
3436 			err = handle_one_map(osdc, p, p + maplen, true,
3437 					     &need_resend, &need_resend_linger);
3438 			if (err)
3439 				goto bad;
3440 			handled_incremental = true;
3441 		} else {
3442 			dout("ignoring incremental map %u len %d\n",
3443 			     epoch, maplen);
3444 		}
3445 		p += maplen;
3446 		nr_maps--;
3447 	}
3448 	if (handled_incremental)
3449 		goto done;
3450 
3451 	/* full maps */
3452 	ceph_decode_32_safe(&p, end, nr_maps, bad);
3453 	dout(" %d full maps\n", nr_maps);
3454 	while (nr_maps) {
3455 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3456 		epoch = ceph_decode_32(&p);
3457 		maplen = ceph_decode_32(&p);
3458 		ceph_decode_need(&p, end, maplen, bad);
3459 		if (nr_maps > 1) {
3460 			dout("skipping non-latest full map %u len %d\n",
3461 			     epoch, maplen);
3462 		} else if (osdc->osdmap->epoch >= epoch) {
3463 			dout("skipping full map %u len %d, "
3464 			     "older than our %u\n", epoch, maplen,
3465 			     osdc->osdmap->epoch);
3466 		} else {
3467 			dout("taking full map %u len %d\n", epoch, maplen);
3468 			err = handle_one_map(osdc, p, p + maplen, false,
3469 					     &need_resend, &need_resend_linger);
3470 			if (err)
3471 				goto bad;
3472 		}
3473 		p += maplen;
3474 		nr_maps--;
3475 	}
3476 
3477 done:
3478 	/*
3479 	 * subscribe to subsequent osdmap updates if full to ensure
3480 	 * we find out when we are no longer full and stop returning
3481 	 * ENOSPC.
3482 	 */
3483 	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3484 	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3485 		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3486 		  have_pool_full(osdc);
3487 	if (was_pauserd || was_pausewr || pauserd || pausewr ||
3488 	    osdc->osdmap->epoch < osdc->epoch_barrier)
3489 		maybe_request_map(osdc);
3490 
3491 	kick_requests(osdc, &need_resend, &need_resend_linger);
3492 
3493 	ceph_osdc_abort_on_full(osdc);
3494 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
3495 			  osdc->osdmap->epoch);
3496 	up_write(&osdc->lock);
3497 	wake_up_all(&osdc->client->auth_wq);
3498 	return;
3499 
3500 bad:
3501 	pr_err("osdc handle_map corrupt msg\n");
3502 	ceph_msg_dump(msg);
3503 	up_write(&osdc->lock);
3504 }
3505 
3506 /*
3507  * Resubmit requests pending on the given osd.
3508  */
3509 static void kick_osd_requests(struct ceph_osd *osd)
3510 {
3511 	struct rb_node *n;
3512 
3513 	for (n = rb_first(&osd->o_requests); n; ) {
3514 		struct ceph_osd_request *req =
3515 		    rb_entry(n, struct ceph_osd_request, r_node);
3516 
3517 		n = rb_next(n); /* cancel_linger_request() */
3518 
3519 		if (!req->r_linger) {
3520 			if (!req->r_t.paused)
3521 				send_request(req);
3522 		} else {
3523 			cancel_linger_request(req);
3524 		}
3525 	}
3526 	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
3527 		struct ceph_osd_linger_request *lreq =
3528 		    rb_entry(n, struct ceph_osd_linger_request, node);
3529 
3530 		send_linger(lreq);
3531 	}
3532 }
3533 
3534 /*
3535  * If the osd connection drops, we need to resubmit all requests.
3536  */
3537 static void osd_fault(struct ceph_connection *con)
3538 {
3539 	struct ceph_osd *osd = con->private;
3540 	struct ceph_osd_client *osdc = osd->o_osdc;
3541 
3542 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
3543 
3544 	down_write(&osdc->lock);
3545 	if (!osd_registered(osd)) {
3546 		dout("%s osd%d unknown\n", __func__, osd->o_osd);
3547 		goto out_unlock;
3548 	}
3549 
3550 	if (!reopen_osd(osd))
3551 		kick_osd_requests(osd);
3552 	maybe_request_map(osdc);
3553 
3554 out_unlock:
3555 	up_write(&osdc->lock);
3556 }
3557 
3558 /*
3559  * Process osd watch notifications
3560  */
3561 static void handle_watch_notify(struct ceph_osd_client *osdc,
3562 				struct ceph_msg *msg)
3563 {
3564 	void *p = msg->front.iov_base;
3565 	void *const end = p + msg->front.iov_len;
3566 	struct ceph_osd_linger_request *lreq;
3567 	struct linger_work *lwork;
3568 	u8 proto_ver, opcode;
3569 	u64 cookie, notify_id;
3570 	u64 notifier_id = 0;
3571 	s32 return_code = 0;
3572 	void *payload = NULL;
3573 	u32 payload_len = 0;
3574 
3575 	ceph_decode_8_safe(&p, end, proto_ver, bad);
3576 	ceph_decode_8_safe(&p, end, opcode, bad);
3577 	ceph_decode_64_safe(&p, end, cookie, bad);
3578 	p += 8; /* skip ver */
3579 	ceph_decode_64_safe(&p, end, notify_id, bad);
3580 
3581 	if (proto_ver >= 1) {
3582 		ceph_decode_32_safe(&p, end, payload_len, bad);
3583 		ceph_decode_need(&p, end, payload_len, bad);
3584 		payload = p;
3585 		p += payload_len;
3586 	}
3587 
3588 	if (le16_to_cpu(msg->hdr.version) >= 2)
3589 		ceph_decode_32_safe(&p, end, return_code, bad);
3590 
3591 	if (le16_to_cpu(msg->hdr.version) >= 3)
3592 		ceph_decode_64_safe(&p, end, notifier_id, bad);
3593 
3594 	down_read(&osdc->lock);
3595 	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
3596 	if (!lreq) {
3597 		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
3598 		     cookie);
3599 		goto out_unlock_osdc;
3600 	}
3601 
3602 	mutex_lock(&lreq->lock);
3603 	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
3604 	     opcode, cookie, lreq, lreq->is_watch);
3605 	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
3606 		if (!lreq->last_error) {
3607 			lreq->last_error = -ENOTCONN;
3608 			queue_watch_error(lreq);
3609 		}
3610 	} else if (!lreq->is_watch) {
3611 		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
3612 		if (lreq->notify_id && lreq->notify_id != notify_id) {
3613 			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
3614 			     lreq->notify_id, notify_id);
3615 		} else if (!completion_done(&lreq->notify_finish_wait)) {
3616 			struct ceph_msg_data *data =
3617 			    list_first_entry_or_null(&msg->data,
3618 						     struct ceph_msg_data,
3619 						     links);
3620 
3621 			if (data) {
3622 				if (lreq->preply_pages) {
3623 					WARN_ON(data->type !=
3624 							CEPH_MSG_DATA_PAGES);
3625 					*lreq->preply_pages = data->pages;
3626 					*lreq->preply_len = data->length;
3627 				} else {
3628 					ceph_release_page_vector(data->pages,
3629 					       calc_pages_for(0, data->length));
3630 				}
3631 			}
3632 			lreq->notify_finish_error = return_code;
3633 			complete_all(&lreq->notify_finish_wait);
3634 		}
3635 	} else {
3636 		/* CEPH_WATCH_EVENT_NOTIFY */
3637 		lwork = lwork_alloc(lreq, do_watch_notify);
3638 		if (!lwork) {
3639 			pr_err("failed to allocate notify-lwork\n");
3640 			goto out_unlock_lreq;
3641 		}
3642 
3643 		lwork->notify.notify_id = notify_id;
3644 		lwork->notify.notifier_id = notifier_id;
3645 		lwork->notify.payload = payload;
3646 		lwork->notify.payload_len = payload_len;
3647 		lwork->notify.msg = ceph_msg_get(msg);
3648 		lwork_queue(lwork);
3649 	}
3650 
3651 out_unlock_lreq:
3652 	mutex_unlock(&lreq->lock);
3653 out_unlock_osdc:
3654 	up_read(&osdc->lock);
3655 	return;
3656 
3657 bad:
3658 	pr_err("osdc handle_watch_notify corrupt msg\n");
3659 }
3660 
3661 /*
3662  * Register request, send initial attempt.
3663  */
3664 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
3665 			    struct ceph_osd_request *req,
3666 			    bool nofail)
3667 {
3668 	down_read(&osdc->lock);
3669 	submit_request(req, false);
3670 	up_read(&osdc->lock);
3671 
3672 	return 0;
3673 }
3674 EXPORT_SYMBOL(ceph_osdc_start_request);
3675 
3676 /*
3677  * Unregister a registered request.  The request is not completed:
3678  * ->r_result isn't set and __complete_request() isn't called.
3679  */
3680 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
3681 {
3682 	struct ceph_osd_client *osdc = req->r_osdc;
3683 
3684 	down_write(&osdc->lock);
3685 	if (req->r_osd)
3686 		cancel_request(req);
3687 	up_write(&osdc->lock);
3688 }
3689 EXPORT_SYMBOL(ceph_osdc_cancel_request);
3690 
3691 /*
3692  * @timeout: in jiffies, 0 means "wait forever"
3693  */
3694 static int wait_request_timeout(struct ceph_osd_request *req,
3695 				unsigned long timeout)
3696 {
3697 	long left;
3698 
3699 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3700 	left = wait_for_completion_killable_timeout(&req->r_completion,
3701 						ceph_timeout_jiffies(timeout));
3702 	if (left <= 0) {
3703 		left = left ?: -ETIMEDOUT;
3704 		ceph_osdc_cancel_request(req);
3705 	} else {
3706 		left = req->r_result; /* completed */
3707 	}
3708 
3709 	return left;
3710 }
3711 
3712 /*
3713  * wait for a request to complete
3714  */
3715 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
3716 			   struct ceph_osd_request *req)
3717 {
3718 	return wait_request_timeout(req, 0);
3719 }
3720 EXPORT_SYMBOL(ceph_osdc_wait_request);
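
/*
 * Illustrative sketch of the synchronous submit pattern used by helpers
 * later in this file (e.g. ceph_osdc_notify_ack(), ceph_osdc_call()):
 * start the request, wait for its result, then drop the reference.
 *
 *	ceph_osdc_start_request(osdc, req, false);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	... on success, per-op results are in req->r_ops[] ...
 *	ceph_osdc_put_request(req);
 */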
3721 
3722 /*
3723  * sync - wait for all in-flight write requests to flush.  Only requests
 * with tids up to the one sampled on entry are waited for, which avoids
 * starvation by newer requests.
3724  */
3725 void ceph_osdc_sync(struct ceph_osd_client *osdc)
3726 {
3727 	struct rb_node *n, *p;
3728 	u64 last_tid = atomic64_read(&osdc->last_tid);
3729 
3730 again:
3731 	down_read(&osdc->lock);
3732 	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3733 		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3734 
3735 		mutex_lock(&osd->lock);
3736 		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
3737 			struct ceph_osd_request *req =
3738 			    rb_entry(p, struct ceph_osd_request, r_node);
3739 
3740 			if (req->r_tid > last_tid)
3741 				break;
3742 
3743 			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
3744 				continue;
3745 
3746 			ceph_osdc_get_request(req);
3747 			mutex_unlock(&osd->lock);
3748 			up_read(&osdc->lock);
3749 			dout("%s waiting on req %p tid %llu last_tid %llu\n",
3750 			     __func__, req, req->r_tid, last_tid);
3751 			wait_for_completion(&req->r_completion);
3752 			ceph_osdc_put_request(req);
3753 			goto again;
3754 		}
3755 
3756 		mutex_unlock(&osd->lock);
3757 	}
3758 
3759 	up_read(&osdc->lock);
3760 	dout("%s done last_tid %llu\n", __func__, last_tid);
3761 }
3762 EXPORT_SYMBOL(ceph_osdc_sync);
3763 
3764 static struct ceph_osd_request *
3765 alloc_linger_request(struct ceph_osd_linger_request *lreq)
3766 {
3767 	struct ceph_osd_request *req;
3768 
3769 	req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
3770 	if (!req)
3771 		return NULL;
3772 
3773 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3774 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3775 
3776 	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
3777 		ceph_osdc_put_request(req);
3778 		return NULL;
3779 	}
3780 
3781 	return req;
3782 }
3783 
3784 /*
3785  * Returns a handle, caller owns a ref.
3786  */
3787 struct ceph_osd_linger_request *
3788 ceph_osdc_watch(struct ceph_osd_client *osdc,
3789 		struct ceph_object_id *oid,
3790 		struct ceph_object_locator *oloc,
3791 		rados_watchcb2_t wcb,
3792 		rados_watcherrcb_t errcb,
3793 		void *data)
3794 {
3795 	struct ceph_osd_linger_request *lreq;
3796 	int ret;
3797 
3798 	lreq = linger_alloc(osdc);
3799 	if (!lreq)
3800 		return ERR_PTR(-ENOMEM);
3801 
3802 	lreq->is_watch = true;
3803 	lreq->wcb = wcb;
3804 	lreq->errcb = errcb;
3805 	lreq->data = data;
3806 	lreq->watch_valid_thru = jiffies;
3807 
3808 	ceph_oid_copy(&lreq->t.base_oid, oid);
3809 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
3810 	lreq->t.flags = CEPH_OSD_FLAG_WRITE;
3811 	ktime_get_real_ts(&lreq->mtime);
3812 
3813 	lreq->reg_req = alloc_linger_request(lreq);
3814 	if (!lreq->reg_req) {
3815 		ret = -ENOMEM;
3816 		goto err_put_lreq;
3817 	}
3818 
3819 	lreq->ping_req = alloc_linger_request(lreq);
3820 	if (!lreq->ping_req) {
3821 		ret = -ENOMEM;
3822 		goto err_put_lreq;
3823 	}
3824 
3825 	down_write(&osdc->lock);
3826 	linger_register(lreq); /* before osd_req_op_* */
3827 	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
3828 			      CEPH_OSD_WATCH_OP_WATCH);
3829 	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
3830 			      CEPH_OSD_WATCH_OP_PING);
3831 	linger_submit(lreq);
3832 	up_write(&osdc->lock);
3833 
3834 	ret = linger_reg_commit_wait(lreq);
3835 	if (ret) {
3836 		linger_cancel(lreq);
3837 		goto err_put_lreq;
3838 	}
3839 
3840 	return lreq;
3841 
3842 err_put_lreq:
3843 	linger_put(lreq);
3844 	return ERR_PTR(ret);
3845 }
3846 EXPORT_SYMBOL(ceph_osdc_watch);
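
/*
 * Illustrative watch lifecycle; my_watch_cb/my_watch_errcb are
 * hypothetical caller-provided callbacks:
 *
 *	lreq = ceph_osdc_watch(osdc, oid, oloc, my_watch_cb,
 *			       my_watch_errcb, data);
 *	if (IS_ERR(lreq))
 *		return PTR_ERR(lreq);
 *	... notify events are delivered through my_watch_cb() ...
 *	ceph_osdc_unwatch(osdc, lreq);
 */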
3847 
3848 /*
3849  * Releases a ref.
3850  *
3851  * Times out after mount_timeout to preserve rbd unmap behaviour
3852  * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
3853  * with mount_timeout").
3854  */
3855 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
3856 		      struct ceph_osd_linger_request *lreq)
3857 {
3858 	struct ceph_options *opts = osdc->client->options;
3859 	struct ceph_osd_request *req;
3860 	int ret;
3861 
3862 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3863 	if (!req)
3864 		return -ENOMEM;
3865 
3866 	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
3867 	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
3868 	req->r_flags = CEPH_OSD_FLAG_WRITE;
3869 	ktime_get_real_ts(&req->r_mtime);
3870 	osd_req_op_watch_init(req, 0, lreq->linger_id,
3871 			      CEPH_OSD_WATCH_OP_UNWATCH);
3872 
3873 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3874 	if (ret)
3875 		goto out_put_req;
3876 
3877 	ceph_osdc_start_request(osdc, req, false);
3878 	linger_cancel(lreq);
3879 	linger_put(lreq);
3880 	ret = wait_request_timeout(req, opts->mount_timeout);
3881 
3882 out_put_req:
3883 	ceph_osdc_put_request(req);
3884 	return ret;
3885 }
3886 EXPORT_SYMBOL(ceph_osdc_unwatch);
3887 
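/*
 * Build the NOTIFY_ACK op.  The pagelist encoded below carries, in
 * order: notify_id (u64), cookie (u64), payload_len (u32, 0 if there is
 * no payload) and the payload bytes.
 */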
3888 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
3889 				      u64 notify_id, u64 cookie, void *payload,
3890 				      size_t payload_len)
3891 {
3892 	struct ceph_osd_req_op *op;
3893 	struct ceph_pagelist *pl;
3894 	int ret;
3895 
3896 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
3897 
3898 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
3899 	if (!pl)
3900 		return -ENOMEM;
3901 
3902 	ceph_pagelist_init(pl);
3903 	ret = ceph_pagelist_encode_64(pl, notify_id);
3904 	ret |= ceph_pagelist_encode_64(pl, cookie);
3905 	if (payload) {
3906 		ret |= ceph_pagelist_encode_32(pl, payload_len);
3907 		ret |= ceph_pagelist_append(pl, payload, payload_len);
3908 	} else {
3909 		ret |= ceph_pagelist_encode_32(pl, 0);
3910 	}
3911 	if (ret) {
3912 		ceph_pagelist_release(pl);
3913 		return -ENOMEM;
3914 	}
3915 
3916 	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
3917 	op->indata_len = pl->length;
3918 	return 0;
3919 }
3920 
3921 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
3922 			 struct ceph_object_id *oid,
3923 			 struct ceph_object_locator *oloc,
3924 			 u64 notify_id,
3925 			 u64 cookie,
3926 			 void *payload,
3927 			 size_t payload_len)
3928 {
3929 	struct ceph_osd_request *req;
3930 	int ret;
3931 
3932 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
3933 	if (!req)
3934 		return -ENOMEM;
3935 
3936 	ceph_oid_copy(&req->r_base_oid, oid);
3937 	ceph_oloc_copy(&req->r_base_oloc, oloc);
3938 	req->r_flags = CEPH_OSD_FLAG_READ;
3939 
3940 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
3941 	if (ret)
3942 		goto out_put_req;
3943 
3944 	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
3945 					 payload_len);
3946 	if (ret)
3947 		goto out_put_req;
3948 
3949 	ceph_osdc_start_request(osdc, req, false);
3950 	ret = ceph_osdc_wait_request(osdc, req);
3951 
3952 out_put_req:
3953 	ceph_osdc_put_request(req);
3954 	return ret;
3955 }
3956 EXPORT_SYMBOL(ceph_osdc_notify_ack);
3957 
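/*
 * Build the NOTIFY op.  The pagelist encoded below carries, in order:
 * prot_ver (u32, always 1), timeout (u32), payload_len (u32) and the
 * payload bytes.  The reply buffer for the notify_id is attached
 * separately by the caller (see ceph_osdc_notify()).
 */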
3958 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
3959 				  u64 cookie, u32 prot_ver, u32 timeout,
3960 				  void *payload, size_t payload_len)
3961 {
3962 	struct ceph_osd_req_op *op;
3963 	struct ceph_pagelist *pl;
3964 	int ret;
3965 
3966 	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
3967 	op->notify.cookie = cookie;
3968 
3969 	pl = kmalloc(sizeof(*pl), GFP_NOIO);
3970 	if (!pl)
3971 		return -ENOMEM;
3972 
3973 	ceph_pagelist_init(pl);
3974 	ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
3975 	ret |= ceph_pagelist_encode_32(pl, timeout);
3976 	ret |= ceph_pagelist_encode_32(pl, payload_len);
3977 	ret |= ceph_pagelist_append(pl, payload, payload_len);
3978 	if (ret) {
3979 		ceph_pagelist_release(pl);
3980 		return -ENOMEM;
3981 	}
3982 
3983 	ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
3984 	op->indata_len = pl->length;
3985 	return 0;
3986 }
3987 
3988 /*
3989  * @timeout: in seconds
3990  *
3991  * @preply_{pages,len} are initialized both on success and error.
3992  * The caller is responsible for:
3993  *
3994  *     ceph_release_page_vector(*preply_pages, calc_pages_for(0, *preply_len))
3995  */
3996 int ceph_osdc_notify(struct ceph_osd_client *osdc,
3997 		     struct ceph_object_id *oid,
3998 		     struct ceph_object_locator *oloc,
3999 		     void *payload,
4000 		     size_t payload_len,
4001 		     u32 timeout,
4002 		     struct page ***preply_pages,
4003 		     size_t *preply_len)
4004 {
4005 	struct ceph_osd_linger_request *lreq;
4006 	struct page **pages;
4007 	int ret;
4008 
4009 	WARN_ON(!timeout);
4010 	if (preply_pages) {
4011 		*preply_pages = NULL;
4012 		*preply_len = 0;
4013 	}
4014 
4015 	lreq = linger_alloc(osdc);
4016 	if (!lreq)
4017 		return -ENOMEM;
4018 
4019 	lreq->preply_pages = preply_pages;
4020 	lreq->preply_len = preply_len;
4021 
4022 	ceph_oid_copy(&lreq->t.base_oid, oid);
4023 	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4024 	lreq->t.flags = CEPH_OSD_FLAG_READ;
4025 
4026 	lreq->reg_req = alloc_linger_request(lreq);
4027 	if (!lreq->reg_req) {
4028 		ret = -ENOMEM;
4029 		goto out_put_lreq;
4030 	}
4031 
4032 	/* for notify_id */
4033 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4034 	if (IS_ERR(pages)) {
4035 		ret = PTR_ERR(pages);
4036 		goto out_put_lreq;
4037 	}
4038 
4039 	down_write(&osdc->lock);
4040 	linger_register(lreq); /* before osd_req_op_* */
4041 	ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4042 				     timeout, payload, payload_len);
4043 	if (ret) {
4044 		linger_unregister(lreq);
4045 		up_write(&osdc->lock);
4046 		ceph_release_page_vector(pages, 1);
4047 		goto out_put_lreq;
4048 	}
4049 	ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4050 						 response_data),
4051 				 pages, PAGE_SIZE, 0, false, true);
4052 	linger_submit(lreq);
4053 	up_write(&osdc->lock);
4054 
4055 	ret = linger_reg_commit_wait(lreq);
4056 	if (!ret)
4057 		ret = linger_notify_finish_wait(lreq);
4058 	else
4059 		dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4060 
4061 	linger_cancel(lreq);
4062 out_put_lreq:
4063 	linger_put(lreq);
4064 	return ret;
4065 }
4066 EXPORT_SYMBOL(ceph_osdc_notify);
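
/*
 * Illustrative caller sketch for ceph_osdc_notify(), showing the reply
 * page-vector release described above (variable names are placeholders,
 * the timeout is an arbitrary value in seconds):
 *
 *	struct page **reply_pages;
 *	size_t reply_len;
 *
 *	ret = ceph_osdc_notify(osdc, oid, oloc, payload, payload_len,
 *			       10, &reply_pages, &reply_len);
 *	... consume the reply data on success ...
 *	if (reply_pages)
 *		ceph_release_page_vector(reply_pages,
 *					 calc_pages_for(0, reply_len));
 */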
4067 
4068 /*
4069  * Return the number of milliseconds since the watch was last
4070  * confirmed, or an error.  If there is an error, the watch is no
4071  * longer valid, and should be destroyed with ceph_osdc_unwatch().
4072  */
4073 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4074 			  struct ceph_osd_linger_request *lreq)
4075 {
4076 	unsigned long stamp, age;
4077 	int ret;
4078 
4079 	down_read(&osdc->lock);
4080 	mutex_lock(&lreq->lock);
4081 	stamp = lreq->watch_valid_thru;
4082 	if (!list_empty(&lreq->pending_lworks)) {
4083 		struct linger_work *lwork =
4084 		    list_first_entry(&lreq->pending_lworks,
4085 				     struct linger_work,
4086 				     pending_item);
4087 
4088 		if (time_before(lwork->queued_stamp, stamp))
4089 			stamp = lwork->queued_stamp;
4090 	}
4091 	age = jiffies - stamp;
4092 	dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4093 	     lreq, lreq->linger_id, age, lreq->last_error);
4094 	/* we are truncating to msecs, so return a safe upper bound */
4095 	ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4096 
4097 	mutex_unlock(&lreq->lock);
4098 	up_read(&osdc->lock);
4099 	return ret;
4100 }
4101 
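/*
 * Decode a single watch_item_t: entity name, cookie (u64), a skipped
 * timeout_seconds (u32) and, for struct_v >= 2, the watcher address.
 */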
4102 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4103 {
4104 	u8 struct_v;
4105 	u32 struct_len;
4106 	int ret;
4107 
4108 	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4109 				  &struct_v, &struct_len);
4110 	if (ret)
4111 		return ret;
4112 
4113 	ceph_decode_copy(p, &item->name, sizeof(item->name));
4114 	item->cookie = ceph_decode_64(p);
4115 	*p += 4; /* skip timeout_seconds */
4116 	if (struct_v >= 2) {
4117 		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
4118 		ceph_decode_addr(&item->addr);
4119 	}
4120 
4121 	dout("%s %s%llu cookie %llu addr %s\n", __func__,
4122 	     ENTITY_NAME(item->name), item->cookie,
4123 	     ceph_pr_addr(&item->addr.in_addr));
4124 	return 0;
4125 }
4126 
4127 static int decode_watchers(void **p, void *end,
4128 			   struct ceph_watch_item **watchers,
4129 			   u32 *num_watchers)
4130 {
4131 	u8 struct_v;
4132 	u32 struct_len;
4133 	int i;
4134 	int ret;
4135 
4136 	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
4137 				  &struct_v, &struct_len);
4138 	if (ret)
4139 		return ret;
4140 
4141 	*num_watchers = ceph_decode_32(p);
4142 	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
4143 	if (!*watchers)
4144 		return -ENOMEM;
4145 
4146 	for (i = 0; i < *num_watchers; i++) {
4147 		ret = decode_watcher(p, end, *watchers + i);
4148 		if (ret) {
4149 			kfree(*watchers);
4150 			return ret;
4151 		}
4152 	}
4153 
4154 	return 0;
4155 }
4156 
4157 /*
4158  * On success, the caller is responsible for:
4159  *
4160  *     kfree(watchers);
4161  */
4162 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4163 			    struct ceph_object_id *oid,
4164 			    struct ceph_object_locator *oloc,
4165 			    struct ceph_watch_item **watchers,
4166 			    u32 *num_watchers)
4167 {
4168 	struct ceph_osd_request *req;
4169 	struct page **pages;
4170 	int ret;
4171 
4172 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4173 	if (!req)
4174 		return -ENOMEM;
4175 
4176 	ceph_oid_copy(&req->r_base_oid, oid);
4177 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4178 	req->r_flags = CEPH_OSD_FLAG_READ;
4179 
4180 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4181 	if (ret)
4182 		goto out_put_req;
4183 
4184 	pages = ceph_alloc_page_vector(1, GFP_NOIO);
4185 	if (IS_ERR(pages)) {
4186 		ret = PTR_ERR(pages);
4187 		goto out_put_req;
4188 	}
4189 
4190 	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
4191 	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
4192 						 response_data),
4193 				 pages, PAGE_SIZE, 0, false, true);
4194 
4195 	ceph_osdc_start_request(osdc, req, false);
4196 	ret = ceph_osdc_wait_request(osdc, req);
4197 	if (ret >= 0) {
4198 		void *p = page_address(pages[0]);
4199 		void *const end = p + req->r_ops[0].outdata_len;
4200 
4201 		ret = decode_watchers(&p, end, watchers, num_watchers);
4202 	}
4203 
4204 out_put_req:
4205 	ceph_osdc_put_request(req);
4206 	return ret;
4207 }
4208 EXPORT_SYMBOL(ceph_osdc_list_watchers);
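
/*
 * Illustrative caller sketch for ceph_osdc_list_watchers(); variable
 * names are placeholders:
 *
 *	struct ceph_watch_item *watchers;
 *	u32 num_watchers;
 *
 *	ret = ceph_osdc_list_watchers(osdc, oid, oloc, &watchers,
 *				      &num_watchers);
 *	if (ret)
 *		return ret;
 *	... inspect watchers[i].name, .cookie, .addr ...
 *	kfree(watchers);
 */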
4209 
4210 /*
4211  * Call all pending notify callbacks - for use after a watch is
4212  * unregistered, to make sure no more callbacks for it will be invoked
4213  */
4214 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
4215 {
4216 	dout("%s osdc %p\n", __func__, osdc);
4217 	flush_workqueue(osdc->notify_wq);
4218 }
4219 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
4220 
4221 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
4222 {
4223 	down_read(&osdc->lock);
4224 	maybe_request_map(osdc);
4225 	up_read(&osdc->lock);
4226 }
4227 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
4228 
4229 /*
4230  * Execute an OSD class method on an object.
4231  *
4232  * @flags: CEPH_OSD_FLAG_*
4233  * @resp_len: in/out param for reply length
4234  */
4235 int ceph_osdc_call(struct ceph_osd_client *osdc,
4236 		   struct ceph_object_id *oid,
4237 		   struct ceph_object_locator *oloc,
4238 		   const char *class, const char *method,
4239 		   unsigned int flags,
4240 		   struct page *req_page, size_t req_len,
4241 		   struct page *resp_page, size_t *resp_len)
4242 {
4243 	struct ceph_osd_request *req;
4244 	int ret;
4245 
4246 	if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
4247 		return -E2BIG;
4248 
4249 	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4250 	if (!req)
4251 		return -ENOMEM;
4252 
4253 	ceph_oid_copy(&req->r_base_oid, oid);
4254 	ceph_oloc_copy(&req->r_base_oloc, oloc);
4255 	req->r_flags = flags;
4256 
4257 	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4258 	if (ret)
4259 		goto out_put_req;
4260 
4261 	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4262 	if (req_page)
4263 		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4264 						  0, false, false);
4265 	if (resp_page)
4266 		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4267 						   *resp_len, 0, false, false);
4268 
4269 	ceph_osdc_start_request(osdc, req, false);
4270 	ret = ceph_osdc_wait_request(osdc, req);
4271 	if (ret >= 0) {
4272 		ret = req->r_ops[0].rval;
4273 		if (resp_page)
4274 			*resp_len = req->r_ops[0].outdata_len;
4275 	}
4276 
4277 out_put_req:
4278 	ceph_osdc_put_request(req);
4279 	return ret;
4280 }
4281 EXPORT_SYMBOL(ceph_osdc_call);
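
/*
 * Illustrative ceph_osdc_call() usage, assuming a hypothetical "foo"
 * class with a "bar" method and single-page request/response buffers:
 *
 *	struct page *req_page = alloc_page(GFP_NOIO);
 *	struct page *resp_page = alloc_page(GFP_NOIO);
 *	size_t resp_len = PAGE_SIZE;
 *
 *	... fill page_address(req_page) with req_len bytes of input ...
 *	ret = ceph_osdc_call(osdc, oid, oloc, "foo", "bar",
 *			     CEPH_OSD_FLAG_READ, req_page, req_len,
 *			     resp_page, &resp_len);
 *	... on success ret is the method's return value and resp_len
 *	    holds the number of output bytes in resp_page ...
 */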
4282 
4283 /*
4284  * init, shutdown
4285  */
4286 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
4287 {
4288 	int err;
4289 
4290 	dout("init\n");
4291 	osdc->client = client;
4292 	init_rwsem(&osdc->lock);
4293 	osdc->osds = RB_ROOT;
4294 	INIT_LIST_HEAD(&osdc->osd_lru);
4295 	spin_lock_init(&osdc->osd_lru_lock);
4296 	osd_init(&osdc->homeless_osd);
4297 	osdc->homeless_osd.o_osdc = osdc;
4298 	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
4299 	osdc->last_linger_id = CEPH_LINGER_ID_START;
4300 	osdc->linger_requests = RB_ROOT;
4301 	osdc->map_checks = RB_ROOT;
4302 	osdc->linger_map_checks = RB_ROOT;
4303 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
4304 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
4305 
4306 	err = -ENOMEM;
4307 	osdc->osdmap = ceph_osdmap_alloc();
4308 	if (!osdc->osdmap)
4309 		goto out;
4310 
4311 	osdc->req_mempool = mempool_create_slab_pool(10,
4312 						     ceph_osd_request_cache);
4313 	if (!osdc->req_mempool)
4314 		goto out_map;
4315 
4316 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
4317 				PAGE_SIZE, 10, true, "osd_op");
4318 	if (err < 0)
4319 		goto out_mempool;
4320 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
4321 				PAGE_SIZE, 10, true, "osd_op_reply");
4322 	if (err < 0)
4323 		goto out_msgpool;
4324 
4325 	err = -ENOMEM;
4326 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
4327 	if (!osdc->notify_wq)
4328 		goto out_msgpool_reply;
4329 
4330 	schedule_delayed_work(&osdc->timeout_work,
4331 			      osdc->client->options->osd_keepalive_timeout);
4332 	schedule_delayed_work(&osdc->osds_timeout_work,
4333 	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));
4334 
4335 	return 0;
4336 
4337 out_msgpool_reply:
4338 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
4339 out_msgpool:
4340 	ceph_msgpool_destroy(&osdc->msgpool_op);
4341 out_mempool:
4342 	mempool_destroy(osdc->req_mempool);
4343 out_map:
4344 	ceph_osdmap_destroy(osdc->osdmap);
4345 out:
4346 	return err;
4347 }
4348 
4349 void ceph_osdc_stop(struct ceph_osd_client *osdc)
4350 {
4351 	flush_workqueue(osdc->notify_wq);
4352 	destroy_workqueue(osdc->notify_wq);
4353 	cancel_delayed_work_sync(&osdc->timeout_work);
4354 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
4355 
4356 	down_write(&osdc->lock);
4357 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
4358 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
4359 						struct ceph_osd, o_node);
4360 		close_osd(osd);
4361 	}
4362 	up_write(&osdc->lock);
4363 	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
4364 	osd_cleanup(&osdc->homeless_osd);
4365 
4366 	WARN_ON(!list_empty(&osdc->osd_lru));
4367 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
4368 	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
4369 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
4370 	WARN_ON(atomic_read(&osdc->num_requests));
4371 	WARN_ON(atomic_read(&osdc->num_homeless));
4372 
4373 	ceph_osdmap_destroy(osdc->osdmap);
4374 	mempool_destroy(osdc->req_mempool);
4375 	ceph_msgpool_destroy(&osdc->msgpool_op);
4376 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
4377 }
4378 
4379 /*
4380  * Read some contiguous pages.  If we cross a stripe boundary, shorten
4381  * *plen.  Return number of bytes read, or error.
4382  */
4383 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
4384 			struct ceph_vino vino, struct ceph_file_layout *layout,
4385 			u64 off, u64 *plen,
4386 			u32 truncate_seq, u64 truncate_size,
4387 			struct page **pages, int num_pages, int page_align)
4388 {
4389 	struct ceph_osd_request *req;
4390 	int rc = 0;
4391 
4392 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
4393 	     vino.snap, off, *plen);
4394 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
4395 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
4396 				    NULL, truncate_seq, truncate_size,
4397 				    false);
4398 	if (IS_ERR(req))
4399 		return PTR_ERR(req);
4400 
4401 	/* it may be a short read due to an object boundary */
4402 	osd_req_op_extent_osd_data_pages(req, 0,
4403 				pages, *plen, page_align, false, false);
4404 
4405 	dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
4406 	     off, *plen, *plen, page_align);
4407 
4408 	rc = ceph_osdc_start_request(osdc, req, false);
4409 	if (!rc)
4410 		rc = ceph_osdc_wait_request(osdc, req);
4411 
4412 	ceph_osdc_put_request(req);
4413 	dout("readpages result %d\n", rc);
4414 	return rc;
4415 }
4416 EXPORT_SYMBOL(ceph_osdc_readpages);
4417 
4418 /*
4419  * do a synchronous write on N pages
4420  */
4421 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
4422 			 struct ceph_file_layout *layout,
4423 			 struct ceph_snap_context *snapc,
4424 			 u64 off, u64 len,
4425 			 u32 truncate_seq, u64 truncate_size,
4426 			 struct timespec *mtime,
4427 			 struct page **pages, int num_pages)
4428 {
4429 	struct ceph_osd_request *req;
4430 	int rc = 0;
4431 	int page_align = off & ~PAGE_MASK;
4432 
4433 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
4434 				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
4435 				    snapc, truncate_seq, truncate_size,
4436 				    true);
4437 	if (IS_ERR(req))
4438 		return PTR_ERR(req);
4439 
4440 	/* it may be a short write due to an object boundary */
4441 	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
4442 				false, false);
4443 	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
4444 
4445 	req->r_mtime = *mtime;
4446 	rc = ceph_osdc_start_request(osdc, req, true);
4447 	if (!rc)
4448 		rc = ceph_osdc_wait_request(osdc, req);
4449 
4450 	ceph_osdc_put_request(req);
4451 	if (rc == 0)
4452 		rc = len;
4453 	dout("writepages result %d\n", rc);
4454 	return rc;
4455 }
4456 EXPORT_SYMBOL(ceph_osdc_writepages);
4457 
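/*
 * Create the slab cache backing ceph_osd_request allocations (sized to
 * hold CEPH_OSD_SLAB_OPS inline ops).  Intended to be called once (note
 * the BUG_ON below) and paired with ceph_osdc_cleanup() at teardown.
 */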
4458 int ceph_osdc_setup(void)
4459 {
4460 	size_t size = sizeof(struct ceph_osd_request) +
4461 	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
4462 
4463 	BUG_ON(ceph_osd_request_cache);
4464 	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
4465 						   0, 0, NULL);
4466 
4467 	return ceph_osd_request_cache ? 0 : -ENOMEM;
4468 }
4469 EXPORT_SYMBOL(ceph_osdc_setup);
4470 
4471 void ceph_osdc_cleanup(void)
4472 {
4473 	BUG_ON(!ceph_osd_request_cache);
4474 	kmem_cache_destroy(ceph_osd_request_cache);
4475 	ceph_osd_request_cache = NULL;
4476 }
4477 EXPORT_SYMBOL(ceph_osdc_cleanup);
4478 
4479 /*
4480  * handle incoming message
4481  */
4482 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
4483 {
4484 	struct ceph_osd *osd = con->private;
4485 	struct ceph_osd_client *osdc = osd->o_osdc;
4486 	int type = le16_to_cpu(msg->hdr.type);
4487 
4488 	switch (type) {
4489 	case CEPH_MSG_OSD_MAP:
4490 		ceph_osdc_handle_map(osdc, msg);
4491 		break;
4492 	case CEPH_MSG_OSD_OPREPLY:
4493 		handle_reply(osd, msg);
4494 		break;
4495 	case CEPH_MSG_WATCH_NOTIFY:
4496 		handle_watch_notify(osdc, msg);
4497 		break;
4498 
4499 	default:
4500 		pr_err("received unknown message type %d %s\n", type,
4501 		       ceph_msg_type_name(type));
4502 	}
4503 
4504 	ceph_msg_put(msg);
4505 }
4506 
4507 /*
4508  * Look up and return the preallocated reply message for an incoming
4509  * OSD op reply.  Don't try to grow the message if its data portion is
4510  * larger than what was preallocated - for now, just skip the message.
4511  */
4512 static struct ceph_msg *get_reply(struct ceph_connection *con,
4513 				  struct ceph_msg_header *hdr,
4514 				  int *skip)
4515 {
4516 	struct ceph_osd *osd = con->private;
4517 	struct ceph_osd_client *osdc = osd->o_osdc;
4518 	struct ceph_msg *m = NULL;
4519 	struct ceph_osd_request *req;
4520 	int front_len = le32_to_cpu(hdr->front_len);
4521 	int data_len = le32_to_cpu(hdr->data_len);
4522 	u64 tid = le64_to_cpu(hdr->tid);
4523 
4524 	down_read(&osdc->lock);
4525 	if (!osd_registered(osd)) {
4526 		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
4527 		*skip = 1;
4528 		goto out_unlock_osdc;
4529 	}
4530 	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
4531 
4532 	mutex_lock(&osd->lock);
4533 	req = lookup_request(&osd->o_requests, tid);
4534 	if (!req) {
4535 		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
4536 		     osd->o_osd, tid);
4537 		*skip = 1;
4538 		goto out_unlock_session;
4539 	}
4540 
4541 	ceph_msg_revoke_incoming(req->r_reply);
4542 
4543 	if (front_len > req->r_reply->front_alloc_len) {
4544 		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
4545 			__func__, osd->o_osd, req->r_tid, front_len,
4546 			req->r_reply->front_alloc_len);
4547 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
4548 				 false);
4549 		if (!m)
4550 			goto out_unlock_session;
4551 		ceph_msg_put(req->r_reply);
4552 		req->r_reply = m;
4553 	}
4554 
4555 	if (data_len > req->r_reply->data_length) {
4556 		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
4557 			__func__, osd->o_osd, req->r_tid, data_len,
4558 			req->r_reply->data_length);
4559 		m = NULL;
4560 		*skip = 1;
4561 		goto out_unlock_session;
4562 	}
4563 
4564 	m = ceph_msg_get(req->r_reply);
4565 	dout("get_reply tid %lld %p\n", tid, m);
4566 
4567 out_unlock_session:
4568 	mutex_unlock(&osd->lock);
4569 out_unlock_osdc:
4570 	up_read(&osdc->lock);
4571 	return m;
4572 }
4573 
4574 /*
4575  * TODO: switch to a msg-owned pagelist
4576  */
4577 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
4578 {
4579 	struct ceph_msg *m;
4580 	int type = le16_to_cpu(hdr->type);
4581 	u32 front_len = le32_to_cpu(hdr->front_len);
4582 	u32 data_len = le32_to_cpu(hdr->data_len);
4583 
4584 	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
4585 	if (!m)
4586 		return NULL;
4587 
4588 	if (data_len) {
4589 		struct page **pages;
4590 		struct ceph_osd_data osd_data;
4591 
4592 		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
4593 					       GFP_NOIO);
4594 		if (IS_ERR(pages)) {
4595 			ceph_msg_put(m);
4596 			return NULL;
4597 		}
4598 
4599 		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
4600 					 false);
4601 		ceph_osdc_msg_data_add(m, &osd_data);
4602 	}
4603 
4604 	return m;
4605 }
4606 
4607 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
4608 				  struct ceph_msg_header *hdr,
4609 				  int *skip)
4610 {
4611 	struct ceph_osd *osd = con->private;
4612 	int type = le16_to_cpu(hdr->type);
4613 
4614 	*skip = 0;
4615 	switch (type) {
4616 	case CEPH_MSG_OSD_MAP:
4617 	case CEPH_MSG_WATCH_NOTIFY:
4618 		return alloc_msg_with_page_vector(hdr);
4619 	case CEPH_MSG_OSD_OPREPLY:
4620 		return get_reply(con, hdr, skip);
4621 	default:
4622 		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
4623 			osd->o_osd, type);
4624 		*skip = 1;
4625 		return NULL;
4626 	}
4627 }
4628 
4629 /*
4630  * Connection get/put wrappers that refcount the containing ceph_osd struct
4631  */
4632 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
4633 {
4634 	struct ceph_osd *osd = con->private;
4635 	if (get_osd(osd))
4636 		return con;
4637 	return NULL;
4638 }
4639 
4640 static void put_osd_con(struct ceph_connection *con)
4641 {
4642 	struct ceph_osd *osd = con->private;
4643 	put_osd(osd);
4644 }
4645 
4646 /*
4647  * authentication
4648  */
4649 /*
4650  * Note: returned pointer is the address of a structure that's
4651  * managed separately.  Caller must *not* attempt to free it.
4652  */
4653 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
4654 					int *proto, int force_new)
4655 {
4656 	struct ceph_osd *o = con->private;
4657 	struct ceph_osd_client *osdc = o->o_osdc;
4658 	struct ceph_auth_client *ac = osdc->client->monc.auth;
4659 	struct ceph_auth_handshake *auth = &o->o_auth;
4660 
4661 	if (force_new && auth->authorizer) {
4662 		ceph_auth_destroy_authorizer(auth->authorizer);
4663 		auth->authorizer = NULL;
4664 	}
4665 	if (!auth->authorizer) {
4666 		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
4667 						      auth);
4668 		if (ret)
4669 			return ERR_PTR(ret);
4670 	} else {
4671 		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
4672 						     auth);
4673 		if (ret)
4674 			return ERR_PTR(ret);
4675 	}
4676 	*proto = ac->protocol;
4677 
4678 	return auth;
4679 }
4680 
4681 
4682 static int verify_authorizer_reply(struct ceph_connection *con)
4683 {
4684 	struct ceph_osd *o = con->private;
4685 	struct ceph_osd_client *osdc = o->o_osdc;
4686 	struct ceph_auth_client *ac = osdc->client->monc.auth;
4687 
4688 	return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
4689 }
4690 
4691 static int invalidate_authorizer(struct ceph_connection *con)
4692 {
4693 	struct ceph_osd *o = con->private;
4694 	struct ceph_osd_client *osdc = o->o_osdc;
4695 	struct ceph_auth_client *ac = osdc->client->monc.auth;
4696 
4697 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
4698 	return ceph_monc_validate_auth(&osdc->client->monc);
4699 }
4700 
4701 static void osd_reencode_message(struct ceph_msg *msg)
4702 {
4703 	encode_request_finish(msg);
4704 }
4705 
4706 static int osd_sign_message(struct ceph_msg *msg)
4707 {
4708 	struct ceph_osd *o = msg->con->private;
4709 	struct ceph_auth_handshake *auth = &o->o_auth;
4710 
4711 	return ceph_auth_sign_message(auth, msg);
4712 }
4713 
4714 static int osd_check_message_signature(struct ceph_msg *msg)
4715 {
4716 	struct ceph_osd *o = msg->con->private;
4717 	struct ceph_auth_handshake *auth = &o->o_auth;
4718 
4719 	return ceph_auth_check_message_signature(auth, msg);
4720 }
4721 
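/*
 * Messenger callback table for OSD connections: the ceph_connection
 * layer uses these to take/drop references on the owning ceph_osd,
 * allocate and dispatch incoming messages, handle authorizers and
 * message signing, and report connection faults.
 */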
4722 static const struct ceph_connection_operations osd_con_ops = {
4723 	.get = get_osd_con,
4724 	.put = put_osd_con,
4725 	.dispatch = dispatch,
4726 	.get_authorizer = get_authorizer,
4727 	.verify_authorizer_reply = verify_authorizer_reply,
4728 	.invalidate_authorizer = invalidate_authorizer,
4729 	.alloc_msg = alloc_msg,
4730 	.reencode_message = osd_reencode_message,
4731 	.sign_message = osd_sign_message,
4732 	.check_message_signature = osd_check_message_signature,
4733 	.fault = osd_fault,
4734 };
4735