xref: /openbmc/linux/net/ceph/osd_client.c (revision 97da55fc)
1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13 
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20 
21 #define OSD_OP_FRONT_LEN	4096
22 #define OSD_OPREPLY_FRONT_LEN	512
23 
24 static const struct ceph_connection_operations osd_con_ops;
25 
26 static void __send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 static void __register_request(struct ceph_osd_client *osdc,
29 			       struct ceph_osd_request *req);
30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31 					struct ceph_osd_request *req);
32 static void __send_request(struct ceph_osd_client *osdc,
33 			   struct ceph_osd_request *req);
34 
35 static int op_has_extent(int op)
36 {
37 	return (op == CEPH_OSD_OP_READ ||
38 		op == CEPH_OSD_OP_WRITE);
39 }
40 
/*
 * Implement client access to the distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * changes, and retry the affected requests when the communications
 * channel with an OSD is reset.
 */
58 
59 /*
60  * calculate the mapping of a file extent onto an object, and fill out the
61  * request accordingly.  shorten extent as necessary if it crosses an
62  * object boundary.
63  *
64  * fill osd op in request message.
65  */
66 static int calc_layout(struct ceph_vino vino,
67 		       struct ceph_file_layout *layout,
68 		       u64 off, u64 *plen,
69 		       struct ceph_osd_request *req,
70 		       struct ceph_osd_req_op *op)
71 {
72 	u64 orig_len = *plen;
73 	u64 bno = 0;
74 	u64 objoff = 0;
75 	u64 objlen = 0;
76 	int r;
77 
78 	/* object extent? */
79 	r = ceph_calc_file_object_mapping(layout, off, orig_len, &bno,
80 					  &objoff, &objlen);
81 	if (r < 0)
82 		return r;
83 	if (objlen < orig_len) {
84 		*plen = objlen;
85 		dout(" skipping last %llu, final file extent %llu~%llu\n",
86 		     orig_len - *plen, off, *plen);
87 	}
88 
89 	if (op_has_extent(op->op)) {
90 		u32 osize = le32_to_cpu(layout->fl_object_size);
91 		op->extent.offset = objoff;
92 		op->extent.length = objlen;
93 		if (op->extent.truncate_size <= off - objoff) {
94 			op->extent.truncate_size = 0;
95 		} else {
96 			op->extent.truncate_size -= off - objoff;
97 			if (op->extent.truncate_size > osize)
98 				op->extent.truncate_size = osize;
99 		}
100 	}
101 	req->r_num_pages = calc_pages_for(off, *plen);
102 	req->r_page_alignment = off & ~PAGE_MASK;
103 	if (op->op == CEPH_OSD_OP_WRITE)
104 		op->payload_len = *plen;
105 
106 	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
107 	     bno, objoff, objlen, req->r_num_pages);
108 
109 	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
110 	req->r_oid_len = strlen(req->r_oid);
111 
112 	return r;
113 }
114 
115 /*
116  * requests
117  */
118 void ceph_osdc_release_request(struct kref *kref)
119 {
120 	struct ceph_osd_request *req = container_of(kref,
121 						    struct ceph_osd_request,
122 						    r_kref);
123 
124 	if (req->r_request)
125 		ceph_msg_put(req->r_request);
126 	if (req->r_con_filling_msg) {
127 		dout("%s revoking msg %p from con %p\n", __func__,
128 		     req->r_reply, req->r_con_filling_msg);
129 		ceph_msg_revoke_incoming(req->r_reply);
130 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
131 		req->r_con_filling_msg = NULL;
132 	}
133 	if (req->r_reply)
134 		ceph_msg_put(req->r_reply);
135 	if (req->r_own_pages)
136 		ceph_release_page_vector(req->r_pages,
137 					 req->r_num_pages);
138 	ceph_put_snap_context(req->r_snapc);
139 	ceph_pagelist_release(&req->r_trail);
140 	if (req->r_mempool)
141 		mempool_free(req, req->r_osdc->req_mempool);
142 	else
143 		kfree(req);
144 }
145 EXPORT_SYMBOL(ceph_osdc_release_request);
146 
147 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
148 					       struct ceph_snap_context *snapc,
149 					       unsigned int num_ops,
150 					       bool use_mempool,
151 					       gfp_t gfp_flags)
152 {
153 	struct ceph_osd_request *req;
154 	struct ceph_msg *msg;
155 	size_t msg_size;
156 
157 	msg_size = 4 + 4 + 8 + 8 + 4+8;
158 	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
159 	msg_size += 1 + 8 + 4 + 4;     /* pg_t */
160 	msg_size += 4 + MAX_OBJ_NAME_SIZE;
161 	msg_size += 2 + num_ops*sizeof(struct ceph_osd_op);
162 	msg_size += 8;  /* snapid */
163 	msg_size += 8;  /* snap_seq */
164 	msg_size += 8 * (snapc ? snapc->num_snaps : 0);  /* snaps */
165 	msg_size += 4;
166 
167 	if (use_mempool) {
168 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
169 		memset(req, 0, sizeof(*req));
170 	} else {
171 		req = kzalloc(sizeof(*req), gfp_flags);
172 	}
173 	if (req == NULL)
174 		return NULL;
175 
176 	req->r_osdc = osdc;
177 	req->r_mempool = use_mempool;
178 
179 	kref_init(&req->r_kref);
180 	init_completion(&req->r_completion);
181 	init_completion(&req->r_safe_completion);
182 	RB_CLEAR_NODE(&req->r_node);
183 	INIT_LIST_HEAD(&req->r_unsafe_item);
184 	INIT_LIST_HEAD(&req->r_linger_item);
185 	INIT_LIST_HEAD(&req->r_linger_osd);
186 	INIT_LIST_HEAD(&req->r_req_lru_item);
187 	INIT_LIST_HEAD(&req->r_osd_item);
188 
189 	/* create reply message */
190 	if (use_mempool)
191 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
192 	else
193 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
194 				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
195 	if (!msg) {
196 		ceph_osdc_put_request(req);
197 		return NULL;
198 	}
199 	req->r_reply = msg;
200 
201 	ceph_pagelist_init(&req->r_trail);
202 
203 	/* create request message; allow space for oid */
204 	if (use_mempool)
205 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
206 	else
207 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
208 	if (!msg) {
209 		ceph_osdc_put_request(req);
210 		return NULL;
211 	}
212 
213 	memset(msg->front.iov_base, 0, msg->front.iov_len);
214 
215 	req->r_request = msg;
216 
217 	return req;
218 }
219 EXPORT_SYMBOL(ceph_osdc_alloc_request);
220 
221 static void osd_req_encode_op(struct ceph_osd_request *req,
222 			      struct ceph_osd_op *dst,
223 			      struct ceph_osd_req_op *src)
224 {
225 	dst->op = cpu_to_le16(src->op);
226 
227 	switch (src->op) {
228 	case CEPH_OSD_OP_STAT:
229 		break;
230 	case CEPH_OSD_OP_READ:
231 	case CEPH_OSD_OP_WRITE:
232 		dst->extent.offset =
233 			cpu_to_le64(src->extent.offset);
234 		dst->extent.length =
235 			cpu_to_le64(src->extent.length);
236 		dst->extent.truncate_size =
237 			cpu_to_le64(src->extent.truncate_size);
238 		dst->extent.truncate_seq =
239 			cpu_to_le32(src->extent.truncate_seq);
240 		break;
241 	case CEPH_OSD_OP_CALL:
242 		dst->cls.class_len = src->cls.class_len;
243 		dst->cls.method_len = src->cls.method_len;
244 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
245 
246 		ceph_pagelist_append(&req->r_trail, src->cls.class_name,
247 				     src->cls.class_len);
248 		ceph_pagelist_append(&req->r_trail, src->cls.method_name,
249 				     src->cls.method_len);
250 		ceph_pagelist_append(&req->r_trail, src->cls.indata,
251 				     src->cls.indata_len);
252 		break;
253 	case CEPH_OSD_OP_STARTSYNC:
254 		break;
255 	case CEPH_OSD_OP_NOTIFY_ACK:
256 	case CEPH_OSD_OP_WATCH:
257 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
258 		dst->watch.ver = cpu_to_le64(src->watch.ver);
259 		dst->watch.flag = src->watch.flag;
260 		break;
261 	default:
262 		pr_err("unrecognized osd opcode %d\n", dst->op);
263 		WARN_ON(1);
264 		break;
265 	case CEPH_OSD_OP_MAPEXT:
266 	case CEPH_OSD_OP_MASKTRUNC:
267 	case CEPH_OSD_OP_SPARSE_READ:
268 	case CEPH_OSD_OP_NOTIFY:
269 	case CEPH_OSD_OP_ASSERT_VER:
270 	case CEPH_OSD_OP_WRITEFULL:
271 	case CEPH_OSD_OP_TRUNCATE:
272 	case CEPH_OSD_OP_ZERO:
273 	case CEPH_OSD_OP_DELETE:
274 	case CEPH_OSD_OP_APPEND:
275 	case CEPH_OSD_OP_SETTRUNC:
276 	case CEPH_OSD_OP_TRIMTRUNC:
277 	case CEPH_OSD_OP_TMAPUP:
278 	case CEPH_OSD_OP_TMAPPUT:
279 	case CEPH_OSD_OP_TMAPGET:
280 	case CEPH_OSD_OP_CREATE:
281 	case CEPH_OSD_OP_ROLLBACK:
282 	case CEPH_OSD_OP_OMAPGETKEYS:
283 	case CEPH_OSD_OP_OMAPGETVALS:
284 	case CEPH_OSD_OP_OMAPGETHEADER:
285 	case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
286 	case CEPH_OSD_OP_MODE_RD:
287 	case CEPH_OSD_OP_OMAPSETVALS:
288 	case CEPH_OSD_OP_OMAPSETHEADER:
289 	case CEPH_OSD_OP_OMAPCLEAR:
290 	case CEPH_OSD_OP_OMAPRMKEYS:
291 	case CEPH_OSD_OP_OMAP_CMP:
292 	case CEPH_OSD_OP_CLONERANGE:
293 	case CEPH_OSD_OP_ASSERT_SRC_VERSION:
294 	case CEPH_OSD_OP_SRC_CMPXATTR:
295 	case CEPH_OSD_OP_GETXATTR:
296 	case CEPH_OSD_OP_GETXATTRS:
297 	case CEPH_OSD_OP_CMPXATTR:
298 	case CEPH_OSD_OP_SETXATTR:
299 	case CEPH_OSD_OP_SETXATTRS:
300 	case CEPH_OSD_OP_RESETXATTRS:
301 	case CEPH_OSD_OP_RMXATTR:
302 	case CEPH_OSD_OP_PULL:
303 	case CEPH_OSD_OP_PUSH:
304 	case CEPH_OSD_OP_BALANCEREADS:
305 	case CEPH_OSD_OP_UNBALANCEREADS:
306 	case CEPH_OSD_OP_SCRUB:
307 	case CEPH_OSD_OP_SCRUB_RESERVE:
308 	case CEPH_OSD_OP_SCRUB_UNRESERVE:
309 	case CEPH_OSD_OP_SCRUB_STOP:
310 	case CEPH_OSD_OP_SCRUB_MAP:
311 	case CEPH_OSD_OP_WRLOCK:
312 	case CEPH_OSD_OP_WRUNLOCK:
313 	case CEPH_OSD_OP_RDLOCK:
314 	case CEPH_OSD_OP_RDUNLOCK:
315 	case CEPH_OSD_OP_UPLOCK:
316 	case CEPH_OSD_OP_DNLOCK:
317 	case CEPH_OSD_OP_PGLS:
318 	case CEPH_OSD_OP_PGLS_FILTER:
319 		pr_err("unsupported osd opcode %s\n",
320 			ceph_osd_op_name(dst->op));
321 		WARN_ON(1);
322 		break;
323 	}
324 	dst->payload_len = cpu_to_le32(src->payload_len);
325 }
326 
327 /*
328  * build new request AND message
329  *
330  */
331 void ceph_osdc_build_request(struct ceph_osd_request *req,
332 			     u64 off, u64 len, unsigned int num_ops,
333 			     struct ceph_osd_req_op *src_ops,
334 			     struct ceph_snap_context *snapc, u64 snap_id,
335 			     struct timespec *mtime)
336 {
337 	struct ceph_msg *msg = req->r_request;
338 	struct ceph_osd_req_op *src_op;
339 	void *p;
340 	size_t msg_size;
341 	int flags = req->r_flags;
342 	u64 data_len;
343 	int i;
344 
345 	req->r_num_ops = num_ops;
346 	req->r_snapid = snap_id;
347 	req->r_snapc = ceph_get_snap_context(snapc);
348 
349 	/* encode request */
350 	msg->hdr.version = cpu_to_le16(4);
351 
352 	p = msg->front.iov_base;
353 	ceph_encode_32(&p, 1);   /* client_inc  is always 1 */
354 	req->r_request_osdmap_epoch = p;
355 	p += 4;
356 	req->r_request_flags = p;
357 	p += 4;
358 	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
359 		ceph_encode_timespec(p, mtime);
360 	p += sizeof(struct ceph_timespec);
361 	req->r_request_reassert_version = p;
362 	p += sizeof(struct ceph_eversion); /* will get filled in */
363 
364 	/* oloc */
365 	ceph_encode_8(&p, 4);
366 	ceph_encode_8(&p, 4);
367 	ceph_encode_32(&p, 8 + 4 + 4);
368 	req->r_request_pool = p;
369 	p += 8;
370 	ceph_encode_32(&p, -1);  /* preferred */
371 	ceph_encode_32(&p, 0);   /* key len */
372 
373 	ceph_encode_8(&p, 1);
374 	req->r_request_pgid = p;
375 	p += 8 + 4;
376 	ceph_encode_32(&p, -1);  /* preferred */
377 
378 	/* oid */
379 	ceph_encode_32(&p, req->r_oid_len);
380 	memcpy(p, req->r_oid, req->r_oid_len);
381 	dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len);
382 	p += req->r_oid_len;
383 
384 	/* ops */
385 	ceph_encode_16(&p, num_ops);
386 	src_op = src_ops;
387 	req->r_request_ops = p;
388 	for (i = 0; i < num_ops; i++, src_op++) {
389 		osd_req_encode_op(req, p, src_op);
390 		p += sizeof(struct ceph_osd_op);
391 	}
392 
393 	/* snaps */
394 	ceph_encode_64(&p, req->r_snapid);
395 	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
396 	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
397 	if (req->r_snapc) {
398 		for (i = 0; i < snapc->num_snaps; i++) {
399 			ceph_encode_64(&p, req->r_snapc->snaps[i]);
400 		}
401 	}
402 
403 	req->r_request_attempts = p;
404 	p += 4;
405 
406 	data_len = req->r_trail.length;
407 	if (flags & CEPH_OSD_FLAG_WRITE) {
408 		req->r_request->hdr.data_off = cpu_to_le16(off);
409 		data_len += len;
410 	}
411 	req->r_request->hdr.data_len = cpu_to_le32(data_len);
412 	req->r_request->page_alignment = req->r_page_alignment;
413 
414 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
415 	msg_size = p - msg->front.iov_base;
416 	msg->front.iov_len = msg_size;
417 	msg->hdr.front_len = cpu_to_le32(msg_size);
418 
419 	dout("build_request msg_size was %d num_ops %d\n", (int)msg_size,
420 	     num_ops);
421 	return;
422 }
423 EXPORT_SYMBOL(ceph_osdc_build_request);
424 
425 /*
426  * build new request AND message, calculate layout, and adjust file
427  * extent as needed.
428  *
429  * if the file was recently truncated, we include information about its
430  * old and new size so that the object can be updated appropriately.  (we
431  * avoid synchronously deleting truncated objects because it's slow.)
432  *
433  * if @do_sync, include a 'startsync' command so that the osd will flush
434  * data quickly.
435  */
436 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
437 					       struct ceph_file_layout *layout,
438 					       struct ceph_vino vino,
439 					       u64 off, u64 *plen,
440 					       int opcode, int flags,
441 					       struct ceph_snap_context *snapc,
442 					       int do_sync,
443 					       u32 truncate_seq,
444 					       u64 truncate_size,
445 					       struct timespec *mtime,
446 					       bool use_mempool,
447 					       int page_align)
448 {
449 	struct ceph_osd_req_op ops[2];
450 	struct ceph_osd_request *req;
451 	unsigned int num_op = 1;
452 	int r;
453 
454 	memset(&ops, 0, sizeof ops);
455 
456 	ops[0].op = opcode;
457 	ops[0].extent.truncate_seq = truncate_seq;
458 	ops[0].extent.truncate_size = truncate_size;
459 
460 	if (do_sync) {
461 		ops[1].op = CEPH_OSD_OP_STARTSYNC;
462 		num_op++;
463 	}
464 
465 	req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool,
466 					GFP_NOFS);
467 	if (!req)
468 		return ERR_PTR(-ENOMEM);
469 	req->r_flags = flags;
470 
471 	/* calculate max write size */
472 	r = calc_layout(vino, layout, off, plen, req, ops);
473 	if (r < 0)
474 		return ERR_PTR(r);
475 	req->r_file_layout = *layout;  /* keep a copy */
476 
477 	/* in case it differs from natural (file) alignment that
478 	   calc_layout filled in for us */
479 	req->r_num_pages = calc_pages_for(page_align, *plen);
480 	req->r_page_alignment = page_align;
481 
482 	ceph_osdc_build_request(req, off, *plen, num_op, ops,
483 				snapc, vino.snap, mtime);
484 
485 	return req;
486 }
487 EXPORT_SYMBOL(ceph_osdc_new_request);
488 
489 /*
490  * We keep osd requests in an rbtree, sorted by ->r_tid.
491  */
492 static void __insert_request(struct ceph_osd_client *osdc,
493 			     struct ceph_osd_request *new)
494 {
495 	struct rb_node **p = &osdc->requests.rb_node;
496 	struct rb_node *parent = NULL;
497 	struct ceph_osd_request *req = NULL;
498 
499 	while (*p) {
500 		parent = *p;
501 		req = rb_entry(parent, struct ceph_osd_request, r_node);
502 		if (new->r_tid < req->r_tid)
503 			p = &(*p)->rb_left;
504 		else if (new->r_tid > req->r_tid)
505 			p = &(*p)->rb_right;
506 		else
507 			BUG();
508 	}
509 
510 	rb_link_node(&new->r_node, parent, p);
511 	rb_insert_color(&new->r_node, &osdc->requests);
512 }
513 
514 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
515 						 u64 tid)
516 {
517 	struct ceph_osd_request *req;
518 	struct rb_node *n = osdc->requests.rb_node;
519 
520 	while (n) {
521 		req = rb_entry(n, struct ceph_osd_request, r_node);
522 		if (tid < req->r_tid)
523 			n = n->rb_left;
524 		else if (tid > req->r_tid)
525 			n = n->rb_right;
526 		else
527 			return req;
528 	}
529 	return NULL;
530 }
531 
532 static struct ceph_osd_request *
533 __lookup_request_ge(struct ceph_osd_client *osdc,
534 		    u64 tid)
535 {
536 	struct ceph_osd_request *req;
537 	struct rb_node *n = osdc->requests.rb_node;
538 
539 	while (n) {
540 		req = rb_entry(n, struct ceph_osd_request, r_node);
541 		if (tid < req->r_tid) {
542 			if (!n->rb_left)
543 				return req;
544 			n = n->rb_left;
545 		} else if (tid > req->r_tid) {
546 			n = n->rb_right;
547 		} else {
548 			return req;
549 		}
550 	}
551 	return NULL;
552 }
553 
554 /*
555  * Resubmit requests pending on the given osd.
556  */
557 static void __kick_osd_requests(struct ceph_osd_client *osdc,
558 				struct ceph_osd *osd)
559 {
560 	struct ceph_osd_request *req, *nreq;
561 	int err;
562 
563 	dout("__kick_osd_requests osd%d\n", osd->o_osd);
564 	err = __reset_osd(osdc, osd);
565 	if (err)
566 		return;
567 
568 	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
569 		list_move(&req->r_req_lru_item, &osdc->req_unsent);
570 		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
571 		     osd->o_osd);
572 		if (!req->r_linger)
573 			req->r_flags |= CEPH_OSD_FLAG_RETRY;
574 	}
575 
576 	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
577 				 r_linger_osd) {
578 		/*
579 		 * reregister request prior to unregistering linger so
580 		 * that r_osd is preserved.
581 		 */
582 		BUG_ON(!list_empty(&req->r_req_lru_item));
583 		__register_request(osdc, req);
584 		list_add(&req->r_req_lru_item, &osdc->req_unsent);
585 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
586 		__unregister_linger_request(osdc, req);
587 		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
588 		     osd->o_osd);
589 	}
590 }
591 
592 /*
593  * If the osd connection drops, we need to resubmit all requests.
594  */
595 static void osd_reset(struct ceph_connection *con)
596 {
597 	struct ceph_osd *osd = con->private;
598 	struct ceph_osd_client *osdc;
599 
600 	if (!osd)
601 		return;
602 	dout("osd_reset osd%d\n", osd->o_osd);
603 	osdc = osd->o_osdc;
604 	down_read(&osdc->map_sem);
605 	mutex_lock(&osdc->request_mutex);
606 	__kick_osd_requests(osdc, osd);
607 	__send_queued(osdc);
608 	mutex_unlock(&osdc->request_mutex);
609 	up_read(&osdc->map_sem);
610 }
611 
612 /*
613  * Track open sessions with osds.
614  */
615 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
616 {
617 	struct ceph_osd *osd;
618 
619 	osd = kzalloc(sizeof(*osd), GFP_NOFS);
620 	if (!osd)
621 		return NULL;
622 
623 	atomic_set(&osd->o_ref, 1);
624 	osd->o_osdc = osdc;
625 	osd->o_osd = onum;
626 	RB_CLEAR_NODE(&osd->o_node);
627 	INIT_LIST_HEAD(&osd->o_requests);
628 	INIT_LIST_HEAD(&osd->o_linger_requests);
629 	INIT_LIST_HEAD(&osd->o_osd_lru);
630 	osd->o_incarnation = 1;
631 
632 	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
633 
634 	INIT_LIST_HEAD(&osd->o_keepalive_item);
635 	return osd;
636 }
637 
638 static struct ceph_osd *get_osd(struct ceph_osd *osd)
639 {
640 	if (atomic_inc_not_zero(&osd->o_ref)) {
641 		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
642 		     atomic_read(&osd->o_ref));
643 		return osd;
644 	} else {
645 		dout("get_osd %p FAIL\n", osd);
646 		return NULL;
647 	}
648 }
649 
650 static void put_osd(struct ceph_osd *osd)
651 {
652 	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
653 	     atomic_read(&osd->o_ref) - 1);
654 	if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
655 		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
656 
657 		if (ac->ops && ac->ops->destroy_authorizer)
658 			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
659 		kfree(osd);
660 	}
661 }
662 
663 /*
664  * remove an osd from our map
665  */
666 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
667 {
668 	dout("__remove_osd %p\n", osd);
669 	BUG_ON(!list_empty(&osd->o_requests));
670 	rb_erase(&osd->o_node, &osdc->osds);
671 	list_del_init(&osd->o_osd_lru);
672 	ceph_con_close(&osd->o_con);
673 	put_osd(osd);
674 }
675 
676 static void remove_all_osds(struct ceph_osd_client *osdc)
677 {
678 	dout("%s %p\n", __func__, osdc);
679 	mutex_lock(&osdc->request_mutex);
680 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
681 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
682 						struct ceph_osd, o_node);
683 		__remove_osd(osdc, osd);
684 	}
685 	mutex_unlock(&osdc->request_mutex);
686 }
687 
688 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
689 			      struct ceph_osd *osd)
690 {
691 	dout("__move_osd_to_lru %p\n", osd);
692 	BUG_ON(!list_empty(&osd->o_osd_lru));
693 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
694 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
695 }
696 
697 static void __remove_osd_from_lru(struct ceph_osd *osd)
698 {
699 	dout("__remove_osd_from_lru %p\n", osd);
700 	if (!list_empty(&osd->o_osd_lru))
701 		list_del_init(&osd->o_osd_lru);
702 }
703 
704 static void remove_old_osds(struct ceph_osd_client *osdc)
705 {
706 	struct ceph_osd *osd, *nosd;
707 
708 	dout("__remove_old_osds %p\n", osdc);
709 	mutex_lock(&osdc->request_mutex);
710 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
711 		if (time_before(jiffies, osd->lru_ttl))
712 			break;
713 		__remove_osd(osdc, osd);
714 	}
715 	mutex_unlock(&osdc->request_mutex);
716 }
717 
718 /*
719  * reset osd connect
720  */
721 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
722 {
723 	struct ceph_entity_addr *peer_addr;
724 
725 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
726 	if (list_empty(&osd->o_requests) &&
727 	    list_empty(&osd->o_linger_requests)) {
728 		__remove_osd(osdc, osd);
729 
730 		return -ENODEV;
731 	}
732 
733 	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
734 	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
735 			!ceph_con_opened(&osd->o_con)) {
736 		struct ceph_osd_request *req;
737 
738 		dout(" osd addr hasn't changed and connection never opened,"
739 		     " letting msgr retry");
740 		/* touch each r_stamp for handle_timeout()'s benfit */
741 		list_for_each_entry(req, &osd->o_requests, r_osd_item)
742 			req->r_stamp = jiffies;
743 
744 		return -EAGAIN;
745 	}
746 
747 	ceph_con_close(&osd->o_con);
748 	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
749 	osd->o_incarnation++;
750 
751 	return 0;
752 }
753 
754 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
755 {
756 	struct rb_node **p = &osdc->osds.rb_node;
757 	struct rb_node *parent = NULL;
758 	struct ceph_osd *osd = NULL;
759 
760 	dout("__insert_osd %p osd%d\n", new, new->o_osd);
761 	while (*p) {
762 		parent = *p;
763 		osd = rb_entry(parent, struct ceph_osd, o_node);
764 		if (new->o_osd < osd->o_osd)
765 			p = &(*p)->rb_left;
766 		else if (new->o_osd > osd->o_osd)
767 			p = &(*p)->rb_right;
768 		else
769 			BUG();
770 	}
771 
772 	rb_link_node(&new->o_node, parent, p);
773 	rb_insert_color(&new->o_node, &osdc->osds);
774 }
775 
776 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
777 {
778 	struct ceph_osd *osd;
779 	struct rb_node *n = osdc->osds.rb_node;
780 
781 	while (n) {
782 		osd = rb_entry(n, struct ceph_osd, o_node);
783 		if (o < osd->o_osd)
784 			n = n->rb_left;
785 		else if (o > osd->o_osd)
786 			n = n->rb_right;
787 		else
788 			return osd;
789 	}
790 	return NULL;
791 }
792 
793 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
794 {
795 	schedule_delayed_work(&osdc->timeout_work,
796 			osdc->client->options->osd_keepalive_timeout * HZ);
797 }
798 
799 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
800 {
801 	cancel_delayed_work(&osdc->timeout_work);
802 }
803 
804 /*
805  * Register request, assign tid.  If this is the first request, set up
806  * the timeout event.
807  */
808 static void __register_request(struct ceph_osd_client *osdc,
809 			       struct ceph_osd_request *req)
810 {
811 	req->r_tid = ++osdc->last_tid;
812 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
813 	dout("__register_request %p tid %lld\n", req, req->r_tid);
814 	__insert_request(osdc, req);
815 	ceph_osdc_get_request(req);
816 	osdc->num_requests++;
817 	if (osdc->num_requests == 1) {
818 		dout(" first request, scheduling timeout\n");
819 		__schedule_osd_timeout(osdc);
820 	}
821 }
822 
823 static void register_request(struct ceph_osd_client *osdc,
824 			     struct ceph_osd_request *req)
825 {
826 	mutex_lock(&osdc->request_mutex);
827 	__register_request(osdc, req);
828 	mutex_unlock(&osdc->request_mutex);
829 }
830 
831 /*
832  * called under osdc->request_mutex
833  */
834 static void __unregister_request(struct ceph_osd_client *osdc,
835 				 struct ceph_osd_request *req)
836 {
837 	if (RB_EMPTY_NODE(&req->r_node)) {
838 		dout("__unregister_request %p tid %lld not registered\n",
839 			req, req->r_tid);
840 		return;
841 	}
842 
843 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
844 	rb_erase(&req->r_node, &osdc->requests);
845 	osdc->num_requests--;
846 
847 	if (req->r_osd) {
848 		/* make sure the original request isn't in flight. */
849 		ceph_msg_revoke(req->r_request);
850 
851 		list_del_init(&req->r_osd_item);
852 		if (list_empty(&req->r_osd->o_requests) &&
853 		    list_empty(&req->r_osd->o_linger_requests)) {
854 			dout("moving osd to %p lru\n", req->r_osd);
855 			__move_osd_to_lru(osdc, req->r_osd);
856 		}
857 		if (list_empty(&req->r_linger_item))
858 			req->r_osd = NULL;
859 	}
860 
861 	list_del_init(&req->r_req_lru_item);
862 	ceph_osdc_put_request(req);
863 
864 	if (osdc->num_requests == 0) {
865 		dout(" no requests, canceling timeout\n");
866 		__cancel_osd_timeout(osdc);
867 	}
868 }
869 
870 /*
871  * Cancel a previously queued request message
872  */
873 static void __cancel_request(struct ceph_osd_request *req)
874 {
875 	if (req->r_sent && req->r_osd) {
876 		ceph_msg_revoke(req->r_request);
877 		req->r_sent = 0;
878 	}
879 }
880 
881 static void __register_linger_request(struct ceph_osd_client *osdc,
882 				    struct ceph_osd_request *req)
883 {
884 	dout("__register_linger_request %p\n", req);
885 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
886 	if (req->r_osd)
887 		list_add_tail(&req->r_linger_osd,
888 			      &req->r_osd->o_linger_requests);
889 }
890 
891 static void __unregister_linger_request(struct ceph_osd_client *osdc,
892 					struct ceph_osd_request *req)
893 {
894 	dout("__unregister_linger_request %p\n", req);
895 	list_del_init(&req->r_linger_item);
896 	if (req->r_osd) {
897 		list_del_init(&req->r_linger_osd);
898 
899 		if (list_empty(&req->r_osd->o_requests) &&
900 		    list_empty(&req->r_osd->o_linger_requests)) {
901 			dout("moving osd to %p lru\n", req->r_osd);
902 			__move_osd_to_lru(osdc, req->r_osd);
903 		}
904 		if (list_empty(&req->r_osd_item))
905 			req->r_osd = NULL;
906 	}
907 }
908 
909 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
910 					 struct ceph_osd_request *req)
911 {
912 	mutex_lock(&osdc->request_mutex);
913 	if (req->r_linger) {
914 		__unregister_linger_request(osdc, req);
915 		ceph_osdc_put_request(req);
916 	}
917 	mutex_unlock(&osdc->request_mutex);
918 }
919 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
920 
921 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
922 				  struct ceph_osd_request *req)
923 {
924 	if (!req->r_linger) {
925 		dout("set_request_linger %p\n", req);
926 		req->r_linger = 1;
927 		/*
928 		 * caller is now responsible for calling
929 		 * unregister_linger_request
930 		 */
931 		ceph_osdc_get_request(req);
932 	}
933 }
934 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
935 
936 /*
937  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
938  * (as needed), and set the request r_osd appropriately.  If there is
939  * no up osd, set r_osd to NULL.  Move the request to the appropriate list
940  * (unsent, homeless) or leave on in-flight lru.
941  *
942  * Return 0 if unchanged, 1 if changed, or negative on error.
943  *
944  * Caller should hold map_sem for read and request_mutex.
945  */
946 static int __map_request(struct ceph_osd_client *osdc,
947 			 struct ceph_osd_request *req, int force_resend)
948 {
949 	struct ceph_pg pgid;
950 	int acting[CEPH_PG_MAX_SIZE];
951 	int o = -1, num = 0;
952 	int err;
953 
954 	dout("map_request %p tid %lld\n", req, req->r_tid);
955 	err = ceph_calc_object_layout(&pgid, req->r_oid,
956 				      &req->r_file_layout, osdc->osdmap);
957 	if (err) {
958 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
959 		return err;
960 	}
961 	req->r_pgid = pgid;
962 
963 	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
964 	if (err > 0) {
965 		o = acting[0];
966 		num = err;
967 	}
968 
969 	if ((!force_resend &&
970 	     req->r_osd && req->r_osd->o_osd == o &&
971 	     req->r_sent >= req->r_osd->o_incarnation &&
972 	     req->r_num_pg_osds == num &&
973 	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
974 	    (req->r_osd == NULL && o == -1))
975 		return 0;  /* no change */
976 
977 	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
978 	     req->r_tid, pgid.pool, pgid.seed, o,
979 	     req->r_osd ? req->r_osd->o_osd : -1);
980 
981 	/* record full pg acting set */
982 	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
983 	req->r_num_pg_osds = num;
984 
985 	if (req->r_osd) {
986 		__cancel_request(req);
987 		list_del_init(&req->r_osd_item);
988 		req->r_osd = NULL;
989 	}
990 
991 	req->r_osd = __lookup_osd(osdc, o);
992 	if (!req->r_osd && o >= 0) {
993 		err = -ENOMEM;
994 		req->r_osd = create_osd(osdc, o);
995 		if (!req->r_osd) {
996 			list_move(&req->r_req_lru_item, &osdc->req_notarget);
997 			goto out;
998 		}
999 
1000 		dout("map_request osd %p is osd%d\n", req->r_osd, o);
1001 		__insert_osd(osdc, req->r_osd);
1002 
1003 		ceph_con_open(&req->r_osd->o_con,
1004 			      CEPH_ENTITY_TYPE_OSD, o,
1005 			      &osdc->osdmap->osd_addr[o]);
1006 	}
1007 
1008 	if (req->r_osd) {
1009 		__remove_osd_from_lru(req->r_osd);
1010 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
1011 		list_move(&req->r_req_lru_item, &osdc->req_unsent);
1012 	} else {
1013 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
1014 	}
1015 	err = 1;   /* osd or pg changed */
1016 
1017 out:
1018 	return err;
1019 }
1020 
1021 /*
1022  * caller should hold map_sem (for read) and request_mutex
1023  */
1024 static void __send_request(struct ceph_osd_client *osdc,
1025 			   struct ceph_osd_request *req)
1026 {
1027 	void *p;
1028 
1029 	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
1030 	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
1031 	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);
1032 
1033 	/* fill in message content that changes each time we send it */
1034 	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
1035 	put_unaligned_le32(req->r_flags, req->r_request_flags);
1036 	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
1037 	p = req->r_request_pgid;
1038 	ceph_encode_64(&p, req->r_pgid.pool);
1039 	ceph_encode_32(&p, req->r_pgid.seed);
1040 	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
1041 	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
1042 	       sizeof(req->r_reassert_version));
1043 
1044 	req->r_stamp = jiffies;
1045 	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
1046 
1047 	ceph_msg_get(req->r_request); /* send consumes a ref */
1048 	ceph_con_send(&req->r_osd->o_con, req->r_request);
1049 	req->r_sent = req->r_osd->o_incarnation;
1050 }
1051 
1052 /*
1053  * Send any requests in the queue (req_unsent).
1054  */
1055 static void __send_queued(struct ceph_osd_client *osdc)
1056 {
1057 	struct ceph_osd_request *req, *tmp;
1058 
1059 	dout("__send_queued\n");
1060 	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
1061 		__send_request(osdc, req);
1062 }
1063 
1064 /*
1065  * Timeout callback, called every N seconds when 1 or more osd
1066  * requests has been active for more than N seconds.  When this
1067  * happens, we ping all OSDs with requests who have timed out to
1068  * ensure any communications channel reset is detected.  Reset the
1069  * request timeouts another N seconds in the future as we go.
1070  * Reschedule the timeout event another N seconds in future (unless
1071  * there are no open requests).
1072  */
1073 static void handle_timeout(struct work_struct *work)
1074 {
1075 	struct ceph_osd_client *osdc =
1076 		container_of(work, struct ceph_osd_client, timeout_work.work);
1077 	struct ceph_osd_request *req;
1078 	struct ceph_osd *osd;
1079 	unsigned long keepalive =
1080 		osdc->client->options->osd_keepalive_timeout * HZ;
1081 	struct list_head slow_osds;
1082 	dout("timeout\n");
1083 	down_read(&osdc->map_sem);
1084 
1085 	ceph_monc_request_next_osdmap(&osdc->client->monc);
1086 
1087 	mutex_lock(&osdc->request_mutex);
1088 
1089 	/*
1090 	 * ping osds that are a bit slow.  this ensures that if there
1091 	 * is a break in the TCP connection we will notice, and reopen
1092 	 * a connection with that osd (from the fault callback).
1093 	 */
1094 	INIT_LIST_HEAD(&slow_osds);
1095 	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1096 		if (time_before(jiffies, req->r_stamp + keepalive))
1097 			break;
1098 
1099 		osd = req->r_osd;
1100 		BUG_ON(!osd);
1101 		dout(" tid %llu is slow, will send keepalive on osd%d\n",
1102 		     req->r_tid, osd->o_osd);
1103 		list_move_tail(&osd->o_keepalive_item, &slow_osds);
1104 	}
1105 	while (!list_empty(&slow_osds)) {
1106 		osd = list_entry(slow_osds.next, struct ceph_osd,
1107 				 o_keepalive_item);
1108 		list_del_init(&osd->o_keepalive_item);
1109 		ceph_con_keepalive(&osd->o_con);
1110 	}
1111 
1112 	__schedule_osd_timeout(osdc);
1113 	__send_queued(osdc);
1114 	mutex_unlock(&osdc->request_mutex);
1115 	up_read(&osdc->map_sem);
1116 }
1117 
1118 static void handle_osds_timeout(struct work_struct *work)
1119 {
1120 	struct ceph_osd_client *osdc =
1121 		container_of(work, struct ceph_osd_client,
1122 			     osds_timeout_work.work);
1123 	unsigned long delay =
1124 		osdc->client->options->osd_idle_ttl * HZ >> 2;
1125 
1126 	dout("osds timeout\n");
1127 	down_read(&osdc->map_sem);
1128 	remove_old_osds(osdc);
1129 	up_read(&osdc->map_sem);
1130 
1131 	schedule_delayed_work(&osdc->osds_timeout_work,
1132 			      round_jiffies_relative(delay));
1133 }
1134 
1135 static void complete_request(struct ceph_osd_request *req)
1136 {
1137 	if (req->r_safe_callback)
1138 		req->r_safe_callback(req, NULL);
1139 	complete_all(&req->r_safe_completion);  /* fsync waiter */
1140 }
1141 
1142 static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
1143 {
1144 	__u8 v;
1145 
1146 	ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
1147 	v = ceph_decode_8(p);
1148 	if (v > 1) {
1149 		pr_warning("do not understand pg encoding %d > 1", v);
1150 		return -EINVAL;
1151 	}
1152 	pgid->pool = ceph_decode_64(p);
1153 	pgid->seed = ceph_decode_32(p);
1154 	*p += 4;
1155 	return 0;
1156 
1157 bad:
1158 	pr_warning("incomplete pg encoding");
1159 	return -EINVAL;
1160 }
1161 
1162 /*
1163  * handle osd op reply.  either call the callback if it is specified,
1164  * or do the completion to wake up the waiting thread.
1165  */
1166 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1167 			 struct ceph_connection *con)
1168 {
1169 	void *p, *end;
1170 	struct ceph_osd_request *req;
1171 	u64 tid;
1172 	int object_len;
1173 	int numops, payload_len, flags;
1174 	s32 result;
1175 	s32 retry_attempt;
1176 	struct ceph_pg pg;
1177 	int err;
1178 	u32 reassert_epoch;
1179 	u64 reassert_version;
1180 	u32 osdmap_epoch;
1181 	int i;
1182 
1183 	tid = le64_to_cpu(msg->hdr.tid);
1184 	dout("handle_reply %p tid %llu\n", msg, tid);
1185 
1186 	p = msg->front.iov_base;
1187 	end = p + msg->front.iov_len;
1188 
1189 	ceph_decode_need(&p, end, 4, bad);
1190 	object_len = ceph_decode_32(&p);
1191 	ceph_decode_need(&p, end, object_len, bad);
1192 	p += object_len;
1193 
1194 	err = __decode_pgid(&p, end, &pg);
1195 	if (err)
1196 		goto bad;
1197 
1198 	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
1199 	flags = ceph_decode_64(&p);
1200 	result = ceph_decode_32(&p);
1201 	reassert_epoch = ceph_decode_32(&p);
1202 	reassert_version = ceph_decode_64(&p);
1203 	osdmap_epoch = ceph_decode_32(&p);
1204 
1205 	/* lookup */
1206 	mutex_lock(&osdc->request_mutex);
1207 	req = __lookup_request(osdc, tid);
1208 	if (req == NULL) {
1209 		dout("handle_reply tid %llu dne\n", tid);
1210 		mutex_unlock(&osdc->request_mutex);
1211 		return;
1212 	}
1213 	ceph_osdc_get_request(req);
1214 
1215 	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
1216 	     req, result);
1217 
1218 	ceph_decode_need(&p, end, 4, bad);
1219 	numops = ceph_decode_32(&p);
1220 	if (numops > CEPH_OSD_MAX_OP)
1221 		goto bad_put;
1222 	if (numops != req->r_num_ops)
1223 		goto bad_put;
1224 	payload_len = 0;
1225 	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
1226 	for (i = 0; i < numops; i++) {
1227 		struct ceph_osd_op *op = p;
1228 		int len;
1229 
1230 		len = le32_to_cpu(op->payload_len);
1231 		req->r_reply_op_len[i] = len;
1232 		dout(" op %d has %d bytes\n", i, len);
1233 		payload_len += len;
1234 		p += sizeof(*op);
1235 	}
1236 	if (payload_len != le32_to_cpu(msg->hdr.data_len)) {
1237 		pr_warning("sum of op payload lens %d != data_len %d",
1238 			   payload_len, le32_to_cpu(msg->hdr.data_len));
1239 		goto bad_put;
1240 	}
1241 
1242 	ceph_decode_need(&p, end, 4 + numops * 4, bad);
1243 	retry_attempt = ceph_decode_32(&p);
1244 	for (i = 0; i < numops; i++)
1245 		req->r_reply_op_result[i] = ceph_decode_32(&p);
1246 
1247 	/*
1248 	 * if this connection filled our message, drop our reference now, to
1249 	 * avoid a (safe but slower) revoke later.
1250 	 */
1251 	if (req->r_con_filling_msg == con && req->r_reply == msg) {
1252 		dout(" dropping con_filling_msg ref %p\n", con);
1253 		req->r_con_filling_msg = NULL;
1254 		con->ops->put(con);
1255 	}
1256 
1257 	if (!req->r_got_reply) {
1258 		unsigned int bytes;
1259 
1260 		req->r_result = result;
1261 		bytes = le32_to_cpu(msg->hdr.data_len);
1262 		dout("handle_reply result %d bytes %d\n", req->r_result,
1263 		     bytes);
1264 		if (req->r_result == 0)
1265 			req->r_result = bytes;
1266 
1267 		/* in case this is a write and we need to replay, */
1268 		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
1269 		req->r_reassert_version.version = cpu_to_le64(reassert_version);
1270 
1271 		req->r_got_reply = 1;
1272 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1273 		dout("handle_reply tid %llu dup ack\n", tid);
1274 		mutex_unlock(&osdc->request_mutex);
1275 		goto done;
1276 	}
1277 
1278 	dout("handle_reply tid %llu flags %d\n", tid, flags);
1279 
1280 	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
1281 		__register_linger_request(osdc, req);
1282 
1283 	/* either this is a read, or we got the safe response */
1284 	if (result < 0 ||
1285 	    (flags & CEPH_OSD_FLAG_ONDISK) ||
1286 	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1287 		__unregister_request(osdc, req);
1288 
1289 	mutex_unlock(&osdc->request_mutex);
1290 
1291 	if (req->r_callback)
1292 		req->r_callback(req, msg);
1293 	else
1294 		complete_all(&req->r_completion);
1295 
1296 	if (flags & CEPH_OSD_FLAG_ONDISK)
1297 		complete_request(req);
1298 
1299 done:
1300 	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
1301 	ceph_osdc_put_request(req);
1302 	return;
1303 
1304 bad_put:
1305 	ceph_osdc_put_request(req);
1306 bad:
1307 	pr_err("corrupt osd_op_reply got %d %d\n",
1308 	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
1309 	ceph_msg_dump(msg);
1310 }
1311 
1312 static void reset_changed_osds(struct ceph_osd_client *osdc)
1313 {
1314 	struct rb_node *p, *n;
1315 
1316 	for (p = rb_first(&osdc->osds); p; p = n) {
1317 		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1318 
1319 		n = rb_next(p);
1320 		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1321 		    memcmp(&osd->o_con.peer_addr,
1322 			   ceph_osd_addr(osdc->osdmap,
1323 					 osd->o_osd),
1324 			   sizeof(struct ceph_entity_addr)) != 0)
1325 			__reset_osd(osdc, osd);
1326 	}
1327 }
1328 
1329 /*
1330  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1331  * no osd, request a new map.
1332  *
1333  * Caller should hold map_sem for read.
1334  */
1335 static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
1336 {
1337 	struct ceph_osd_request *req, *nreq;
1338 	struct rb_node *p;
1339 	int needmap = 0;
1340 	int err;
1341 
1342 	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
1343 	mutex_lock(&osdc->request_mutex);
1344 	for (p = rb_first(&osdc->requests); p; ) {
1345 		req = rb_entry(p, struct ceph_osd_request, r_node);
1346 		p = rb_next(p);
1347 
1348 		/*
1349 		 * For linger requests that have not yet been
1350 		 * registered, move them to the linger list; they'll
1351 		 * be sent to the osd in the loop below.  Unregister
1352 		 * the request before re-registering it as a linger
1353 		 * request to ensure the __map_request() below
1354 		 * will decide it needs to be sent.
1355 		 */
1356 		if (req->r_linger && list_empty(&req->r_linger_item)) {
1357 			dout("%p tid %llu restart on osd%d\n",
1358 			     req, req->r_tid,
1359 			     req->r_osd ? req->r_osd->o_osd : -1);
1360 			__unregister_request(osdc, req);
1361 			__register_linger_request(osdc, req);
1362 			continue;
1363 		}
1364 
1365 		err = __map_request(osdc, req, force_resend);
1366 		if (err < 0)
1367 			continue;  /* error */
1368 		if (req->r_osd == NULL) {
1369 			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1370 			needmap++;  /* request a newer map */
1371 		} else if (err > 0) {
1372 			if (!req->r_linger) {
1373 				dout("%p tid %llu requeued on osd%d\n", req,
1374 				     req->r_tid,
1375 				     req->r_osd ? req->r_osd->o_osd : -1);
1376 				req->r_flags |= CEPH_OSD_FLAG_RETRY;
1377 			}
1378 		}
1379 	}
1380 
1381 	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
1382 				 r_linger_item) {
1383 		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
1384 
1385 		err = __map_request(osdc, req, force_resend);
1386 		dout("__map_request returned %d\n", err);
1387 		if (err == 0)
1388 			continue;  /* no change and no osd was specified */
1389 		if (err < 0)
1390 			continue;  /* hrm! */
1391 		if (req->r_osd == NULL) {
1392 			dout("tid %llu maps to no valid osd\n", req->r_tid);
1393 			needmap++;  /* request a newer map */
1394 			continue;
1395 		}
1396 
1397 		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
1398 		     req->r_osd ? req->r_osd->o_osd : -1);
1399 		__register_request(osdc, req);
1400 		__unregister_linger_request(osdc, req);
1401 	}
1402 	mutex_unlock(&osdc->request_mutex);
1403 
1404 	if (needmap) {
1405 		dout("%d requests for down osds, need new map\n", needmap);
1406 		ceph_monc_request_next_osdmap(&osdc->client->monc);
1407 	}
1408 	reset_changed_osds(osdc);
1409 }
1410 
1411 
1412 /*
1413  * Process updated osd map.
1414  *
1415  * The message contains any number of incremental and full maps, normally
1416  * indicating some sort of topology change in the cluster.  Kick requests
1417  * off to different OSDs as needed.
1418  */
1419 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1420 {
1421 	void *p, *end, *next;
1422 	u32 nr_maps, maplen;
1423 	u32 epoch;
1424 	struct ceph_osdmap *newmap = NULL, *oldmap;
1425 	int err;
1426 	struct ceph_fsid fsid;
1427 
1428 	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1429 	p = msg->front.iov_base;
1430 	end = p + msg->front.iov_len;
1431 
1432 	/* verify fsid */
1433 	ceph_decode_need(&p, end, sizeof(fsid), bad);
1434 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
1435 	if (ceph_check_fsid(osdc->client, &fsid) < 0)
1436 		return;
1437 
1438 	down_write(&osdc->map_sem);
1439 
1440 	/* incremental maps */
1441 	ceph_decode_32_safe(&p, end, nr_maps, bad);
1442 	dout(" %d inc maps\n", nr_maps);
1443 	while (nr_maps > 0) {
1444 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1445 		epoch = ceph_decode_32(&p);
1446 		maplen = ceph_decode_32(&p);
1447 		ceph_decode_need(&p, end, maplen, bad);
1448 		next = p + maplen;
1449 		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1450 			dout("applying incremental map %u len %d\n",
1451 			     epoch, maplen);
1452 			newmap = osdmap_apply_incremental(&p, next,
1453 							  osdc->osdmap,
1454 							  &osdc->client->msgr);
1455 			if (IS_ERR(newmap)) {
1456 				err = PTR_ERR(newmap);
1457 				goto bad;
1458 			}
1459 			BUG_ON(!newmap);
1460 			if (newmap != osdc->osdmap) {
1461 				ceph_osdmap_destroy(osdc->osdmap);
1462 				osdc->osdmap = newmap;
1463 			}
1464 			kick_requests(osdc, 0);
1465 		} else {
1466 			dout("ignoring incremental map %u len %d\n",
1467 			     epoch, maplen);
1468 		}
1469 		p = next;
1470 		nr_maps--;
1471 	}
1472 	if (newmap)
1473 		goto done;
1474 
1475 	/* full maps */
1476 	ceph_decode_32_safe(&p, end, nr_maps, bad);
1477 	dout(" %d full maps\n", nr_maps);
1478 	while (nr_maps) {
1479 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1480 		epoch = ceph_decode_32(&p);
1481 		maplen = ceph_decode_32(&p);
1482 		ceph_decode_need(&p, end, maplen, bad);
1483 		if (nr_maps > 1) {
1484 			dout("skipping non-latest full map %u len %d\n",
1485 			     epoch, maplen);
1486 		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1487 			dout("skipping full map %u len %d, "
1488 			     "older than our %u\n", epoch, maplen,
1489 			     osdc->osdmap->epoch);
1490 		} else {
1491 			int skipped_map = 0;
1492 
1493 			dout("taking full map %u len %d\n", epoch, maplen);
1494 			newmap = osdmap_decode(&p, p+maplen);
1495 			if (IS_ERR(newmap)) {
1496 				err = PTR_ERR(newmap);
1497 				goto bad;
1498 			}
1499 			BUG_ON(!newmap);
1500 			oldmap = osdc->osdmap;
1501 			osdc->osdmap = newmap;
1502 			if (oldmap) {
1503 				if (oldmap->epoch + 1 < newmap->epoch)
1504 					skipped_map = 1;
1505 				ceph_osdmap_destroy(oldmap);
1506 			}
1507 			kick_requests(osdc, skipped_map);
1508 		}
1509 		p += maplen;
1510 		nr_maps--;
1511 	}
1512 
1513 done:
1514 	downgrade_write(&osdc->map_sem);
1515 	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1516 
1517 	/*
1518 	 * subscribe to subsequent osdmap updates if full to ensure
1519 	 * we find out when we are no longer full and stop returning
1520 	 * ENOSPC.
1521 	 */
1522 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1523 		ceph_monc_request_next_osdmap(&osdc->client->monc);
1524 
1525 	mutex_lock(&osdc->request_mutex);
1526 	__send_queued(osdc);
1527 	mutex_unlock(&osdc->request_mutex);
1528 	up_read(&osdc->map_sem);
1529 	wake_up_all(&osdc->client->auth_wq);
1530 	return;
1531 
1532 bad:
1533 	pr_err("osdc handle_map corrupt msg\n");
1534 	ceph_msg_dump(msg);
1535 	up_write(&osdc->map_sem);
1536 	return;
1537 }
1538 
1539 /*
1540  * watch/notify callback event infrastructure
1541  *
1542  * These callbacks are used both for watch and notify operations.
1543  */
1544 static void __release_event(struct kref *kref)
1545 {
1546 	struct ceph_osd_event *event =
1547 		container_of(kref, struct ceph_osd_event, kref);
1548 
1549 	dout("__release_event %p\n", event);
1550 	kfree(event);
1551 }
1552 
1553 static void get_event(struct ceph_osd_event *event)
1554 {
1555 	kref_get(&event->kref);
1556 }
1557 
1558 void ceph_osdc_put_event(struct ceph_osd_event *event)
1559 {
1560 	kref_put(&event->kref, __release_event);
1561 }
1562 EXPORT_SYMBOL(ceph_osdc_put_event);
1563 
1564 static void __insert_event(struct ceph_osd_client *osdc,
1565 			     struct ceph_osd_event *new)
1566 {
1567 	struct rb_node **p = &osdc->event_tree.rb_node;
1568 	struct rb_node *parent = NULL;
1569 	struct ceph_osd_event *event = NULL;
1570 
1571 	while (*p) {
1572 		parent = *p;
1573 		event = rb_entry(parent, struct ceph_osd_event, node);
1574 		if (new->cookie < event->cookie)
1575 			p = &(*p)->rb_left;
1576 		else if (new->cookie > event->cookie)
1577 			p = &(*p)->rb_right;
1578 		else
1579 			BUG();
1580 	}
1581 
1582 	rb_link_node(&new->node, parent, p);
1583 	rb_insert_color(&new->node, &osdc->event_tree);
1584 }
1585 
1586 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1587 					        u64 cookie)
1588 {
1589 	struct rb_node **p = &osdc->event_tree.rb_node;
1590 	struct rb_node *parent = NULL;
1591 	struct ceph_osd_event *event = NULL;
1592 
1593 	while (*p) {
1594 		parent = *p;
1595 		event = rb_entry(parent, struct ceph_osd_event, node);
1596 		if (cookie < event->cookie)
1597 			p = &(*p)->rb_left;
1598 		else if (cookie > event->cookie)
1599 			p = &(*p)->rb_right;
1600 		else
1601 			return event;
1602 	}
1603 	return NULL;
1604 }
1605 
1606 static void __remove_event(struct ceph_osd_event *event)
1607 {
1608 	struct ceph_osd_client *osdc = event->osdc;
1609 
1610 	if (!RB_EMPTY_NODE(&event->node)) {
1611 		dout("__remove_event removed %p\n", event);
1612 		rb_erase(&event->node, &osdc->event_tree);
1613 		ceph_osdc_put_event(event);
1614 	} else {
1615 		dout("__remove_event didn't remove %p\n", event);
1616 	}
1617 }
1618 
1619 int ceph_osdc_create_event(struct ceph_osd_client *osdc,
1620 			   void (*event_cb)(u64, u64, u8, void *),
1621 			   void *data, struct ceph_osd_event **pevent)
1622 {
1623 	struct ceph_osd_event *event;
1624 
1625 	event = kmalloc(sizeof(*event), GFP_NOIO);
1626 	if (!event)
1627 		return -ENOMEM;
1628 
1629 	dout("create_event %p\n", event);
1630 	event->cb = event_cb;
1631 	event->one_shot = 0;
1632 	event->data = data;
1633 	event->osdc = osdc;
1634 	INIT_LIST_HEAD(&event->osd_node);
1635 	RB_CLEAR_NODE(&event->node);
1636 	kref_init(&event->kref);   /* one ref for us */
1637 	kref_get(&event->kref);    /* one ref for the caller */
1638 
1639 	spin_lock(&osdc->event_lock);
1640 	event->cookie = ++osdc->event_count;
1641 	__insert_event(osdc, event);
1642 	spin_unlock(&osdc->event_lock);
1643 
1644 	*pevent = event;
1645 	return 0;
1646 }
1647 EXPORT_SYMBOL(ceph_osdc_create_event);
1648 
1649 void ceph_osdc_cancel_event(struct ceph_osd_event *event)
1650 {
1651 	struct ceph_osd_client *osdc = event->osdc;
1652 
1653 	dout("cancel_event %p\n", event);
1654 	spin_lock(&osdc->event_lock);
1655 	__remove_event(event);
1656 	spin_unlock(&osdc->event_lock);
1657 	ceph_osdc_put_event(event); /* caller's */
1658 }
1659 EXPORT_SYMBOL(ceph_osdc_cancel_event);
1660 
1661 
1662 static void do_event_work(struct work_struct *work)
1663 {
1664 	struct ceph_osd_event_work *event_work =
1665 		container_of(work, struct ceph_osd_event_work, work);
1666 	struct ceph_osd_event *event = event_work->event;
1667 	u64 ver = event_work->ver;
1668 	u64 notify_id = event_work->notify_id;
1669 	u8 opcode = event_work->opcode;
1670 
1671 	dout("do_event_work completing %p\n", event);
1672 	event->cb(ver, notify_id, opcode, event->data);
1673 	dout("do_event_work completed %p\n", event);
1674 	ceph_osdc_put_event(event);
1675 	kfree(event_work);
1676 }
1677 
1678 
1679 /*
1680  * Process osd watch notifications
1681  */
1682 static void handle_watch_notify(struct ceph_osd_client *osdc,
1683 				struct ceph_msg *msg)
1684 {
1685 	void *p, *end;
1686 	u8 proto_ver;
1687 	u64 cookie, ver, notify_id;
1688 	u8 opcode;
1689 	struct ceph_osd_event *event;
1690 	struct ceph_osd_event_work *event_work;
1691 
1692 	p = msg->front.iov_base;
1693 	end = p + msg->front.iov_len;
1694 
1695 	ceph_decode_8_safe(&p, end, proto_ver, bad);
1696 	ceph_decode_8_safe(&p, end, opcode, bad);
1697 	ceph_decode_64_safe(&p, end, cookie, bad);
1698 	ceph_decode_64_safe(&p, end, ver, bad);
1699 	ceph_decode_64_safe(&p, end, notify_id, bad);
1700 
1701 	spin_lock(&osdc->event_lock);
1702 	event = __find_event(osdc, cookie);
1703 	if (event) {
1704 		BUG_ON(event->one_shot);
1705 		get_event(event);
1706 	}
1707 	spin_unlock(&osdc->event_lock);
1708 	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1709 	     cookie, ver, event);
1710 	if (event) {
1711 		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1712 		if (!event_work) {
1713 			dout("ERROR: could not allocate event_work\n");
1714 			goto done_err;
1715 		}
1716 		INIT_WORK(&event_work->work, do_event_work);
1717 		event_work->event = event;
1718 		event_work->ver = ver;
1719 		event_work->notify_id = notify_id;
1720 		event_work->opcode = opcode;
1721 		if (!queue_work(osdc->notify_wq, &event_work->work)) {
1722 			dout("WARNING: failed to queue notify event work\n");
1723 			goto done_err;
1724 		}
1725 	}
1726 
1727 	return;
1728 
1729 done_err:
1730 	ceph_osdc_put_event(event);
1731 	return;
1732 
1733 bad:
1734 	pr_err("osdc handle_watch_notify corrupt msg\n");
1735 	return;
1736 }
1737 
1738 /*
1739  * Register request, send initial attempt.
1740  */
1741 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1742 			    struct ceph_osd_request *req,
1743 			    bool nofail)
1744 {
1745 	int rc = 0;
1746 
1747 	req->r_request->pages = req->r_pages;
1748 	req->r_request->nr_pages = req->r_num_pages;
1749 #ifdef CONFIG_BLOCK
1750 	req->r_request->bio = req->r_bio;
1751 #endif
1752 	req->r_request->trail = &req->r_trail;
1753 
1754 	register_request(osdc, req);
1755 
1756 	down_read(&osdc->map_sem);
1757 	mutex_lock(&osdc->request_mutex);
1758 	/*
1759 	 * a racing kick_requests() may have sent the message for us
1760 	 * while we dropped request_mutex above, so only send now if
1761 	 * the request still han't been touched yet.
1762 	 */
1763 	if (req->r_sent == 0) {
1764 		rc = __map_request(osdc, req, 0);
1765 		if (rc < 0) {
1766 			if (nofail) {
1767 				dout("osdc_start_request failed map, "
1768 				     " will retry %lld\n", req->r_tid);
1769 				rc = 0;
1770 			}
1771 			goto out_unlock;
1772 		}
1773 		if (req->r_osd == NULL) {
1774 			dout("send_request %p no up osds in pg\n", req);
1775 			ceph_monc_request_next_osdmap(&osdc->client->monc);
1776 		} else {
1777 			__send_request(osdc, req);
1778 		}
1779 		rc = 0;
1780 	}
1781 
1782 out_unlock:
1783 	mutex_unlock(&osdc->request_mutex);
1784 	up_read(&osdc->map_sem);
1785 	return rc;
1786 }
1787 EXPORT_SYMBOL(ceph_osdc_start_request);
1788 
1789 /*
1790  * wait for a request to complete
1791  */
1792 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1793 			   struct ceph_osd_request *req)
1794 {
1795 	int rc;
1796 
1797 	rc = wait_for_completion_interruptible(&req->r_completion);
1798 	if (rc < 0) {
1799 		mutex_lock(&osdc->request_mutex);
1800 		__cancel_request(req);
1801 		__unregister_request(osdc, req);
1802 		mutex_unlock(&osdc->request_mutex);
1803 		complete_request(req);
1804 		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1805 		return rc;
1806 	}
1807 
1808 	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1809 	return req->r_result;
1810 }
1811 EXPORT_SYMBOL(ceph_osdc_wait_request);
1812 
1813 /*
1814  * sync - wait for all in-flight requests to flush.  avoid starvation.
1815  */
1816 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1817 {
1818 	struct ceph_osd_request *req;
1819 	u64 last_tid, next_tid = 0;
1820 
1821 	mutex_lock(&osdc->request_mutex);
1822 	last_tid = osdc->last_tid;
1823 	while (1) {
1824 		req = __lookup_request_ge(osdc, next_tid);
1825 		if (!req)
1826 			break;
1827 		if (req->r_tid > last_tid)
1828 			break;
1829 
1830 		next_tid = req->r_tid + 1;
1831 		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1832 			continue;
1833 
1834 		ceph_osdc_get_request(req);
1835 		mutex_unlock(&osdc->request_mutex);
1836 		dout("sync waiting on tid %llu (last is %llu)\n",
1837 		     req->r_tid, last_tid);
1838 		wait_for_completion(&req->r_safe_completion);
1839 		mutex_lock(&osdc->request_mutex);
1840 		ceph_osdc_put_request(req);
1841 	}
1842 	mutex_unlock(&osdc->request_mutex);
1843 	dout("sync done (thru tid %llu)\n", last_tid);
1844 }
1845 EXPORT_SYMBOL(ceph_osdc_sync);
1846 
1847 /*
1848  * init, shutdown
1849  */
1850 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1851 {
1852 	int err;
1853 
1854 	dout("init\n");
1855 	osdc->client = client;
1856 	osdc->osdmap = NULL;
1857 	init_rwsem(&osdc->map_sem);
1858 	init_completion(&osdc->map_waiters);
1859 	osdc->last_requested_map = 0;
1860 	mutex_init(&osdc->request_mutex);
1861 	osdc->last_tid = 0;
1862 	osdc->osds = RB_ROOT;
1863 	INIT_LIST_HEAD(&osdc->osd_lru);
1864 	osdc->requests = RB_ROOT;
1865 	INIT_LIST_HEAD(&osdc->req_lru);
1866 	INIT_LIST_HEAD(&osdc->req_unsent);
1867 	INIT_LIST_HEAD(&osdc->req_notarget);
1868 	INIT_LIST_HEAD(&osdc->req_linger);
1869 	osdc->num_requests = 0;
1870 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1871 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1872 	spin_lock_init(&osdc->event_lock);
1873 	osdc->event_tree = RB_ROOT;
1874 	osdc->event_count = 0;
1875 
1876 	schedule_delayed_work(&osdc->osds_timeout_work,
1877 	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1878 
1879 	err = -ENOMEM;
1880 	osdc->req_mempool = mempool_create_kmalloc_pool(10,
1881 					sizeof(struct ceph_osd_request));
1882 	if (!osdc->req_mempool)
1883 		goto out;
1884 
1885 	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
1886 				OSD_OP_FRONT_LEN, 10, true,
1887 				"osd_op");
1888 	if (err < 0)
1889 		goto out_mempool;
1890 	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
1891 				OSD_OPREPLY_FRONT_LEN, 10, true,
1892 				"osd_op_reply");
1893 	if (err < 0)
1894 		goto out_msgpool;
1895 
1896 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1897 	if (IS_ERR(osdc->notify_wq)) {
1898 		err = PTR_ERR(osdc->notify_wq);
1899 		osdc->notify_wq = NULL;
1900 		goto out_msgpool;
1901 	}
1902 	return 0;
1903 
1904 out_msgpool:
1905 	ceph_msgpool_destroy(&osdc->msgpool_op);
1906 out_mempool:
1907 	mempool_destroy(osdc->req_mempool);
1908 out:
1909 	return err;
1910 }
1911 
1912 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1913 {
1914 	flush_workqueue(osdc->notify_wq);
1915 	destroy_workqueue(osdc->notify_wq);
1916 	cancel_delayed_work_sync(&osdc->timeout_work);
1917 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
1918 	if (osdc->osdmap) {
1919 		ceph_osdmap_destroy(osdc->osdmap);
1920 		osdc->osdmap = NULL;
1921 	}
1922 	remove_all_osds(osdc);
1923 	mempool_destroy(osdc->req_mempool);
1924 	ceph_msgpool_destroy(&osdc->msgpool_op);
1925 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1926 }
1927 
1928 /*
1929  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1930  * *plen.  Return number of bytes read, or error.
1931  */
1932 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1933 			struct ceph_vino vino, struct ceph_file_layout *layout,
1934 			u64 off, u64 *plen,
1935 			u32 truncate_seq, u64 truncate_size,
1936 			struct page **pages, int num_pages, int page_align)
1937 {
1938 	struct ceph_osd_request *req;
1939 	int rc = 0;
1940 
1941 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1942 	     vino.snap, off, *plen);
1943 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1944 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1945 				    NULL, 0, truncate_seq, truncate_size, NULL,
1946 				    false, page_align);
1947 	if (IS_ERR(req))
1948 		return PTR_ERR(req);
1949 
1950 	/* it may be a short read due to an object boundary */
1951 	req->r_pages = pages;
1952 
1953 	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1954 	     off, *plen, req->r_num_pages, page_align);
1955 
1956 	rc = ceph_osdc_start_request(osdc, req, false);
1957 	if (!rc)
1958 		rc = ceph_osdc_wait_request(osdc, req);
1959 
1960 	ceph_osdc_put_request(req);
1961 	dout("readpages result %d\n", rc);
1962 	return rc;
1963 }
1964 EXPORT_SYMBOL(ceph_osdc_readpages);
1965 
1966 /*
1967  * do a synchronous write on N pages
1968  */
1969 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1970 			 struct ceph_file_layout *layout,
1971 			 struct ceph_snap_context *snapc,
1972 			 u64 off, u64 len,
1973 			 u32 truncate_seq, u64 truncate_size,
1974 			 struct timespec *mtime,
1975 			 struct page **pages, int num_pages)
1976 {
1977 	struct ceph_osd_request *req;
1978 	int rc = 0;
1979 	int page_align = off & ~PAGE_MASK;
1980 
1981 	BUG_ON(vino.snap != CEPH_NOSNAP);
1982 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1983 				    CEPH_OSD_OP_WRITE,
1984 				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
1985 				    snapc, 0,
1986 				    truncate_seq, truncate_size, mtime,
1987 				    true, page_align);
1988 	if (IS_ERR(req))
1989 		return PTR_ERR(req);
1990 
1991 	/* it may be a short write due to an object boundary */
1992 	req->r_pages = pages;
1993 	dout("writepages %llu~%llu (%d pages)\n", off, len,
1994 	     req->r_num_pages);
1995 
1996 	rc = ceph_osdc_start_request(osdc, req, true);
1997 	if (!rc)
1998 		rc = ceph_osdc_wait_request(osdc, req);
1999 
2000 	ceph_osdc_put_request(req);
2001 	if (rc == 0)
2002 		rc = len;
2003 	dout("writepages result %d\n", rc);
2004 	return rc;
2005 }
2006 EXPORT_SYMBOL(ceph_osdc_writepages);
2007 
2008 /*
2009  * handle incoming message
2010  */
2011 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
2012 {
2013 	struct ceph_osd *osd = con->private;
2014 	struct ceph_osd_client *osdc;
2015 	int type = le16_to_cpu(msg->hdr.type);
2016 
2017 	if (!osd)
2018 		goto out;
2019 	osdc = osd->o_osdc;
2020 
2021 	switch (type) {
2022 	case CEPH_MSG_OSD_MAP:
2023 		ceph_osdc_handle_map(osdc, msg);
2024 		break;
2025 	case CEPH_MSG_OSD_OPREPLY:
2026 		handle_reply(osdc, msg, con);
2027 		break;
2028 	case CEPH_MSG_WATCH_NOTIFY:
2029 		handle_watch_notify(osdc, msg);
2030 		break;
2031 
2032 	default:
2033 		pr_err("received unknown message type %d %s\n", type,
2034 		       ceph_msg_type_name(type));
2035 	}
2036 out:
2037 	ceph_msg_put(msg);
2038 }
2039 
2040 /*
2041  * lookup and return message for incoming reply.  set up reply message
2042  * pages.
2043  */
2044 static struct ceph_msg *get_reply(struct ceph_connection *con,
2045 				  struct ceph_msg_header *hdr,
2046 				  int *skip)
2047 {
2048 	struct ceph_osd *osd = con->private;
2049 	struct ceph_osd_client *osdc = osd->o_osdc;
2050 	struct ceph_msg *m;
2051 	struct ceph_osd_request *req;
2052 	int front = le32_to_cpu(hdr->front_len);
2053 	int data_len = le32_to_cpu(hdr->data_len);
2054 	u64 tid;
2055 
2056 	tid = le64_to_cpu(hdr->tid);
2057 	mutex_lock(&osdc->request_mutex);
2058 	req = __lookup_request(osdc, tid);
2059 	if (!req) {
2060 		*skip = 1;
2061 		m = NULL;
2062 		dout("get_reply unknown tid %llu from osd%d\n", tid,
2063 		     osd->o_osd);
2064 		goto out;
2065 	}
2066 
2067 	if (req->r_con_filling_msg) {
2068 		dout("%s revoking msg %p from old con %p\n", __func__,
2069 		     req->r_reply, req->r_con_filling_msg);
2070 		ceph_msg_revoke_incoming(req->r_reply);
2071 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
2072 		req->r_con_filling_msg = NULL;
2073 	}
2074 
2075 	if (front > req->r_reply->front.iov_len) {
2076 		pr_warning("get_reply front %d > preallocated %d\n",
2077 			   front, (int)req->r_reply->front.iov_len);
2078 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
2079 		if (!m)
2080 			goto out;
2081 		ceph_msg_put(req->r_reply);
2082 		req->r_reply = m;
2083 	}
2084 	m = ceph_msg_get(req->r_reply);
2085 
2086 	if (data_len > 0) {
2087 		int want = calc_pages_for(req->r_page_alignment, data_len);
2088 
2089 		if (req->r_pages && unlikely(req->r_num_pages < want)) {
2090 			pr_warning("tid %lld reply has %d bytes %d pages, we"
2091 				   " had only %d pages ready\n", tid, data_len,
2092 				   want, req->r_num_pages);
2093 			*skip = 1;
2094 			ceph_msg_put(m);
2095 			m = NULL;
2096 			goto out;
2097 		}
2098 		m->pages = req->r_pages;
2099 		m->nr_pages = req->r_num_pages;
2100 		m->page_alignment = req->r_page_alignment;
2101 #ifdef CONFIG_BLOCK
2102 		m->bio = req->r_bio;
2103 #endif
2104 	}
2105 	*skip = 0;
2106 	req->r_con_filling_msg = con->ops->get(con);
2107 	dout("get_reply tid %lld %p\n", tid, m);
2108 
2109 out:
2110 	mutex_unlock(&osdc->request_mutex);
2111 	return m;
2112 
2113 }
2114 
2115 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2116 				  struct ceph_msg_header *hdr,
2117 				  int *skip)
2118 {
2119 	struct ceph_osd *osd = con->private;
2120 	int type = le16_to_cpu(hdr->type);
2121 	int front = le32_to_cpu(hdr->front_len);
2122 
2123 	*skip = 0;
2124 	switch (type) {
2125 	case CEPH_MSG_OSD_MAP:
2126 	case CEPH_MSG_WATCH_NOTIFY:
2127 		return ceph_msg_new(type, front, GFP_NOFS, false);
2128 	case CEPH_MSG_OSD_OPREPLY:
2129 		return get_reply(con, hdr, skip);
2130 	default:
2131 		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2132 			osd->o_osd);
2133 		*skip = 1;
2134 		return NULL;
2135 	}
2136 }
2137 
2138 /*
2139  * Wrappers to refcount containing ceph_osd struct
2140  */
2141 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2142 {
2143 	struct ceph_osd *osd = con->private;
2144 	if (get_osd(osd))
2145 		return con;
2146 	return NULL;
2147 }
2148 
2149 static void put_osd_con(struct ceph_connection *con)
2150 {
2151 	struct ceph_osd *osd = con->private;
2152 	put_osd(osd);
2153 }
2154 
2155 /*
2156  * authentication
2157  */
2158 /*
2159  * Note: returned pointer is the address of a structure that's
2160  * managed separately.  Caller must *not* attempt to free it.
2161  */
2162 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
2163 					int *proto, int force_new)
2164 {
2165 	struct ceph_osd *o = con->private;
2166 	struct ceph_osd_client *osdc = o->o_osdc;
2167 	struct ceph_auth_client *ac = osdc->client->monc.auth;
2168 	struct ceph_auth_handshake *auth = &o->o_auth;
2169 
2170 	if (force_new && auth->authorizer) {
2171 		if (ac->ops && ac->ops->destroy_authorizer)
2172 			ac->ops->destroy_authorizer(ac, auth->authorizer);
2173 		auth->authorizer = NULL;
2174 	}
2175 	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
2176 		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
2177 							auth);
2178 		if (ret)
2179 			return ERR_PTR(ret);
2180 	}
2181 	*proto = ac->protocol;
2182 
2183 	return auth;
2184 }
2185 
2186 
2187 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2188 {
2189 	struct ceph_osd *o = con->private;
2190 	struct ceph_osd_client *osdc = o->o_osdc;
2191 	struct ceph_auth_client *ac = osdc->client->monc.auth;
2192 
2193 	/*
2194 	 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
2195 	 * XXX which do we do:  succeed or fail?
2196 	 */
2197 	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
2198 }
2199 
2200 static int invalidate_authorizer(struct ceph_connection *con)
2201 {
2202 	struct ceph_osd *o = con->private;
2203 	struct ceph_osd_client *osdc = o->o_osdc;
2204 	struct ceph_auth_client *ac = osdc->client->monc.auth;
2205 
2206 	if (ac->ops && ac->ops->invalidate_authorizer)
2207 		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2208 
2209 	return ceph_monc_validate_auth(&osdc->client->monc);
2210 }
2211 
2212 static const struct ceph_connection_operations osd_con_ops = {
2213 	.get = get_osd_con,
2214 	.put = put_osd_con,
2215 	.dispatch = dispatch,
2216 	.get_authorizer = get_authorizer,
2217 	.verify_authorizer_reply = verify_authorizer_reply,
2218 	.invalidate_authorizer = invalidate_authorizer,
2219 	.alloc_msg = alloc_msg,
2220 	.fault = osd_reset,
2221 };
2222