#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

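/*
 * Preallocated front (header) sizes for outgoing osd requests and
 * incoming op replies; the message pools in ceph_osdc_init() are
 * sized from these.
 */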
#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd);

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);

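/*
 * Ops that carry variable-length data (xattr names/values, class call
 * payloads) append that data to the request as a trailing pagelist;
 * see osd_req_encode_op() below.
 */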
static int op_needs_trail(int op)
{
	switch (op) {
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_CALL:
		return 1;
	default:
		return 0;
	}
}

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}

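/*
 * Map a file extent onto a single object: trim *plen at the object
 * boundary, fill in the op's extent and the request's page count and
 * alignment, and return the object number in *bno.
 */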
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			struct ceph_file_layout *layout,
			u64 snapid,
			u64 off, u64 *plen, u64 *bno,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
				      req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

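/*
 * The op array handed to us is terminated by an entry with op == 0.
 * Count the ops and note whether any of them needs a data trail.
 */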
static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
	int i = 0;

	if (needs_trail)
		*needs_trail = 0;
	while (ops[i].op) {
		if (needs_trail && op_needs_trail(ops[i].op))
			*needs_trail = 1;
		i++;
	}

	return i;
}

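/*
 * Allocate a request and the messages that back it: a preallocated
 * reply, and a request message large enough for the op array, the oid,
 * and the snap context.  Returns NULL on failure.
 */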
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}
	/* create request message; allow space for oid */
	msg_size += 40;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

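/*
 * Encode an in-memory op description into its on-wire (little-endian)
 * form, appending any variable-length data to the request's trail.
 */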
static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (src->op) {	/* compare in cpu order, not wire order */
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", src->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
					 snapc, ops,
					 use_mempool,
					 GFP_NOFS, NULL, NULL);
	if (!req)	/* alloc_request returns NULL, not an ERR_PTR */
		return NULL;

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req, ops);
	req->r_file_layout = *layout;  /* keep a copy */

	/* in case it differs from natural alignment that calc_layout
	   filled in for us */
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
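
/*
 * A minimal sketch of typical use of this interface (compare
 * ceph_osdc_readpages() below); names like "layout", "vino" and
 * "pages" stand for whatever the caller already has in hand:
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, truncate_seq, truncate_size,
 *				    NULL, false, 1, page_align);
 *	if (!req)
 *		return -ENOMEM;
 *	req->r_pages = pages;
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */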

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

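/*
 * Return the request with the lowest tid that is >= @tid, or NULL if
 * there is none.  Used by ceph_osdc_sync() to walk requests in tid order.
 */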
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

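/*
 * get_osd() takes a reference only if the osd is still live (o_ref > 0);
 * put_osd() frees the osd, and its authorizer, on the last reference.
 */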
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

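/*
 * Open osd sessions are kept in a second rbtree, keyed by osd number.
 */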
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

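/*
 * Arm/disarm the periodic timeout work; it runs only while at least one
 * request is in flight.
 */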
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
	list_del_init(&req->r_req_lru_item);
}

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in the future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct rb_node *p;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.  we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}

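/*
 * Periodic work: expire idle osd sessions from the lru, then reschedule
 * ourselves at a quarter of the idle ttl.
 */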
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete_all(&req->r_safe_completion);  /* fsync waiter */
	}

done:
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}

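/*
 * Kick (remap and resend) requests: everything bound to @kickosd, or,
 * if @kickosd is NULL, every request whose osd mapping may have changed.
 * Returns the number of requests that currently map to no up osd, a
 * hint that we need a newer osdmap.
 */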
static int __kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	if (kickosd) {
		err = __reset_osd(osdc, kickosd);
		if (err == -EAGAIN)
			return 1;
	} else {
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			n = rb_next(p);
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;  /* no change */
		if (err < 0) {
			/*
			 * FIXME: really, we should set the request
			 * error and fail if this isn't a 'nofail'
			 * request, but that's a fair bit more
			 * complicated to do.  So retry!
			 */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}

	return needmap;
}

/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read; request_mutex is taken here.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	int needmap;

	mutex_lock(&osdc->request_mutex);
	needmap = __kick_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
}

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply %d > expected %d pages\n",
				   tid, want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

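/*
 * Allocate an incoming message based on its type: osdmap messages get a
 * fresh buffer, op replies reuse the request's preallocated reply, and
 * anything else is skipped.
 */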
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		return ceph_msg_new(type, front, GFP_NOFS);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}

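/*
 * Called by the messenger to check the osd's response to our authorizer,
 * and to invalidate our ticket (and revalidate with the monitor) when
 * authentication fails.
 */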
static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};