xref: /openbmc/linux/net/ceph/osd_client.c (revision 6f6c7006)
1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13 
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20 
21 #define OSD_OP_FRONT_LEN	4096
22 #define OSD_OPREPLY_FRONT_LEN	512
23 
24 static const struct ceph_connection_operations osd_con_ops;
25 
26 static void send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 
29 static int op_needs_trail(int op)
30 {
31 	switch (op) {
32 	case CEPH_OSD_OP_GETXATTR:
33 	case CEPH_OSD_OP_SETXATTR:
34 	case CEPH_OSD_OP_CMPXATTR:
35 	case CEPH_OSD_OP_CALL:
36 		return 1;
37 	default:
38 		return 0;
39 	}
40 }
41 
42 static int op_has_extent(int op)
43 {
44 	return (op == CEPH_OSD_OP_READ ||
45 		op == CEPH_OSD_OP_WRITE);
46 }
47 
48 void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
49 			struct ceph_file_layout *layout,
50 			u64 snapid,
51 			u64 off, u64 *plen, u64 *bno,
52 			struct ceph_osd_request *req,
53 			struct ceph_osd_req_op *op)
54 {
55 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
56 	u64 orig_len = *plen;
57 	u64 objoff, objlen;    /* extent in object */
58 
59 	reqhead->snapid = cpu_to_le64(snapid);
60 
61 	/* object extent? */
62 	ceph_calc_file_object_mapping(layout, off, plen, bno,
63 				      &objoff, &objlen);
64 	if (*plen < orig_len)
65 		dout(" skipping last %llu, final file extent %llu~%llu\n",
66 		     orig_len - *plen, off, *plen);
67 
68 	if (op_has_extent(op->op)) {
69 		op->extent.offset = objoff;
70 		op->extent.length = objlen;
71 	}
72 	req->r_num_pages = calc_pages_for(off, *plen);
73 	req->r_page_alignment = off & ~PAGE_MASK;
74 	if (op->op == CEPH_OSD_OP_WRITE)
75 		op->payload_len = *plen;
76 
77 	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
78 	     *bno, objoff, objlen, req->r_num_pages);
80 }
81 EXPORT_SYMBOL(ceph_calc_raw_layout);
82 
83 /*
84  * Implement client access to distributed object storage cluster.
85  *
86  * All data objects are stored within a cluster/cloud of OSDs, or
87  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
88  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
89  * remote daemons serving up and coordinating consistent and safe
90  * access to storage.
91  *
92  * Cluster membership and the mapping of data objects onto storage devices
93  * are described by the osd map.
94  *
95  * We keep track of pending OSD requests (read, write), resubmit
96  * requests to different OSDs when the cluster topology/data layout
97  * change, or retry the affected requests when the communications
98  * channel with an OSD is reset.
99  */
100 
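/*
 * Typical request lifecycle from a caller's point of view (an
 * illustrative sketch only; error handling is elided and the osdc
 * pointer is assumed to come from an established ceph_client):
 *
 *	struct ceph_osd_client *osdc = &client->osdc;
 *	u64 len = 4096;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 1, page_align);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);	  (blocks)
 *	ceph_osdc_put_request(req);
 *
 * Between start and completion the request may be remapped and resent
 * any number of times as new osdmaps arrive.
 */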
101 /*
102  * calculate the mapping of a file extent onto an object, and fill out the
103  * request accordingly.  shorten extent as necessary if it crosses an
104  * object boundary.
105  *
106  * fill osd op in request message.
107  */
108 static void calc_layout(struct ceph_osd_client *osdc,
109 			struct ceph_vino vino,
110 			struct ceph_file_layout *layout,
111 			u64 off, u64 *plen,
112 			struct ceph_osd_request *req,
113 			struct ceph_osd_req_op *op)
114 {
115 	u64 bno;
116 
117 	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
118 			     plen, &bno, req, op);
119 
120 	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
121 	req->r_oid_len = strlen(req->r_oid);
122 }
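/*
 * Worked example of the oid format above: for inode 0x10000003456 with
 * 4MB objects, file offset 10MB maps to object number (bno) 2, so the
 * object is named "10000003456.00000002".
 */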
123 
124 /*
125  * requests
126  */
127 void ceph_osdc_release_request(struct kref *kref)
128 {
129 	struct ceph_osd_request *req = container_of(kref,
130 						    struct ceph_osd_request,
131 						    r_kref);
132 
133 	if (req->r_request)
134 		ceph_msg_put(req->r_request);
135 	if (req->r_reply)
136 		ceph_msg_put(req->r_reply);
137 	if (req->r_con_filling_msg) {
138 		dout("release_request revoking pages %p from con %p\n",
139 		     req->r_pages, req->r_con_filling_msg);
140 		ceph_con_revoke_message(req->r_con_filling_msg,
141 				      req->r_reply);
142 		ceph_con_put(req->r_con_filling_msg);
143 	}
144 	if (req->r_own_pages)
145 		ceph_release_page_vector(req->r_pages,
146 					 req->r_num_pages);
147 #ifdef CONFIG_BLOCK
148 	if (req->r_bio)
149 		bio_put(req->r_bio);
150 #endif
151 	ceph_put_snap_context(req->r_snapc);
152 	if (req->r_trail) {
153 		ceph_pagelist_release(req->r_trail);
154 		kfree(req->r_trail);
155 	}
156 	if (req->r_mempool)
157 		mempool_free(req, req->r_osdc->req_mempool);
158 	else
159 		kfree(req);
160 }
161 EXPORT_SYMBOL(ceph_osdc_release_request);
162 
163 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
164 {
165 	int i = 0;
166 
167 	if (needs_trail)
168 		*needs_trail = 0;
169 	while (ops[i].op) {
170 		if (needs_trail && op_needs_trail(ops[i].op))
171 			*needs_trail = 1;
172 		i++;
173 	}
174 
175 	return i;
176 }
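/*
 * Note the convention: an ops array is terminated by an entry whose
 * .op is 0, rather than by an explicit count.  A sketch of a
 * caller-built array (values illustrative):
 *
 *	struct ceph_osd_req_op ops[3];
 *
 *	ops[0].op = CEPH_OSD_OP_WRITE;
 *	ops[0].payload_len = 0;
 *	ops[1].op = CEPH_OSD_OP_STARTSYNC;
 *	ops[1].payload_len = 0;
 *	ops[2].op = 0;				(terminator)
 *
 * get_num_ops(ops, &needs_trail) then returns 2 with needs_trail == 0.
 */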
177 
178 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
179 					       int flags,
180 					       struct ceph_snap_context *snapc,
181 					       struct ceph_osd_req_op *ops,
182 					       bool use_mempool,
183 					       gfp_t gfp_flags,
184 					       struct page **pages,
185 					       struct bio *bio)
186 {
187 	struct ceph_osd_request *req;
188 	struct ceph_msg *msg;
189 	int needs_trail;
190 	int num_op = get_num_ops(ops, &needs_trail);
191 	size_t msg_size = sizeof(struct ceph_osd_request_head);
192 
193 	msg_size += num_op*sizeof(struct ceph_osd_op);
194 
195 	if (use_mempool) {
196 		req = mempool_alloc(osdc->req_mempool, gfp_flags);
197 		memset(req, 0, sizeof(*req));
198 	} else {
199 		req = kzalloc(sizeof(*req), gfp_flags);
200 	}
201 	if (req == NULL)
202 		return NULL;
203 
204 	req->r_osdc = osdc;
205 	req->r_mempool = use_mempool;
206 
207 	kref_init(&req->r_kref);
208 	init_completion(&req->r_completion);
209 	init_completion(&req->r_safe_completion);
210 	INIT_LIST_HEAD(&req->r_unsafe_item);
211 	req->r_flags = flags;
212 
213 	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
214 
215 	/* create reply message */
216 	if (use_mempool)
217 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
218 	else
219 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
220 				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
221 	if (!msg) {
222 		ceph_osdc_put_request(req);
223 		return NULL;
224 	}
225 	req->r_reply = msg;
226 
227 	/* allocate space for the trailing data */
228 	if (needs_trail) {
229 		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
230 		if (!req->r_trail) {
231 			ceph_osdc_put_request(req);
232 			return NULL;
233 		}
234 		ceph_pagelist_init(req->r_trail);
235 	}
236 	/* create request message; allow space for oid */
237 	msg_size += 40;
238 	if (snapc)
239 		msg_size += sizeof(u64) * snapc->num_snaps;
240 	if (use_mempool)
241 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
242 	else
243 		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
244 	if (!msg) {
245 		ceph_osdc_put_request(req);
246 		return NULL;
247 	}
248 
249 	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
250 	memset(msg->front.iov_base, 0, msg->front.iov_len);
251 
252 	req->r_request = msg;
253 	req->r_pages = pages;
254 #ifdef CONFIG_BLOCK
255 	if (bio) {
256 		req->r_bio = bio;
257 		bio_get(req->r_bio);
258 	}
259 #endif
260 
261 	return req;
262 }
263 EXPORT_SYMBOL(ceph_osdc_alloc_request);
264 
265 static void osd_req_encode_op(struct ceph_osd_request *req,
266 			      struct ceph_osd_op *dst,
267 			      struct ceph_osd_req_op *src)
268 {
269 	dst->op = cpu_to_le16(src->op);
270 
271 	switch (dst->op) {
272 	case CEPH_OSD_OP_READ:
273 	case CEPH_OSD_OP_WRITE:
274 		dst->extent.offset =
275 			cpu_to_le64(src->extent.offset);
276 		dst->extent.length =
277 			cpu_to_le64(src->extent.length);
278 		dst->extent.truncate_size =
279 			cpu_to_le64(src->extent.truncate_size);
280 		dst->extent.truncate_seq =
281 			cpu_to_le32(src->extent.truncate_seq);
282 		break;
283 
284 	case CEPH_OSD_OP_GETXATTR:
285 	case CEPH_OSD_OP_SETXATTR:
286 	case CEPH_OSD_OP_CMPXATTR:
287 		BUG_ON(!req->r_trail);
288 
289 		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
290 		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
291 		dst->xattr.cmp_op = src->xattr.cmp_op;
292 		dst->xattr.cmp_mode = src->xattr.cmp_mode;
293 		ceph_pagelist_append(req->r_trail, src->xattr.name,
294 				     src->xattr.name_len);
295 		ceph_pagelist_append(req->r_trail, src->xattr.val,
296 				     src->xattr.value_len);
297 		break;
298 	case CEPH_OSD_OP_CALL:
299 		BUG_ON(!req->r_trail);
300 
301 		dst->cls.class_len = src->cls.class_len;
302 		dst->cls.method_len = src->cls.method_len;
303 		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
304 
305 		ceph_pagelist_append(req->r_trail, src->cls.class_name,
306 				     src->cls.class_len);
307 		ceph_pagelist_append(req->r_trail, src->cls.method_name,
308 				     src->cls.method_len);
309 		ceph_pagelist_append(req->r_trail, src->cls.indata,
310 				     src->cls.indata_len);
311 		break;
312 	case CEPH_OSD_OP_ROLLBACK:
313 		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
314 		break;
315 	case CEPH_OSD_OP_STARTSYNC:
316 		break;
317 	default:
318 		pr_err("unrecognized osd opcode %d\n", dst->op);
319 		WARN_ON(1);
320 		break;
321 	}
322 	dst->payload_len = cpu_to_le32(src->payload_len);
323 }
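/*
 * For the trail-bearing ops the variable-length arguments travel in
 * req->r_trail rather than in the fixed-size ceph_osd_op slot.  A
 * sketch of preparing a CEPH_OSD_OP_CALL source op (class, method and
 * payload names are illustrative):
 *
 *	op->op = CEPH_OSD_OP_CALL;
 *	op->cls.class_name = "lock";
 *	op->cls.class_len = strlen(op->cls.class_name);
 *	op->cls.method_name = "lock";
 *	op->cls.method_len = strlen(op->cls.method_name);
 *	op->cls.indata = payload;
 *	op->cls.indata_len = payload_len;
 *	op->payload_len = op->cls.class_len + op->cls.method_len +
 *			  op->cls.indata_len;
 *
 * osd_req_encode_op() then appends the class name, method name and
 * input data to the trail pagelist, in that order.
 */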
324 
325 /*
326  * build new request AND message
327  *
328  */
329 void ceph_osdc_build_request(struct ceph_osd_request *req,
330 			     u64 off, u64 *plen,
331 			     struct ceph_osd_req_op *src_ops,
332 			     struct ceph_snap_context *snapc,
333 			     struct timespec *mtime,
334 			     const char *oid,
335 			     int oid_len)
336 {
337 	struct ceph_msg *msg = req->r_request;
338 	struct ceph_osd_request_head *head;
339 	struct ceph_osd_req_op *src_op;
340 	struct ceph_osd_op *op;
341 	void *p;
342 	int num_op = get_num_ops(src_ops, NULL);
343 	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
344 	int flags = req->r_flags;
345 	u64 data_len = 0;
346 	int i;
347 
348 	head = msg->front.iov_base;
349 	op = (void *)(head + 1);
350 	p = (void *)(op + num_op);
351 
352 	req->r_snapc = ceph_get_snap_context(snapc);
353 
354 	head->client_inc = cpu_to_le32(1); /* always, for now. */
355 	head->flags = cpu_to_le32(flags);
356 	if (flags & CEPH_OSD_FLAG_WRITE)
357 		ceph_encode_timespec(&head->mtime, mtime);
358 	head->num_ops = cpu_to_le16(num_op);
359 
361 	/* fill in oid */
362 	head->object_len = cpu_to_le32(oid_len);
363 	memcpy(p, oid, oid_len);
364 	p += oid_len;
365 
366 	src_op = src_ops;
367 	while (src_op->op) {
368 		osd_req_encode_op(req, op, src_op);
369 		src_op++;
370 		op++;
371 	}
372 
373 	if (req->r_trail)
374 		data_len += req->r_trail->length;
375 
376 	if (snapc) {
377 		head->snap_seq = cpu_to_le64(snapc->seq);
378 		head->num_snaps = cpu_to_le32(snapc->num_snaps);
379 		for (i = 0; i < snapc->num_snaps; i++) {
380 			put_unaligned_le64(snapc->snaps[i], p);
381 			p += sizeof(u64);
382 		}
383 	}
384 
385 	if (flags & CEPH_OSD_FLAG_WRITE) {
386 		req->r_request->hdr.data_off = cpu_to_le16(off);
387 		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
388 	} else if (data_len) {
389 		req->r_request->hdr.data_off = 0;
390 		req->r_request->hdr.data_len = cpu_to_le32(data_len);
391 	}
392 
393 	req->r_request->page_alignment = req->r_page_alignment;
394 
395 	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
396 	msg_size = p - msg->front.iov_base;
397 	msg->front.iov_len = msg_size;
398 	msg->hdr.front_len = cpu_to_le32(msg_size);
400 }
401 EXPORT_SYMBOL(ceph_osdc_build_request);
402 
403 /*
404  * build new request AND message, calculate layout, and adjust file
405  * extent as needed.
406  *
407  * if the file was recently truncated, we include information about its
408  * old and new size so that the object can be updated appropriately.  (we
409  * avoid synchronously deleting truncated objects because it's slow.)
410  *
411  * if @do_sync, include a 'startsync' command so that the osd will flush
412  * data quickly.
413  */
414 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
415 					       struct ceph_file_layout *layout,
416 					       struct ceph_vino vino,
417 					       u64 off, u64 *plen,
418 					       int opcode, int flags,
419 					       struct ceph_snap_context *snapc,
420 					       int do_sync,
421 					       u32 truncate_seq,
422 					       u64 truncate_size,
423 					       struct timespec *mtime,
424 					       bool use_mempool, int num_reply,
425 					       int page_align)
426 {
427 	struct ceph_osd_req_op ops[3];
428 	struct ceph_osd_request *req;
429 
430 	ops[0].op = opcode;
431 	ops[0].extent.truncate_seq = truncate_seq;
432 	ops[0].extent.truncate_size = truncate_size;
433 	ops[0].payload_len = 0;
434 
435 	if (do_sync) {
436 		ops[1].op = CEPH_OSD_OP_STARTSYNC;
437 		ops[1].payload_len = 0;
438 		ops[2].op = 0;
439 	} else
440 		ops[1].op = 0;
441 
442 	req = ceph_osdc_alloc_request(osdc, flags,
443 					 snapc, ops,
444 					 use_mempool,
445 					 GFP_NOFS, NULL, NULL);
446 	if (IS_ERR(req))
447 		return req;
448 
449 	/* calculate max write size */
450 	calc_layout(osdc, vino, layout, off, plen, req, ops);
451 	req->r_file_layout = *layout;  /* keep a copy */
452 
453 	/* in case it differs from natural alignment that calc_layout
454 	   filled in for us */
455 	req->r_page_alignment = page_align;
456 
457 	ceph_osdc_build_request(req, off, plen, ops,
458 				snapc,
459 				mtime,
460 				req->r_oid, req->r_oid_len);
461 
462 	return req;
463 }
464 EXPORT_SYMBOL(ceph_osdc_new_request);
465 
466 /*
467  * We keep osd requests in an rbtree, sorted by ->r_tid.
468  */
469 static void __insert_request(struct ceph_osd_client *osdc,
470 			     struct ceph_osd_request *new)
471 {
472 	struct rb_node **p = &osdc->requests.rb_node;
473 	struct rb_node *parent = NULL;
474 	struct ceph_osd_request *req = NULL;
475 
476 	while (*p) {
477 		parent = *p;
478 		req = rb_entry(parent, struct ceph_osd_request, r_node);
479 		if (new->r_tid < req->r_tid)
480 			p = &(*p)->rb_left;
481 		else if (new->r_tid > req->r_tid)
482 			p = &(*p)->rb_right;
483 		else
484 			BUG();
485 	}
486 
487 	rb_link_node(&new->r_node, parent, p);
488 	rb_insert_color(&new->r_node, &osdc->requests);
489 }
490 
491 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
492 						 u64 tid)
493 {
494 	struct ceph_osd_request *req;
495 	struct rb_node *n = osdc->requests.rb_node;
496 
497 	while (n) {
498 		req = rb_entry(n, struct ceph_osd_request, r_node);
499 		if (tid < req->r_tid)
500 			n = n->rb_left;
501 		else if (tid > req->r_tid)
502 			n = n->rb_right;
503 		else
504 			return req;
505 	}
506 	return NULL;
507 }
508 
509 static struct ceph_osd_request *
510 __lookup_request_ge(struct ceph_osd_client *osdc,
511 		    u64 tid)
512 {
513 	struct ceph_osd_request *req;
514 	struct rb_node *n = osdc->requests.rb_node;
515 
516 	while (n) {
517 		req = rb_entry(n, struct ceph_osd_request, r_node);
518 		if (tid < req->r_tid) {
519 			if (!n->rb_left)
520 				return req;
521 			n = n->rb_left;
522 		} else if (tid > req->r_tid) {
523 			n = n->rb_right;
524 		} else {
525 			return req;
526 		}
527 	}
528 	return NULL;
529 }
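/*
 * Example: with registered tids {3, 5, 9}, __lookup_request_ge(osdc, 4)
 * returns the request with tid 5.  ceph_osdc_sync() below relies on
 * this to iterate all requests in tid order without missing any.
 */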
530 
531 /*
532  * Resubmit requests pending on the given osd.
533  */
534 static void __kick_osd_requests(struct ceph_osd_client *osdc,
535 				struct ceph_osd *osd)
536 {
537 	struct ceph_osd_request *req;
538 	int err;
539 
540 	dout("__kick_osd_requests osd%d\n", osd->o_osd);
541 	err = __reset_osd(osdc, osd);
542 	if (err == -EAGAIN)
543 		return;
544 
545 	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
546 		list_move(&req->r_req_lru_item, &osdc->req_unsent);
547 		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
548 		     osd->o_osd);
549 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
550 	}
551 }
552 
553 static void kick_osd_requests(struct ceph_osd_client *osdc,
554 			      struct ceph_osd *kickosd)
555 {
556 	mutex_lock(&osdc->request_mutex);
557 	__kick_osd_requests(osdc, kickosd);
558 	mutex_unlock(&osdc->request_mutex);
559 }
560 
561 /*
562  * If the osd connection drops, we need to resubmit all requests.
563  */
564 static void osd_reset(struct ceph_connection *con)
565 {
566 	struct ceph_osd *osd = con->private;
567 	struct ceph_osd_client *osdc;
568 
569 	if (!osd)
570 		return;
571 	dout("osd_reset osd%d\n", osd->o_osd);
572 	osdc = osd->o_osdc;
573 	down_read(&osdc->map_sem);
574 	kick_osd_requests(osdc, osd);
575 	send_queued(osdc);
576 	up_read(&osdc->map_sem);
577 }
578 
579 /*
580  * Track open sessions with osds.
581  */
582 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
583 {
584 	struct ceph_osd *osd;
585 
586 	osd = kzalloc(sizeof(*osd), GFP_NOFS);
587 	if (!osd)
588 		return NULL;
589 
590 	atomic_set(&osd->o_ref, 1);
591 	osd->o_osdc = osdc;
592 	INIT_LIST_HEAD(&osd->o_requests);
593 	INIT_LIST_HEAD(&osd->o_osd_lru);
594 	osd->o_incarnation = 1;
595 
596 	ceph_con_init(osdc->client->msgr, &osd->o_con);
597 	osd->o_con.private = osd;
598 	osd->o_con.ops = &osd_con_ops;
599 	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
600 
601 	INIT_LIST_HEAD(&osd->o_keepalive_item);
602 	return osd;
603 }
604 
605 static struct ceph_osd *get_osd(struct ceph_osd *osd)
606 {
607 	if (atomic_inc_not_zero(&osd->o_ref)) {
608 		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
609 		     atomic_read(&osd->o_ref));
610 		return osd;
611 	} else {
612 		dout("get_osd %p FAIL\n", osd);
613 		return NULL;
614 	}
615 }
616 
617 static void put_osd(struct ceph_osd *osd)
618 {
619 	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
620 	     atomic_read(&osd->o_ref) - 1);
621 	if (atomic_dec_and_test(&osd->o_ref)) {
622 		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
623 
624 		if (osd->o_authorizer)
625 			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
626 		kfree(osd);
627 	}
628 }
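/*
 * Note the asymmetry above: get_osd() uses atomic_inc_not_zero() so
 * that a lookup racing with the final put_osd() fails cleanly instead
 * of resurrecting an osd whose teardown has already begun.  A NULL
 * return from get_osd() therefore means "this osd is going away".
 */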
629 
630 /*
631  * remove an osd from our map
632  */
633 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
634 {
635 	dout("__remove_osd %p\n", osd);
636 	BUG_ON(!list_empty(&osd->o_requests));
637 	rb_erase(&osd->o_node, &osdc->osds);
638 	list_del_init(&osd->o_osd_lru);
639 	ceph_con_close(&osd->o_con);
640 	put_osd(osd);
641 }
642 
643 static void __move_osd_to_lru(struct ceph_osd_client *osdc,
644 			      struct ceph_osd *osd)
645 {
646 	dout("__move_osd_to_lru %p\n", osd);
647 	BUG_ON(!list_empty(&osd->o_osd_lru));
648 	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
649 	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
650 }
651 
652 static void __remove_osd_from_lru(struct ceph_osd *osd)
653 {
654 	dout("__remove_osd_from_lru %p\n", osd);
655 	if (!list_empty(&osd->o_osd_lru))
656 		list_del_init(&osd->o_osd_lru);
657 }
658 
659 static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
660 {
661 	struct ceph_osd *osd, *nosd;
662 
663 	dout("remove_old_osds %p\n", osdc);
664 	mutex_lock(&osdc->request_mutex);
665 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
666 		if (!remove_all && time_before(jiffies, osd->lru_ttl))
667 			break;
668 		__remove_osd(osdc, osd);
669 	}
670 	mutex_unlock(&osdc->request_mutex);
671 }
672 
673 /*
674  * reset the osd connection
675  */
676 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
677 {
678 	struct ceph_osd_request *req;
679 	int ret = 0;
680 
681 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
682 	if (list_empty(&osd->o_requests)) {
683 		__remove_osd(osdc, osd);
684 	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
685 			  &osd->o_con.peer_addr,
686 			  sizeof(osd->o_con.peer_addr)) == 0 &&
687 		   !ceph_con_opened(&osd->o_con)) {
688 		dout(" osd addr hasn't changed and connection never opened,"
689 		     " letting msgr retry\n");
690 		/* touch each r_stamp for handle_timeout()'s benefit */
691 		list_for_each_entry(req, &osd->o_requests, r_osd_item)
692 			req->r_stamp = jiffies;
693 		ret = -EAGAIN;
694 	} else {
695 		ceph_con_close(&osd->o_con);
696 		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
697 		osd->o_incarnation++;
698 	}
699 	return ret;
700 }
701 
702 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
703 {
704 	struct rb_node **p = &osdc->osds.rb_node;
705 	struct rb_node *parent = NULL;
706 	struct ceph_osd *osd = NULL;
707 
708 	while (*p) {
709 		parent = *p;
710 		osd = rb_entry(parent, struct ceph_osd, o_node);
711 		if (new->o_osd < osd->o_osd)
712 			p = &(*p)->rb_left;
713 		else if (new->o_osd > osd->o_osd)
714 			p = &(*p)->rb_right;
715 		else
716 			BUG();
717 	}
718 
719 	rb_link_node(&new->o_node, parent, p);
720 	rb_insert_color(&new->o_node, &osdc->osds);
721 }
722 
723 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
724 {
725 	struct ceph_osd *osd;
726 	struct rb_node *n = osdc->osds.rb_node;
727 
728 	while (n) {
729 		osd = rb_entry(n, struct ceph_osd, o_node);
730 		if (o < osd->o_osd)
731 			n = n->rb_left;
732 		else if (o > osd->o_osd)
733 			n = n->rb_right;
734 		else
735 			return osd;
736 	}
737 	return NULL;
738 }
739 
740 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
741 {
742 	schedule_delayed_work(&osdc->timeout_work,
743 			osdc->client->options->osd_keepalive_timeout * HZ);
744 }
745 
746 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
747 {
748 	cancel_delayed_work(&osdc->timeout_work);
749 }
750 
751 /*
752  * Register request, assign tid.  If this is the first request, set up
753  * the timeout event.
754  */
755 static void register_request(struct ceph_osd_client *osdc,
756 			     struct ceph_osd_request *req)
757 {
758 	mutex_lock(&osdc->request_mutex);
759 	req->r_tid = ++osdc->last_tid;
760 	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
761 	INIT_LIST_HEAD(&req->r_req_lru_item);
762 
763 	dout("register_request %p tid %lld\n", req, req->r_tid);
764 	__insert_request(osdc, req);
765 	ceph_osdc_get_request(req);
766 	osdc->num_requests++;
767 
768 	if (osdc->num_requests == 1) {
769 		dout(" first request, scheduling timeout\n");
770 		__schedule_osd_timeout(osdc);
771 	}
772 	mutex_unlock(&osdc->request_mutex);
773 }
774 
775 /*
776  * called under osdc->request_mutex
777  */
778 static void __unregister_request(struct ceph_osd_client *osdc,
779 				 struct ceph_osd_request *req)
780 {
781 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
782 	rb_erase(&req->r_node, &osdc->requests);
783 	osdc->num_requests--;
784 
785 	if (req->r_osd) {
786 		/* make sure the original request isn't in flight. */
787 		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
788 
789 		list_del_init(&req->r_osd_item);
790 		if (list_empty(&req->r_osd->o_requests))
791 			__move_osd_to_lru(osdc, req->r_osd);
792 		req->r_osd = NULL;
793 	}
794 
795 	ceph_osdc_put_request(req);
796 
797 	list_del_init(&req->r_req_lru_item);
798 	if (osdc->num_requests == 0) {
799 		dout(" no requests, canceling timeout\n");
800 		__cancel_osd_timeout(osdc);
801 	}
802 }
803 
804 /*
805  * Cancel a previously queued request message
806  */
807 static void __cancel_request(struct ceph_osd_request *req)
808 {
809 	if (req->r_sent && req->r_osd) {
810 		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
811 		req->r_sent = 0;
812 	}
813 }
814 
815 /*
816  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
817  * (as needed), and set the request r_osd appropriately.  If there is
818  * no up osd, set r_osd to NULL.  Move the request to the appropriate
819  * list (unsent, notarget) or leave it on the in-flight lru.
820  *
821  * Return 0 if unchanged, 1 if changed, or negative on error.
822  *
823  * Caller should hold map_sem for read and request_mutex.
824  */
825 static int __map_request(struct ceph_osd_client *osdc,
826 			 struct ceph_osd_request *req)
827 {
828 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
829 	struct ceph_pg pgid;
830 	int acting[CEPH_PG_MAX_SIZE];
831 	int o = -1, num = 0;
832 	int err;
833 
834 	dout("map_request %p tid %lld\n", req, req->r_tid);
835 	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
836 				      &req->r_file_layout, osdc->osdmap);
837 	if (err) {
838 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
839 		return err;
840 	}
841 	pgid = reqhead->layout.ol_pgid;
842 	req->r_pgid = pgid;
843 
844 	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
845 	if (err > 0) {
846 		o = acting[0];
847 		num = err;
848 	}
849 
850 	if ((req->r_osd && req->r_osd->o_osd == o &&
851 	     req->r_sent >= req->r_osd->o_incarnation &&
852 	     req->r_num_pg_osds == num &&
853 	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
854 	    (req->r_osd == NULL && o == -1))
855 		return 0;  /* no change */
856 
857 	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
858 	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
859 	     req->r_osd ? req->r_osd->o_osd : -1);
860 
861 	/* record full pg acting set */
862 	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
863 	req->r_num_pg_osds = num;
864 
865 	if (req->r_osd) {
866 		__cancel_request(req);
867 		list_del_init(&req->r_osd_item);
868 		req->r_osd = NULL;
869 	}
870 
871 	req->r_osd = __lookup_osd(osdc, o);
872 	if (!req->r_osd && o >= 0) {
873 		err = -ENOMEM;
874 		req->r_osd = create_osd(osdc);
875 		if (!req->r_osd) {
876 			list_move(&req->r_req_lru_item, &osdc->req_notarget);
877 			goto out;
878 		}
879 
880 		dout("map_request osd %p is osd%d\n", req->r_osd, o);
881 		req->r_osd->o_osd = o;
882 		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
883 		__insert_osd(osdc, req->r_osd);
884 
885 		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
886 	}
887 
888 	if (req->r_osd) {
889 		__remove_osd_from_lru(req->r_osd);
890 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
891 		list_move(&req->r_req_lru_item, &osdc->req_unsent);
892 	} else {
893 		list_move(&req->r_req_lru_item, &osdc->req_notarget);
894 	}
895 	err = 1;   /* osd or pg changed */
896 
897 out:
898 	return err;
899 }
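/*
 * Example of the mapping above (values illustrative): the object hashes
 * to pg 2.3f, and for the current osdmap ceph_calc_pg_acting() returns
 * acting set {4, 7, 1}, so the request targets osd4, the primary.  If a
 * later map shuffles the acting set, __map_request() returns 1 and the
 * request lands on req_unsent to be resent to the new primary.
 */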
900 
901 /*
902  * caller should hold map_sem (for read) and request_mutex
903  */
904 static int __send_request(struct ceph_osd_client *osdc,
905 			  struct ceph_osd_request *req)
906 {
907 	struct ceph_osd_request_head *reqhead;
908 
909 	dout("send_request %p tid %llu to osd%d flags %d\n",
910 	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
911 
912 	reqhead = req->r_request->front.iov_base;
913 	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
914 	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
915 	reqhead->reassert_version = req->r_reassert_version;
916 
917 	req->r_stamp = jiffies;
918 	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
919 
920 	ceph_msg_get(req->r_request); /* send consumes a ref */
921 	ceph_con_send(&req->r_osd->o_con, req->r_request);
922 	req->r_sent = req->r_osd->o_incarnation;
923 	return 0;
924 }
925 
926 /*
927  * Send any requests in the queue (req_unsent).
928  */
929 static void send_queued(struct ceph_osd_client *osdc)
930 {
931 	struct ceph_osd_request *req, *tmp;
932 
933 	dout("send_queued\n");
934 	mutex_lock(&osdc->request_mutex);
935 	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
936 		__send_request(osdc, req);
937 	}
938 	mutex_unlock(&osdc->request_mutex);
939 }
940 
941 /*
942  * Timeout callback, called every N seconds when one or more osd
943  * requests have been active for more than N seconds.  When this
944  * happens, we ping all OSDs whose requests have timed out to
945  * ensure any communications channel reset is detected.  Reset the
946  * request timeouts another N seconds in the future as we go.
947  * Reschedule the timeout event another N seconds in the future
948  * (unless there are no open requests).
949  */
950 static void handle_timeout(struct work_struct *work)
951 {
952 	struct ceph_osd_client *osdc =
953 		container_of(work, struct ceph_osd_client, timeout_work.work);
954 	struct ceph_osd_request *req, *last_req = NULL;
955 	struct ceph_osd *osd;
956 	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
957 	unsigned long keepalive =
958 		osdc->client->options->osd_keepalive_timeout * HZ;
959 	unsigned long last_stamp = 0;
960 	struct list_head slow_osds;
961 
962 	dout("timeout\n");
963 	down_read(&osdc->map_sem);
964 
965 	ceph_monc_request_next_osdmap(&osdc->client->monc);
966 
967 	mutex_lock(&osdc->request_mutex);
968 
969 	/*
970 	 * reset osds that appear to be _really_ unresponsive.  this
971 	 * is a failsafe measure.. we really shouldn't be getting to
972 	 * this point if the system is working properly.  the monitors
973 	 * should mark the osd as failed and we should find out about
974 	 * it from an updated osd map.
975 	 */
976 	while (timeout && !list_empty(&osdc->req_lru)) {
977 		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
978 				 r_req_lru_item);
979 
980 		if (time_before(jiffies, req->r_stamp + timeout))
981 			break;
982 
983 		BUG_ON(req == last_req && req->r_stamp == last_stamp);
984 		last_req = req;
985 		last_stamp = req->r_stamp;
986 
987 		osd = req->r_osd;
988 		BUG_ON(!osd);
989 		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
990 			   req->r_tid, osd->o_osd);
991 		__kick_osd_requests(osdc, osd);
992 	}
993 
994 	/*
995 	 * ping osds that are a bit slow.  this ensures that if there
996 	 * is a break in the TCP connection we will notice, and reopen
997 	 * a connection with that osd (from the fault callback).
998 	 */
999 	INIT_LIST_HEAD(&slow_osds);
1000 	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
1001 		if (time_before(jiffies, req->r_stamp + keepalive))
1002 			break;
1003 
1004 		osd = req->r_osd;
1005 		BUG_ON(!osd);
1006 		dout(" tid %llu is slow, will send keepalive on osd%d\n",
1007 		     req->r_tid, osd->o_osd);
1008 		list_move_tail(&osd->o_keepalive_item, &slow_osds);
1009 	}
1010 	while (!list_empty(&slow_osds)) {
1011 		osd = list_entry(slow_osds.next, struct ceph_osd,
1012 				 o_keepalive_item);
1013 		list_del_init(&osd->o_keepalive_item);
1014 		ceph_con_keepalive(&osd->o_con);
1015 	}
1016 
1017 	__schedule_osd_timeout(osdc);
1018 	mutex_unlock(&osdc->request_mutex);
1019 	send_queued(osdc);
1020 	up_read(&osdc->map_sem);
1021 }
1022 
1023 static void handle_osds_timeout(struct work_struct *work)
1024 {
1025 	struct ceph_osd_client *osdc =
1026 		container_of(work, struct ceph_osd_client,
1027 			     osds_timeout_work.work);
1028 	unsigned long delay =
1029 		osdc->client->options->osd_idle_ttl * HZ >> 2;
1030 
1031 	dout("osds timeout\n");
1032 	down_read(&osdc->map_sem);
1033 	remove_old_osds(osdc, 0);
1034 	up_read(&osdc->map_sem);
1035 
1036 	schedule_delayed_work(&osdc->osds_timeout_work,
1037 			      round_jiffies_relative(delay));
1038 }
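/*
 * The ">> 2" above makes this work run four times per idle-ttl period:
 * e.g. with an osd_idle_ttl of 60 seconds, idle osds are scanned every
 * 15 seconds and reaped within roughly one ttl of going quiet.
 */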
1039 
1040 /*
1041  * handle osd op reply.  either call the callback if it is specified,
1042  * or do the completion to wake up the waiting thread.
1043  */
1044 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1045 			 struct ceph_connection *con)
1046 {
1047 	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1048 	struct ceph_osd_request *req;
1049 	u64 tid;
1050 	int numops, object_len, flags;
1051 	s32 result;
1052 
1053 	tid = le64_to_cpu(msg->hdr.tid);
1054 	if (msg->front.iov_len < sizeof(*rhead))
1055 		goto bad;
1056 	numops = le32_to_cpu(rhead->num_ops);
1057 	object_len = le32_to_cpu(rhead->object_len);
1058 	result = le32_to_cpu(rhead->result);
1059 	if (msg->front.iov_len != sizeof(*rhead) + object_len +
1060 	    numops * sizeof(struct ceph_osd_op))
1061 		goto bad;
1062 	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
1063 
1064 	/* lookup */
1065 	mutex_lock(&osdc->request_mutex);
1066 	req = __lookup_request(osdc, tid);
1067 	if (req == NULL) {
1068 		dout("handle_reply tid %llu dne\n", tid);
1069 		mutex_unlock(&osdc->request_mutex);
1070 		return;
1071 	}
1072 	ceph_osdc_get_request(req);
1073 	flags = le32_to_cpu(rhead->flags);
1074 
1075 	/*
1076 	 * if this connection filled our message, drop our reference now, to
1077 	 * avoid a (safe but slower) revoke later.
1078 	 */
1079 	if (req->r_con_filling_msg == con && req->r_reply == msg) {
1080 		dout(" dropping con_filling_msg ref %p\n", con);
1081 		req->r_con_filling_msg = NULL;
1082 		ceph_con_put(con);
1083 	}
1084 
1085 	if (!req->r_got_reply) {
1086 		unsigned bytes;
1087 
1088 		req->r_result = le32_to_cpu(rhead->result);
1089 		bytes = le32_to_cpu(msg->hdr.data_len);
1090 		dout("handle_reply result %d bytes %d\n", req->r_result,
1091 		     bytes);
1092 		if (req->r_result == 0)
1093 			req->r_result = bytes;
1094 
1095 		/* in case this is a write and we need to replay, */
1096 		/* in case this is a write and we need to replay it later */
1097 
1098 		req->r_got_reply = 1;
1099 	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1100 		dout("handle_reply tid %llu dup ack\n", tid);
1101 		mutex_unlock(&osdc->request_mutex);
1102 		goto done;
1103 	}
1104 
1105 	dout("handle_reply tid %llu flags %d\n", tid, flags);
1106 
1107 	/* either this is a read, or we got the safe response */
1108 	if (result < 0 ||
1109 	    (flags & CEPH_OSD_FLAG_ONDISK) ||
1110 	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1111 		__unregister_request(osdc, req);
1112 
1113 	mutex_unlock(&osdc->request_mutex);
1114 
1115 	if (req->r_callback)
1116 		req->r_callback(req, msg);
1117 	else
1118 		complete_all(&req->r_completion);
1119 
1120 	if (flags & CEPH_OSD_FLAG_ONDISK) {
1121 		if (req->r_safe_callback)
1122 			req->r_safe_callback(req, msg);
1123 		complete_all(&req->r_safe_completion);  /* fsync waiter */
1124 	}
1125 
1126 done:
1127 	ceph_osdc_put_request(req);
1128 	return;
1129 
1130 bad:
1131 	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1132 	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1133 	       (int)sizeof(*rhead));
1134 	ceph_msg_dump(msg);
1135 }
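/*
 * For a write sent with both ACK and ONDISK flags the osd typically
 * replies twice, and the completions above fire in order:
 *
 *	reply 1 (ack, in memory):  no ONDISK flag, so r_callback and
 *	    r_completion fire but the request stays registered in case
 *	    it must be replayed;
 *	reply 2 (commit, on disk):  ONDISK set, so the request is
 *	    unregistered and r_safe_callback and r_safe_completion fire
 *	    (this is what fsync and ceph_osdc_sync() wait on).
 *
 * Reads, and writes that only requested ONDISK, see a single reply.
 */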
1136 
1137 static void reset_changed_osds(struct ceph_osd_client *osdc)
1138 {
1139 	struct rb_node *p, *n;
1140 
1141 	for (p = rb_first(&osdc->osds); p; p = n) {
1142 		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1143 
1144 		n = rb_next(p);
1145 		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1146 		    memcmp(&osd->o_con.peer_addr,
1147 			   ceph_osd_addr(osdc->osdmap,
1148 					 osd->o_osd),
1149 			   sizeof(struct ceph_entity_addr)) != 0)
1150 			__reset_osd(osdc, osd);
1151 	}
1152 }
1153 
1154 /*
1155  * Requeue requests whose mapping to an OSD has changed.  If requests map to
1156  * no osd, request a new map.
1157  *
1158  * Caller should hold map_sem for read and request_mutex.
1159  */
1160 static void kick_requests(struct ceph_osd_client *osdc)
1161 {
1162 	struct ceph_osd_request *req;
1163 	struct rb_node *p;
1164 	int needmap = 0;
1165 	int err;
1166 
1167 	dout("kick_requests\n");
1168 	mutex_lock(&osdc->request_mutex);
1169 	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1170 		req = rb_entry(p, struct ceph_osd_request, r_node);
1171 		err = __map_request(osdc, req);
1172 		if (err < 0)
1173 			continue;  /* error */
1174 		if (req->r_osd == NULL) {
1175 			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
1176 			needmap++;  /* request a newer map */
1177 		} else if (err > 0) {
1178 			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
1179 			     req->r_osd ? req->r_osd->o_osd : -1);
1180 			req->r_flags |= CEPH_OSD_FLAG_RETRY;
1181 		}
1182 	}
1183 	mutex_unlock(&osdc->request_mutex);
1184 
1185 	if (needmap) {
1186 		dout("%d requests for down osds, need new map\n", needmap);
1187 		ceph_monc_request_next_osdmap(&osdc->client->monc);
1188 	}
1189 }
1190 
1191 
1192 /*
1193  * Process updated osd map.
1194  *
1195  * The message contains any number of incremental and full maps, normally
1196  * indicating some sort of topology change in the cluster.  Kick requests
1197  * off to different OSDs as needed.
1198  */
1199 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1200 {
1201 	void *p, *end, *next;
1202 	u32 nr_maps, maplen;
1203 	u32 epoch;
1204 	struct ceph_osdmap *newmap = NULL, *oldmap;
1205 	int err;
1206 	struct ceph_fsid fsid;
1207 
1208 	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1209 	p = msg->front.iov_base;
1210 	end = p + msg->front.iov_len;
1211 
1212 	/* verify fsid */
1213 	ceph_decode_need(&p, end, sizeof(fsid), bad);
1214 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
1215 	if (ceph_check_fsid(osdc->client, &fsid) < 0)
1216 		return;
1217 
1218 	down_write(&osdc->map_sem);
1219 
1220 	/* incremental maps */
1221 	ceph_decode_32_safe(&p, end, nr_maps, bad);
1222 	dout(" %d inc maps\n", nr_maps);
1223 	while (nr_maps > 0) {
1224 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1225 		epoch = ceph_decode_32(&p);
1226 		maplen = ceph_decode_32(&p);
1227 		ceph_decode_need(&p, end, maplen, bad);
1228 		next = p + maplen;
1229 		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1230 			dout("applying incremental map %u len %d\n",
1231 			     epoch, maplen);
1232 			newmap = osdmap_apply_incremental(&p, next,
1233 							  osdc->osdmap,
1234 							  osdc->client->msgr);
1235 			if (IS_ERR(newmap)) {
1236 				err = PTR_ERR(newmap);
1237 				goto bad;
1238 			}
1239 			BUG_ON(!newmap);
1240 			if (newmap != osdc->osdmap) {
1241 				ceph_osdmap_destroy(osdc->osdmap);
1242 				osdc->osdmap = newmap;
1243 			}
1244 			kick_requests(osdc);
1245 			reset_changed_osds(osdc);
1246 		} else {
1247 			dout("ignoring incremental map %u len %d\n",
1248 			     epoch, maplen);
1249 		}
1250 		p = next;
1251 		nr_maps--;
1252 	}
1253 	if (newmap)
1254 		goto done;
1255 
1256 	/* full maps */
1257 	ceph_decode_32_safe(&p, end, nr_maps, bad);
1258 	dout(" %d full maps\n", nr_maps);
1259 	while (nr_maps) {
1260 		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1261 		epoch = ceph_decode_32(&p);
1262 		maplen = ceph_decode_32(&p);
1263 		ceph_decode_need(&p, end, maplen, bad);
1264 		if (nr_maps > 1) {
1265 			dout("skipping non-latest full map %u len %d\n",
1266 			     epoch, maplen);
1267 		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1268 			dout("skipping full map %u len %d, "
1269 			     "older than our %u\n", epoch, maplen,
1270 			     osdc->osdmap->epoch);
1271 		} else {
1272 			dout("taking full map %u len %d\n", epoch, maplen);
1273 			newmap = osdmap_decode(&p, p+maplen);
1274 			if (IS_ERR(newmap)) {
1275 				err = PTR_ERR(newmap);
1276 				goto bad;
1277 			}
1278 			BUG_ON(!newmap);
1279 			oldmap = osdc->osdmap;
1280 			osdc->osdmap = newmap;
1281 			if (oldmap)
1282 				ceph_osdmap_destroy(oldmap);
1283 			kick_requests(osdc);
1284 		}
1285 		p += maplen;
1286 		nr_maps--;
1287 	}
1288 
1289 done:
1290 	downgrade_write(&osdc->map_sem);
1291 	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1292 	send_queued(osdc);
1293 	up_read(&osdc->map_sem);
1294 	wake_up_all(&osdc->client->auth_wq);
1295 	return;
1296 
1297 bad:
1298 	pr_err("osdc handle_map corrupt msg\n");
1299 	ceph_msg_dump(msg);
1300 	up_write(&osdc->map_sem);
1301 	return;
1302 }
1303 
1304 /*
1305  * Register request, send initial attempt.
1306  */
1307 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1308 			    struct ceph_osd_request *req,
1309 			    bool nofail)
1310 {
1311 	int rc = 0;
1312 
1313 	req->r_request->pages = req->r_pages;
1314 	req->r_request->nr_pages = req->r_num_pages;
1315 #ifdef CONFIG_BLOCK
1316 	req->r_request->bio = req->r_bio;
1317 #endif
1318 	req->r_request->trail = req->r_trail;
1319 
1320 	register_request(osdc, req);
1321 
1322 	down_read(&osdc->map_sem);
1323 	mutex_lock(&osdc->request_mutex);
1324 	/*
1325 	 * a racing kick_requests() may have sent the message for us
1326 	 * while we dropped request_mutex above, so only send now if
1327 	 * the request still hasn't been touched yet.
1328 	 */
1329 	if (req->r_sent == 0) {
1330 		rc = __map_request(osdc, req);
1331 		if (rc < 0)
1332 			goto out_unlock; /* don't return with request_mutex/map_sem held */
1333 		if (req->r_osd == NULL) {
1334 			dout("send_request %p no up osds in pg\n", req);
1335 			ceph_monc_request_next_osdmap(&osdc->client->monc);
1336 		} else {
1337 			rc = __send_request(osdc, req);
1338 			if (rc) {
1339 				if (nofail) {
1340 					dout("osdc_start_request failed send,"
1341 					     " will retry %lld\n", req->r_tid);
1342 					rc = 0;
1343 				} else {
1344 					__unregister_request(osdc, req);
1345 				}
1346 			}
1347 		}
1348 	}
out_unlock:
1349 	mutex_unlock(&osdc->request_mutex);
1350 	up_read(&osdc->map_sem);
1351 	return rc;
1352 }
1353 EXPORT_SYMBOL(ceph_osdc_start_request);
1354 
1355 /*
1356  * wait for a request to complete
1357  */
1358 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1359 			   struct ceph_osd_request *req)
1360 {
1361 	int rc;
1362 
1363 	rc = wait_for_completion_interruptible(&req->r_completion);
1364 	if (rc < 0) {
1365 		mutex_lock(&osdc->request_mutex);
1366 		__cancel_request(req);
1367 		__unregister_request(osdc, req);
1368 		mutex_unlock(&osdc->request_mutex);
1369 		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1370 		return rc;
1371 	}
1372 
1373 	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1374 	return req->r_result;
1375 }
1376 EXPORT_SYMBOL(ceph_osdc_wait_request);
1377 
1378 /*
1379  * sync - wait for all in-flight requests to flush.  avoid starvation.
1380  */
1381 void ceph_osdc_sync(struct ceph_osd_client *osdc)
1382 {
1383 	struct ceph_osd_request *req;
1384 	u64 last_tid, next_tid = 0;
1385 
1386 	mutex_lock(&osdc->request_mutex);
1387 	last_tid = osdc->last_tid;
1388 	while (1) {
1389 		req = __lookup_request_ge(osdc, next_tid);
1390 		if (!req)
1391 			break;
1392 		if (req->r_tid > last_tid)
1393 			break;
1394 
1395 		next_tid = req->r_tid + 1;
1396 		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1397 			continue;
1398 
1399 		ceph_osdc_get_request(req);
1400 		mutex_unlock(&osdc->request_mutex);
1401 		dout("sync waiting on tid %llu (last is %llu)\n",
1402 		     req->r_tid, last_tid);
1403 		wait_for_completion(&req->r_safe_completion);
1404 		mutex_lock(&osdc->request_mutex);
1405 		ceph_osdc_put_request(req);
1406 	}
1407 	mutex_unlock(&osdc->request_mutex);
1408 	dout("sync done (thru tid %llu)\n", last_tid);
1409 }
1410 EXPORT_SYMBOL(ceph_osdc_sync);
1411 
1412 /*
1413  * init, shutdown
1414  */
1415 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1416 {
1417 	int err;
1418 
1419 	dout("init\n");
1420 	osdc->client = client;
1421 	osdc->osdmap = NULL;
1422 	init_rwsem(&osdc->map_sem);
1423 	init_completion(&osdc->map_waiters);
1424 	osdc->last_requested_map = 0;
1425 	mutex_init(&osdc->request_mutex);
1426 	osdc->last_tid = 0;
1427 	osdc->osds = RB_ROOT;
1428 	INIT_LIST_HEAD(&osdc->osd_lru);
1429 	osdc->requests = RB_ROOT;
1430 	INIT_LIST_HEAD(&osdc->req_lru);
1431 	INIT_LIST_HEAD(&osdc->req_unsent);
1432 	INIT_LIST_HEAD(&osdc->req_notarget);
1433 	osdc->num_requests = 0;
1434 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1435 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1436 
1437 	schedule_delayed_work(&osdc->osds_timeout_work,
1438 	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1439 
1440 	err = -ENOMEM;
1441 	osdc->req_mempool = mempool_create_kmalloc_pool(10,
1442 					sizeof(struct ceph_osd_request));
1443 	if (!osdc->req_mempool)
1444 		goto out;
1445 
1446 	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1447 				"osd_op");
1448 	if (err < 0)
1449 		goto out_mempool;
1450 	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1451 				OSD_OPREPLY_FRONT_LEN, 10, true,
1452 				"osd_op_reply");
1453 	if (err < 0)
1454 		goto out_msgpool;
1455 	return 0;
1456 
1457 out_msgpool:
1458 	ceph_msgpool_destroy(&osdc->msgpool_op);
1459 out_mempool:
1460 	mempool_destroy(osdc->req_mempool);
1461 out:
1462 	return err;
1463 }
1464 EXPORT_SYMBOL(ceph_osdc_init);
1465 
1466 void ceph_osdc_stop(struct ceph_osd_client *osdc)
1467 {
1468 	cancel_delayed_work_sync(&osdc->timeout_work);
1469 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
1470 	if (osdc->osdmap) {
1471 		ceph_osdmap_destroy(osdc->osdmap);
1472 		osdc->osdmap = NULL;
1473 	}
1474 	remove_old_osds(osdc, 1);
1475 	mempool_destroy(osdc->req_mempool);
1476 	ceph_msgpool_destroy(&osdc->msgpool_op);
1477 	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
1478 }
1479 EXPORT_SYMBOL(ceph_osdc_stop);
1480 
1481 /*
1482  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1483  * *plen.  Return number of bytes read, or error.
1484  */
1485 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1486 			struct ceph_vino vino, struct ceph_file_layout *layout,
1487 			u64 off, u64 *plen,
1488 			u32 truncate_seq, u64 truncate_size,
1489 			struct page **pages, int num_pages, int page_align)
1490 {
1491 	struct ceph_osd_request *req;
1492 	int rc = 0;
1493 
1494 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1495 	     vino.snap, off, *plen);
1496 	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1497 				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1498 				    NULL, 0, truncate_seq, truncate_size, NULL,
1499 				    false, 1, page_align);
1500 	if (!req)
1501 		return -ENOMEM;
1502 
1503 	/* it may be a short read due to an object boundary */
1504 	req->r_pages = pages;
1505 
1506 	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1507 	     off, *plen, req->r_num_pages, page_align);
1508 
1509 	rc = ceph_osdc_start_request(osdc, req, false);
1510 	if (!rc)
1511 		rc = ceph_osdc_wait_request(osdc, req);
1512 
1513 	ceph_osdc_put_request(req);
1514 	dout("readpages result %d\n", rc);
1515 	return rc;
1516 }
1517 EXPORT_SYMBOL(ceph_osdc_readpages);
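/*
 * Worked example for the alignment parameters above: reading 0x3000
 * bytes at file offset 0x1800 gives page_align = 0x1800 & ~PAGE_MASK =
 * 0x800, and calc_pages_for(0x800, 0x3000) = 4 with 4K pages, since a
 * 12K extent that starts mid-page touches four pages.
 */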
1518 
1519 /*
1520  * do a synchronous write on N pages
1521  */
1522 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1523 			 struct ceph_file_layout *layout,
1524 			 struct ceph_snap_context *snapc,
1525 			 u64 off, u64 len,
1526 			 u32 truncate_seq, u64 truncate_size,
1527 			 struct timespec *mtime,
1528 			 struct page **pages, int num_pages,
1529 			 int flags, int do_sync, bool nofail)
1530 {
1531 	struct ceph_osd_request *req;
1532 	int rc = 0;
1533 	int page_align = off & ~PAGE_MASK;
1534 
1535 	BUG_ON(vino.snap != CEPH_NOSNAP);
1536 	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1537 				    CEPH_OSD_OP_WRITE,
1538 				    flags | CEPH_OSD_FLAG_ONDISK |
1539 					    CEPH_OSD_FLAG_WRITE,
1540 				    snapc, do_sync,
1541 				    truncate_seq, truncate_size, mtime,
1542 				    nofail, 1, page_align);
1543 	if (!req)
1544 		return -ENOMEM;
1545 
1546 	/* it may be a short write due to an object boundary */
1547 	req->r_pages = pages;
1548 	dout("writepages %llu~%llu (%d pages)\n", off, len,
1549 	     req->r_num_pages);
1550 
1551 	rc = ceph_osdc_start_request(osdc, req, nofail);
1552 	if (!rc)
1553 		rc = ceph_osdc_wait_request(osdc, req);
1554 
1555 	ceph_osdc_put_request(req);
1556 	if (rc == 0)
1557 		rc = len;
1558 	dout("writepages result %d\n", rc);
1559 	return rc;
1560 }
1561 EXPORT_SYMBOL(ceph_osdc_writepages);
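/*
 * Sketch of a synchronous whole-extent write through this helper
 * (illustrative only; assumes pages[] holds the caller's data and that
 * snapc and mtime come from the caller's context):
 *
 *	rc = ceph_osdc_writepages(osdc, vino, layout, snapc,
 *				  off, len, truncate_seq, truncate_size,
 *				  &mtime, pages, num_pages,
 *				  0, 0, true);
 *	if (rc < 0)
 *		return rc;		(write failed, e.g. -ENOSPC)
 *	(on success rc == len, where len may already have been shortened
 *	 to the object boundary by ceph_osdc_new_request())
 *
 * Note that nofail=true also makes ceph_osdc_start_request() keep the
 * request registered for retry if the initial send fails.
 */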
1562 
1563 /*
1564  * handle incoming message
1565  */
1566 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1567 {
1568 	struct ceph_osd *osd = con->private;
1569 	struct ceph_osd_client *osdc;
1570 	int type = le16_to_cpu(msg->hdr.type);
1571 
1572 	if (!osd)
1573 		goto out;
1574 	osdc = osd->o_osdc;
1575 
1576 	switch (type) {
1577 	case CEPH_MSG_OSD_MAP:
1578 		ceph_osdc_handle_map(osdc, msg);
1579 		break;
1580 	case CEPH_MSG_OSD_OPREPLY:
1581 		handle_reply(osdc, msg, con);
1582 		break;
1583 
1584 	default:
1585 		pr_err("received unknown message type %d %s\n", type,
1586 		       ceph_msg_type_name(type));
1587 	}
1588 out:
1589 	ceph_msg_put(msg);
1590 }
1591 
1592 /*
1593  * lookup and return message for incoming reply.  set up reply message
1594  * pages.
1595  */
1596 static struct ceph_msg *get_reply(struct ceph_connection *con,
1597 				  struct ceph_msg_header *hdr,
1598 				  int *skip)
1599 {
1600 	struct ceph_osd *osd = con->private;
1601 	struct ceph_osd_client *osdc = osd->o_osdc;
1602 	struct ceph_msg *m;
1603 	struct ceph_osd_request *req;
1604 	int front = le32_to_cpu(hdr->front_len);
1605 	int data_len = le32_to_cpu(hdr->data_len);
1606 	u64 tid;
1607 
1608 	tid = le64_to_cpu(hdr->tid);
1609 	mutex_lock(&osdc->request_mutex);
1610 	req = __lookup_request(osdc, tid);
1611 	if (!req) {
1612 		*skip = 1;
1613 		m = NULL;
1614 		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
1615 			osd->o_osd);
1616 		goto out;
1617 	}
1618 
1619 	if (req->r_con_filling_msg) {
1620 		dout("get_reply revoking msg %p from old con %p\n",
1621 		     req->r_reply, req->r_con_filling_msg);
1622 		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1623 		ceph_con_put(req->r_con_filling_msg);
1624 		req->r_con_filling_msg = NULL;
1625 	}
1626 
1627 	if (front > req->r_reply->front.iov_len) {
1628 		pr_warning("get_reply front %d > preallocated %d\n",
1629 			   front, (int)req->r_reply->front.iov_len);
1630 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
1631 		if (!m)
1632 			goto out;
1633 		ceph_msg_put(req->r_reply);
1634 		req->r_reply = m;
1635 	}
1636 	m = ceph_msg_get(req->r_reply);
1637 
1638 	if (data_len > 0) {
1639 		int want = calc_pages_for(req->r_page_alignment, data_len);
1640 
1641 		if (unlikely(req->r_num_pages < want)) {
1642 			pr_warning("tid %lld reply %d > expected %d pages\n",
1643 				   tid, want, req->r_num_pages);
1644 			*skip = 1;
1645 			ceph_msg_put(m);
1646 			m = NULL;
1647 			goto out;
1648 		}
1649 		m->pages = req->r_pages;
1650 		m->nr_pages = req->r_num_pages;
1651 		m->page_alignment = req->r_page_alignment;
1652 #ifdef CONFIG_BLOCK
1653 		m->bio = req->r_bio;
1654 #endif
1655 	}
1656 	*skip = 0;
1657 	req->r_con_filling_msg = ceph_con_get(con);
1658 	dout("get_reply tid %lld %p\n", tid, m);
1659 
1660 out:
1661 	mutex_unlock(&osdc->request_mutex);
1662 	return m;
1663 
1664 }
1665 
1666 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1667 				  struct ceph_msg_header *hdr,
1668 				  int *skip)
1669 {
1670 	struct ceph_osd *osd = con->private;
1671 	int type = le16_to_cpu(hdr->type);
1672 	int front = le32_to_cpu(hdr->front_len);
1673 
1674 	switch (type) {
1675 	case CEPH_MSG_OSD_MAP:
1676 		return ceph_msg_new(type, front, GFP_NOFS);
1677 	case CEPH_MSG_OSD_OPREPLY:
1678 		return get_reply(con, hdr, skip);
1679 	default:
1680 		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1681 			osd->o_osd);
1682 		*skip = 1;
1683 		return NULL;
1684 	}
1685 }
1686 
1687 /*
1688  * Wrappers to refcount containing ceph_osd struct
1689  */
1690 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1691 {
1692 	struct ceph_osd *osd = con->private;
1693 	if (get_osd(osd))
1694 		return con;
1695 	return NULL;
1696 }
1697 
1698 static void put_osd_con(struct ceph_connection *con)
1699 {
1700 	struct ceph_osd *osd = con->private;
1701 	put_osd(osd);
1702 }
1703 
1704 /*
1705  * authentication
1706  */
1707 static int get_authorizer(struct ceph_connection *con,
1708 			  void **buf, int *len, int *proto,
1709 			  void **reply_buf, int *reply_len, int force_new)
1710 {
1711 	struct ceph_osd *o = con->private;
1712 	struct ceph_osd_client *osdc = o->o_osdc;
1713 	struct ceph_auth_client *ac = osdc->client->monc.auth;
1714 	int ret = 0;
1715 
1716 	if (force_new && o->o_authorizer) {
1717 		ac->ops->destroy_authorizer(ac, o->o_authorizer);
1718 		o->o_authorizer = NULL;
1719 	}
1720 	if (o->o_authorizer == NULL) {
1721 		ret = ac->ops->create_authorizer(
1722 			ac, CEPH_ENTITY_TYPE_OSD,
1723 			&o->o_authorizer,
1724 			&o->o_authorizer_buf,
1725 			&o->o_authorizer_buf_len,
1726 			&o->o_authorizer_reply_buf,
1727 			&o->o_authorizer_reply_buf_len);
1728 		if (ret)
1729 			return ret;
1730 	}
1731 
1732 	*proto = ac->protocol;
1733 	*buf = o->o_authorizer_buf;
1734 	*len = o->o_authorizer_buf_len;
1735 	*reply_buf = o->o_authorizer_reply_buf;
1736 	*reply_len = o->o_authorizer_reply_buf_len;
1737 	return 0;
1738 }
1739 
1740 
1741 static int verify_authorizer_reply(struct ceph_connection *con, int len)
1742 {
1743 	struct ceph_osd *o = con->private;
1744 	struct ceph_osd_client *osdc = o->o_osdc;
1745 	struct ceph_auth_client *ac = osdc->client->monc.auth;
1746 
1747 	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1748 }
1749 
1750 static int invalidate_authorizer(struct ceph_connection *con)
1751 {
1752 	struct ceph_osd *o = con->private;
1753 	struct ceph_osd_client *osdc = o->o_osdc;
1754 	struct ceph_auth_client *ac = osdc->client->monc.auth;
1755 
1756 	if (ac->ops->invalidate_authorizer)
1757 		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1758 
1759 	return ceph_monc_validate_auth(&osdc->client->monc);
1760 }
1761 
1762 static const struct ceph_connection_operations osd_con_ops = {
1763 	.get = get_osd_con,
1764 	.put = put_osd_con,
1765 	.dispatch = dispatch,
1766 	.get_authorizer = get_authorizer,
1767 	.verify_authorizer_reply = verify_authorizer_reply,
1768 	.invalidate_authorizer = invalidate_authorizer,
1769 	.alloc_msg = alloc_msg,
1770 	.fault = osd_reset,
1771 };
1772