xref: /openbmc/linux/fs/ceph/mds_client.c (revision 7ce05074)
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3 
4 #include <linux/fs.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
7 #include <linux/gfp.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
12 #include <linux/bits.h>
13 #include <linux/ktime.h>
14 
15 #include "super.h"
16 #include "mds_client.h"
17 
18 #include <linux/ceph/ceph_features.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/pagelist.h>
22 #include <linux/ceph/auth.h>
23 #include <linux/ceph/debugfs.h>
24 
25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26 
27 /*
28  * A cluster of MDS (metadata server) daemons is responsible for
29  * managing the file system namespace (the directory hierarchy and
30  * inodes) and for coordinating shared access to storage.  Metadata is
31  * partitioned hierarchically across a number of servers, and that
32  * partition varies over time as the cluster adjusts the distribution
33  * in order to balance load.
34  *
35  * The MDS client is primarily responsible for managing synchronous
36  * metadata requests for operations like open, unlink, and so forth.
37  * If there is an MDS failure, we find out about it when we (possibly
38  * request and) receive a new MDS map, and can resubmit affected
39  * requests.
40  *
41  * For the most part, though, we take advantage of a lossless
42  * communications channel to the MDS, and do not need to worry about
43  * timing out or resubmitting requests.
44  *
45  * We maintain a stateful "session" with each MDS we interact with.
46  * Within each session, we send periodic heartbeat messages to ensure
47  * any capabilities or leases we have been issued remain valid.  If
48  * the session times out and goes stale, our leases and capabilities
49  * are no longer valid.
50  */
51 
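/*
 * State carried while encoding a cap reconnect message for a session:
 * the pagelist being filled, running counts of encoded caps and snap
 * realms, the encoding version to use, and whether the reconnect may
 * be split across multiple messages (allow_multi).
 */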
52 struct ceph_reconnect_state {
53 	struct ceph_mds_session *session;
54 	int nr_caps, nr_realms;
55 	struct ceph_pagelist *pagelist;
56 	unsigned msg_version;
57 	bool allow_multi;
58 };
59 
60 static void __wake_requests(struct ceph_mds_client *mdsc,
61 			    struct list_head *head);
62 static void ceph_cap_release_work(struct work_struct *work);
63 static void ceph_cap_reclaim_work(struct work_struct *work);
64 
65 static const struct ceph_connection_operations mds_con_ops;
66 
67 
68 /*
69  * mds reply parsing
70  */
71 
72 static int parse_reply_info_quota(void **p, void *end,
73 				  struct ceph_mds_reply_info_in *info)
74 {
75 	u8 struct_v, struct_compat;
76 	u32 struct_len;
77 
78 	ceph_decode_8_safe(p, end, struct_v, bad);
79 	ceph_decode_8_safe(p, end, struct_compat, bad);
80 	/* struct_v is expected to be >= 1. we only
81 	 * understand encoding with struct_compat == 1. */
82 	if (!struct_v || struct_compat != 1)
83 		goto bad;
84 	ceph_decode_32_safe(p, end, struct_len, bad);
85 	ceph_decode_need(p, end, struct_len, bad);
86 	end = *p + struct_len;
87 	ceph_decode_64_safe(p, end, info->max_bytes, bad);
88 	ceph_decode_64_safe(p, end, info->max_files, bad);
89 	*p = end;
90 	return 0;
91 bad:
92 	return -EIO;
93 }
94 
95 /*
96  * parse individual inode info
97  */
98 static int parse_reply_info_in(void **p, void *end,
99 			       struct ceph_mds_reply_info_in *info,
100 			       u64 features)
101 {
102 	int err = 0;
103 	u8 struct_v = 0;
104 
105 	if (features == (u64)-1) {
106 		u32 struct_len;
107 		u8 struct_compat;
108 		ceph_decode_8_safe(p, end, struct_v, bad);
109 		ceph_decode_8_safe(p, end, struct_compat, bad);
110 		/* struct_v is expected to be >= 1. we only understand
111 		 * encoding with struct_compat == 1. */
112 		if (!struct_v || struct_compat != 1)
113 			goto bad;
114 		ceph_decode_32_safe(p, end, struct_len, bad);
115 		ceph_decode_need(p, end, struct_len, bad);
116 		end = *p + struct_len;
117 	}
118 
119 	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120 	info->in = *p;
121 	*p += sizeof(struct ceph_mds_reply_inode) +
122 		sizeof(*info->in->fragtree.splits) *
123 		le32_to_cpu(info->in->fragtree.nsplits);
124 
125 	ceph_decode_32_safe(p, end, info->symlink_len, bad);
126 	ceph_decode_need(p, end, info->symlink_len, bad);
127 	info->symlink = *p;
128 	*p += info->symlink_len;
129 
130 	ceph_decode_copy_safe(p, end, &info->dir_layout,
131 			      sizeof(info->dir_layout), bad);
132 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
133 	ceph_decode_need(p, end, info->xattr_len, bad);
134 	info->xattr_data = *p;
135 	*p += info->xattr_len;
136 
137 	if (features == (u64)-1) {
138 		/* inline data */
139 		ceph_decode_64_safe(p, end, info->inline_version, bad);
140 		ceph_decode_32_safe(p, end, info->inline_len, bad);
141 		ceph_decode_need(p, end, info->inline_len, bad);
142 		info->inline_data = *p;
143 		*p += info->inline_len;
144 		/* quota */
145 		err = parse_reply_info_quota(p, end, info);
146 		if (err < 0)
147 			goto out_bad;
148 		/* pool namespace */
149 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150 		if (info->pool_ns_len > 0) {
151 			ceph_decode_need(p, end, info->pool_ns_len, bad);
152 			info->pool_ns_data = *p;
153 			*p += info->pool_ns_len;
154 		}
155 
156 		/* btime */
157 		ceph_decode_need(p, end, sizeof(info->btime), bad);
158 		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159 
160 		/* change attribute */
161 		ceph_decode_64_safe(p, end, info->change_attr, bad);
162 
163 		/* dir pin */
164 		if (struct_v >= 2) {
165 			ceph_decode_32_safe(p, end, info->dir_pin, bad);
166 		} else {
167 			info->dir_pin = -ENODATA;
168 		}
169 
170 		/* snapshot birth time, remains zero for v<=2 */
171 		if (struct_v >= 3) {
172 			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173 			ceph_decode_copy(p, &info->snap_btime,
174 					 sizeof(info->snap_btime));
175 		} else {
176 			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177 		}
178 
179 		/* snapshot count, remains zero for v<=3 */
180 		if (struct_v >= 4) {
181 			ceph_decode_64_safe(p, end, info->rsnaps, bad);
182 		} else {
183 			info->rsnaps = 0;
184 		}
185 
186 		*p = end;
187 	} else {
188 		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
189 			ceph_decode_64_safe(p, end, info->inline_version, bad);
190 			ceph_decode_32_safe(p, end, info->inline_len, bad);
191 			ceph_decode_need(p, end, info->inline_len, bad);
192 			info->inline_data = *p;
193 			*p += info->inline_len;
194 		} else
195 			info->inline_version = CEPH_INLINE_NONE;
196 
197 		if (features & CEPH_FEATURE_MDS_QUOTA) {
198 			err = parse_reply_info_quota(p, end, info);
199 			if (err < 0)
200 				goto out_bad;
201 		} else {
202 			info->max_bytes = 0;
203 			info->max_files = 0;
204 		}
205 
206 		info->pool_ns_len = 0;
207 		info->pool_ns_data = NULL;
208 		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
209 			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
210 			if (info->pool_ns_len > 0) {
211 				ceph_decode_need(p, end, info->pool_ns_len, bad);
212 				info->pool_ns_data = *p;
213 				*p += info->pool_ns_len;
214 			}
215 		}
216 
217 		if (features & CEPH_FEATURE_FS_BTIME) {
218 			ceph_decode_need(p, end, sizeof(info->btime), bad);
219 			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
220 			ceph_decode_64_safe(p, end, info->change_attr, bad);
221 		}
222 
223 		info->dir_pin = -ENODATA;
224 		/* info->snap_btime and info->rsnaps remain zero */
225 	}
226 	return 0;
227 bad:
228 	err = -EIO;
229 out_bad:
230 	return err;
231 }
232 
233 static int parse_reply_info_dir(void **p, void *end,
234 				struct ceph_mds_reply_dirfrag **dirfrag,
235 				u64 features)
236 {
237 	if (features == (u64)-1) {
238 		u8 struct_v, struct_compat;
239 		u32 struct_len;
240 		ceph_decode_8_safe(p, end, struct_v, bad);
241 		ceph_decode_8_safe(p, end, struct_compat, bad);
242 		/* struct_v is expected to be >= 1. we only understand
243 		 * encoding whose struct_compat == 1. */
244 		if (!struct_v || struct_compat != 1)
245 			goto bad;
246 		ceph_decode_32_safe(p, end, struct_len, bad);
247 		ceph_decode_need(p, end, struct_len, bad);
248 		end = *p + struct_len;
249 	}
250 
251 	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
252 	*dirfrag = *p;
253 	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
254 	if (unlikely(*p > end))
255 		goto bad;
256 	if (features == (u64)-1)
257 		*p = end;
258 	return 0;
259 bad:
260 	return -EIO;
261 }
262 
263 static int parse_reply_info_lease(void **p, void *end,
264 				  struct ceph_mds_reply_lease **lease,
265 				  u64 features)
266 {
267 	if (features == (u64)-1) {
268 		u8 struct_v, struct_compat;
269 		u32 struct_len;
270 		ceph_decode_8_safe(p, end, struct_v, bad);
271 		ceph_decode_8_safe(p, end, struct_compat, bad);
272 		/* struct_v is expected to be >= 1. we only understand
273 		 * encoding whose struct_compat == 1. */
274 		if (!struct_v || struct_compat != 1)
275 			goto bad;
276 		ceph_decode_32_safe(p, end, struct_len, bad);
277 		ceph_decode_need(p, end, struct_len, bad);
278 		end = *p + struct_len;
279 	}
280 
281 	ceph_decode_need(p, end, sizeof(**lease), bad);
282 	*lease = *p;
283 	*p += sizeof(**lease);
284 	if (features == (u64)-1)
285 		*p = end;
286 	return 0;
287 bad:
288 	return -EIO;
289 }
290 
291 /*
292  * parse a normal reply, which may contain a (dir+)dentry and/or a
293  * target inode.
294  */
295 static int parse_reply_info_trace(void **p, void *end,
296 				  struct ceph_mds_reply_info_parsed *info,
297 				  u64 features)
298 {
299 	int err;
300 
301 	if (info->head->is_dentry) {
302 		err = parse_reply_info_in(p, end, &info->diri, features);
303 		if (err < 0)
304 			goto out_bad;
305 
306 		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
307 		if (err < 0)
308 			goto out_bad;
309 
310 		ceph_decode_32_safe(p, end, info->dname_len, bad);
311 		ceph_decode_need(p, end, info->dname_len, bad);
312 		info->dname = *p;
313 		*p += info->dname_len;
314 
315 		err = parse_reply_info_lease(p, end, &info->dlease, features);
316 		if (err < 0)
317 			goto out_bad;
318 	}
319 
320 	if (info->head->is_target) {
321 		err = parse_reply_info_in(p, end, &info->targeti, features);
322 		if (err < 0)
323 			goto out_bad;
324 	}
325 
326 	if (unlikely(*p != end))
327 		goto bad;
328 	return 0;
329 
330 bad:
331 	err = -EIO;
332 out_bad:
333 	pr_err("problem parsing mds trace %d\n", err);
334 	return err;
335 }
336 
337 /*
338  * parse readdir results
339  */
340 static int parse_reply_info_readdir(void **p, void *end,
341 				struct ceph_mds_reply_info_parsed *info,
342 				u64 features)
343 {
344 	u32 num, i = 0;
345 	int err;
346 
347 	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
348 	if (err < 0)
349 		goto out_bad;
350 
351 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
352 	num = ceph_decode_32(p);
353 	{
354 		u16 flags = ceph_decode_16(p);
355 		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
356 		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
357 		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
358 		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
359 	}
360 	if (num == 0)
361 		goto done;
362 
363 	BUG_ON(!info->dir_entries);
364 	if ((unsigned long)(info->dir_entries + num) >
365 	    (unsigned long)info->dir_entries + info->dir_buf_size) {
366 		pr_err("dir contents are larger than expected\n");
367 		WARN_ON(1);
368 		goto bad;
369 	}
370 
371 	info->dir_nr = num;
372 	while (num) {
373 		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
374 		/* dentry */
375 		ceph_decode_32_safe(p, end, rde->name_len, bad);
376 		ceph_decode_need(p, end, rde->name_len, bad);
377 		rde->name = *p;
378 		*p += rde->name_len;
379 		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
380 
381 		/* dentry lease */
382 		err = parse_reply_info_lease(p, end, &rde->lease, features);
383 		if (err)
384 			goto out_bad;
385 		/* inode */
386 		err = parse_reply_info_in(p, end, &rde->inode, features);
387 		if (err < 0)
388 			goto out_bad;
389 		/* ceph_readdir_prepopulate() will update it */
390 		rde->offset = 0;
391 		i++;
392 		num--;
393 	}
394 
395 done:
396 	/* Skip over any unrecognized fields */
397 	*p = end;
398 	return 0;
399 
400 bad:
401 	err = -EIO;
402 out_bad:
403 	pr_err("problem parsing dir contents %d\n", err);
404 	return err;
405 }
406 
407 /*
408  * parse fcntl F_GETLK results
409  */
410 static int parse_reply_info_filelock(void **p, void *end,
411 				     struct ceph_mds_reply_info_parsed *info,
412 				     u64 features)
413 {
414 	if (*p + sizeof(*info->filelock_reply) > end)
415 		goto bad;
416 
417 	info->filelock_reply = *p;
418 
419 	/* Skip over any unrecognized fields */
420 	*p = end;
421 	return 0;
422 bad:
423 	return -EIO;
424 }
425 
426 
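/*
 * Delegated inode numbers: the MDS may hand the client ranges of inode
 * numbers in a create reply.  Each one is stored in the session's
 * s_delegated_inos xarray, marked DELEGATED_INO_AVAILABLE;
 * ceph_get_deleg_ino() claims one and ceph_restore_deleg_ino() returns
 * an unused one to the pool.
 */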
427 #if BITS_PER_LONG == 64
428 
429 #define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
430 
431 static int ceph_parse_deleg_inos(void **p, void *end,
432 				 struct ceph_mds_session *s)
433 {
434 	u32 sets;
435 
436 	ceph_decode_32_safe(p, end, sets, bad);
437 	dout("got %u sets of delegated inodes\n", sets);
438 	while (sets--) {
439 		u64 start, len, ino;
440 
441 		ceph_decode_64_safe(p, end, start, bad);
442 		ceph_decode_64_safe(p, end, len, bad);
443 
444 		/* Don't accept a delegation of system inodes */
445 		if (start < CEPH_INO_SYSTEM_BASE) {
446 			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
447 					start, len);
448 			continue;
449 		}
450 		while (len--) {
451 			int err = xa_insert(&s->s_delegated_inos, ino = start++,
452 					    DELEGATED_INO_AVAILABLE,
453 					    GFP_KERNEL);
454 			if (!err) {
455 				dout("added delegated inode 0x%llx\n",
456 				     start - 1);
457 			} else if (err == -EBUSY) {
458 				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
459 					start - 1);
460 			} else {
461 				return err;
462 			}
463 		}
464 	}
465 	return 0;
466 bad:
467 	return -EIO;
468 }
469 
470 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
471 {
472 	unsigned long ino;
473 	void *val;
474 
475 	xa_for_each(&s->s_delegated_inos, ino, val) {
476 		val = xa_erase(&s->s_delegated_inos, ino);
477 		if (val == DELEGATED_INO_AVAILABLE)
478 			return ino;
479 	}
480 	return 0;
481 }
482 
483 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
484 {
485 	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
486 			 GFP_KERNEL);
487 }
488 #else /* BITS_PER_LONG == 64 */
489 /*
490  * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
491  * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
492  * and bottom words?
493  */
494 static int ceph_parse_deleg_inos(void **p, void *end,
495 				 struct ceph_mds_session *s)
496 {
497 	u32 sets;
498 
499 	ceph_decode_32_safe(p, end, sets, bad);
500 	if (sets)
501 		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
502 	return 0;
503 bad:
504 	return -EIO;
505 }
506 
507 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
508 {
509 	return 0;
510 }
511 
512 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
513 {
514 	return 0;
515 }
516 #endif /* BITS_PER_LONG == 64 */
517 
518 /*
519  * parse create results
520  */
521 static int parse_reply_info_create(void **p, void *end,
522 				  struct ceph_mds_reply_info_parsed *info,
523 				  u64 features, struct ceph_mds_session *s)
524 {
525 	int ret;
526 
527 	if (features == (u64)-1 ||
528 	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
529 		if (*p == end) {
530 			/* Malformed reply? */
531 			info->has_create_ino = false;
532 		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
533 			info->has_create_ino = true;
534 			/* struct_v, struct_compat, and len */
535 			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
536 			ceph_decode_64_safe(p, end, info->ino, bad);
537 			ret = ceph_parse_deleg_inos(p, end, s);
538 			if (ret)
539 				return ret;
540 		} else {
541 			/* legacy */
542 			ceph_decode_64_safe(p, end, info->ino, bad);
543 			info->has_create_ino = true;
544 		}
545 	} else {
546 		if (*p != end)
547 			goto bad;
548 	}
549 
550 	/* Skip over any unrecognized fields */
551 	*p = end;
552 	return 0;
553 bad:
554 	return -EIO;
555 }
556 
557 /*
558  * parse extra results
559  */
560 static int parse_reply_info_extra(void **p, void *end,
561 				  struct ceph_mds_reply_info_parsed *info,
562 				  u64 features, struct ceph_mds_session *s)
563 {
564 	u32 op = le32_to_cpu(info->head->op);
565 
566 	if (op == CEPH_MDS_OP_GETFILELOCK)
567 		return parse_reply_info_filelock(p, end, info, features);
568 	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
569 		return parse_reply_info_readdir(p, end, info, features);
570 	else if (op == CEPH_MDS_OP_CREATE)
571 		return parse_reply_info_create(p, end, info, features, s);
572 	else
573 		return -EIO;
574 }
575 
576 /*
577  * parse entire mds reply
578  */
579 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
580 			    struct ceph_mds_reply_info_parsed *info,
581 			    u64 features)
582 {
583 	void *p, *end;
584 	u32 len;
585 	int err;
586 
587 	info->head = msg->front.iov_base;
588 	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
589 	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
590 
591 	/* trace */
592 	ceph_decode_32_safe(&p, end, len, bad);
593 	if (len > 0) {
594 		ceph_decode_need(&p, end, len, bad);
595 		err = parse_reply_info_trace(&p, p+len, info, features);
596 		if (err < 0)
597 			goto out_bad;
598 	}
599 
600 	/* extra */
601 	ceph_decode_32_safe(&p, end, len, bad);
602 	if (len > 0) {
603 		ceph_decode_need(&p, end, len, bad);
604 		err = parse_reply_info_extra(&p, p+len, info, features, s);
605 		if (err < 0)
606 			goto out_bad;
607 	}
608 
609 	/* snap blob */
610 	ceph_decode_32_safe(&p, end, len, bad);
611 	info->snapblob_len = len;
612 	info->snapblob = p;
613 	p += len;
614 
615 	if (p != end)
616 		goto bad;
617 	return 0;
618 
619 bad:
620 	err = -EIO;
621 out_bad:
622 	pr_err("mds parse_reply err %d\n", err);
623 	return err;
624 }
625 
626 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
627 {
628 	if (!info->dir_entries)
629 		return;
630 	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
631 }
632 
633 
634 /*
635  * sessions
636  */
637 const char *ceph_session_state_name(int s)
638 {
639 	switch (s) {
640 	case CEPH_MDS_SESSION_NEW: return "new";
641 	case CEPH_MDS_SESSION_OPENING: return "opening";
642 	case CEPH_MDS_SESSION_OPEN: return "open";
643 	case CEPH_MDS_SESSION_HUNG: return "hung";
644 	case CEPH_MDS_SESSION_CLOSING: return "closing";
645 	case CEPH_MDS_SESSION_CLOSED: return "closed";
646 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
647 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
648 	case CEPH_MDS_SESSION_REJECTED: return "rejected";
649 	default: return "???";
650 	}
651 }
652 
653 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
654 {
655 	if (refcount_inc_not_zero(&s->s_ref)) {
656 		dout("mdsc get_session %p %d -> %d\n", s,
657 		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
658 		return s;
659 	} else {
660 		dout("mdsc get_session %p 0 -- FAIL\n", s);
661 		return NULL;
662 	}
663 }
664 
665 void ceph_put_mds_session(struct ceph_mds_session *s)
666 {
667 	if (IS_ERR_OR_NULL(s))
668 		return;
669 
670 	dout("mdsc put_session %p %d -> %d\n", s,
671 	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
672 	if (refcount_dec_and_test(&s->s_ref)) {
673 		if (s->s_auth.authorizer)
674 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
675 		WARN_ON(mutex_is_locked(&s->s_mutex));
676 		xa_destroy(&s->s_delegated_inos);
677 		kfree(s);
678 	}
679 }
680 
681 /*
682  * called under mdsc->mutex
683  */
684 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
685 						   int mds)
686 {
687 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
688 		return NULL;
689 	return ceph_get_mds_session(mdsc->sessions[mds]);
690 }
691 
692 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
693 {
694 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
695 		return false;
696 	else
697 		return true;
698 }
699 
700 static int __verify_registered_session(struct ceph_mds_client *mdsc,
701 				       struct ceph_mds_session *s)
702 {
703 	if (s->s_mds >= mdsc->max_sessions ||
704 	    mdsc->sessions[s->s_mds] != s)
705 		return -ENOENT;
706 	return 0;
707 }
708 
709 /*
710  * create+register a new session for given mds.
711  * called under mdsc->mutex.
712  */
713 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
714 						 int mds)
715 {
716 	struct ceph_mds_session *s;
717 
718 	if (mds >= mdsc->mdsmap->possible_max_rank)
719 		return ERR_PTR(-EINVAL);
720 
721 	s = kzalloc(sizeof(*s), GFP_NOFS);
722 	if (!s)
723 		return ERR_PTR(-ENOMEM);
724 
725 	if (mds >= mdsc->max_sessions) {
726 		int newmax = 1 << get_count_order(mds + 1);
727 		struct ceph_mds_session **sa;
728 
729 		dout("%s: realloc to %d\n", __func__, newmax);
730 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
731 		if (!sa)
732 			goto fail_realloc;
733 		if (mdsc->sessions) {
734 			memcpy(sa, mdsc->sessions,
735 			       mdsc->max_sessions * sizeof(void *));
736 			kfree(mdsc->sessions);
737 		}
738 		mdsc->sessions = sa;
739 		mdsc->max_sessions = newmax;
740 	}
741 
742 	dout("%s: mds%d\n", __func__, mds);
743 	s->s_mdsc = mdsc;
744 	s->s_mds = mds;
745 	s->s_state = CEPH_MDS_SESSION_NEW;
746 	s->s_ttl = 0;
747 	s->s_seq = 0;
748 	mutex_init(&s->s_mutex);
749 
750 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
751 
752 	atomic_set(&s->s_cap_gen, 1);
753 	s->s_cap_ttl = jiffies - 1;
754 
755 	spin_lock_init(&s->s_cap_lock);
756 	s->s_renew_requested = 0;
757 	s->s_renew_seq = 0;
758 	INIT_LIST_HEAD(&s->s_caps);
759 	s->s_nr_caps = 0;
760 	refcount_set(&s->s_ref, 1);
761 	INIT_LIST_HEAD(&s->s_waiting);
762 	INIT_LIST_HEAD(&s->s_unsafe);
763 	xa_init(&s->s_delegated_inos);
764 	s->s_num_cap_releases = 0;
765 	s->s_cap_reconnect = 0;
766 	s->s_cap_iterator = NULL;
767 	INIT_LIST_HEAD(&s->s_cap_releases);
768 	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
769 
770 	INIT_LIST_HEAD(&s->s_cap_dirty);
771 	INIT_LIST_HEAD(&s->s_cap_flushing);
772 
773 	mdsc->sessions[mds] = s;
774 	atomic_inc(&mdsc->num_sessions);
775 	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
776 
777 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
778 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
779 
780 	return s;
781 
782 fail_realloc:
783 	kfree(s);
784 	return ERR_PTR(-ENOMEM);
785 }
786 
787 /*
788  * called under mdsc->mutex
789  */
790 static void __unregister_session(struct ceph_mds_client *mdsc,
791 			       struct ceph_mds_session *s)
792 {
793 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
794 	BUG_ON(mdsc->sessions[s->s_mds] != s);
795 	mdsc->sessions[s->s_mds] = NULL;
796 	ceph_con_close(&s->s_con);
797 	ceph_put_mds_session(s);
798 	atomic_dec(&mdsc->num_sessions);
799 }
800 
801 /*
802  * drop session refs in request.
803  *
804  * should be last request ref, or hold mdsc->mutex
805  */
806 static void put_request_session(struct ceph_mds_request *req)
807 {
808 	if (req->r_session) {
809 		ceph_put_mds_session(req->r_session);
810 		req->r_session = NULL;
811 	}
812 }
813 
814 void ceph_mdsc_release_request(struct kref *kref)
815 {
816 	struct ceph_mds_request *req = container_of(kref,
817 						    struct ceph_mds_request,
818 						    r_kref);
819 	ceph_mdsc_release_dir_caps_no_check(req);
820 	destroy_reply_info(&req->r_reply_info);
821 	if (req->r_request)
822 		ceph_msg_put(req->r_request);
823 	if (req->r_reply)
824 		ceph_msg_put(req->r_reply);
825 	if (req->r_inode) {
826 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
827 		iput(req->r_inode);
828 	}
829 	if (req->r_parent) {
830 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
831 		iput(req->r_parent);
832 	}
833 	iput(req->r_target_inode);
834 	if (req->r_dentry)
835 		dput(req->r_dentry);
836 	if (req->r_old_dentry)
837 		dput(req->r_old_dentry);
838 	if (req->r_old_dentry_dir) {
839 		/*
840 		 * track (and drop pins for) r_old_dentry_dir
841 		 * separately, since r_old_dentry's d_parent may have
842 		 * changed between the dir mutex being dropped and
843 		 * this request being freed.
844 		 */
845 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
846 				  CEPH_CAP_PIN);
847 		iput(req->r_old_dentry_dir);
848 	}
849 	kfree(req->r_path1);
850 	kfree(req->r_path2);
851 	put_cred(req->r_cred);
852 	if (req->r_pagelist)
853 		ceph_pagelist_release(req->r_pagelist);
854 	put_request_session(req);
855 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
856 	WARN_ON_ONCE(!list_empty(&req->r_wait));
857 	kmem_cache_free(ceph_mds_request_cachep, req);
858 }
859 
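/*
 * DEFINE_RB_FUNCS() generates the lookup_request(), insert_request()
 * and erase_request() helpers used below, keyed by r_tid within the
 * rbtree they are given (here mdsc->request_tree).
 */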
860 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
861 
862 /*
863  * lookup request, bump ref if found.
864  *
865  * called under mdsc->mutex.
866  */
867 static struct ceph_mds_request *
868 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
869 {
870 	struct ceph_mds_request *req;
871 
872 	req = lookup_request(&mdsc->request_tree, tid);
873 	if (req)
874 		ceph_mdsc_get_request(req);
875 
876 	return req;
877 }
878 
879 /*
880  * Register an in-flight request, and assign a tid.  Link to the directory
881  * we are modifying (if any).
882  *
883  * Called under mdsc->mutex.
884  */
885 static void __register_request(struct ceph_mds_client *mdsc,
886 			       struct ceph_mds_request *req,
887 			       struct inode *dir)
888 {
889 	int ret = 0;
890 
891 	req->r_tid = ++mdsc->last_tid;
892 	if (req->r_num_caps) {
893 		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
894 					req->r_num_caps);
895 		if (ret < 0) {
896 			pr_err("__register_request %p "
897 			       "failed to reserve caps: %d\n", req, ret);
898 			/* set req->r_err to fail early from __do_request */
899 			req->r_err = ret;
900 			return;
901 		}
902 	}
903 	dout("__register_request %p tid %lld\n", req, req->r_tid);
904 	ceph_mdsc_get_request(req);
905 	insert_request(&mdsc->request_tree, req);
906 
907 	req->r_cred = get_current_cred();
908 
909 	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
910 		mdsc->oldest_tid = req->r_tid;
911 
912 	if (dir) {
913 		struct ceph_inode_info *ci = ceph_inode(dir);
914 
915 		ihold(dir);
916 		req->r_unsafe_dir = dir;
917 		spin_lock(&ci->i_unsafe_lock);
918 		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
919 		spin_unlock(&ci->i_unsafe_lock);
920 	}
921 }
922 
923 static void __unregister_request(struct ceph_mds_client *mdsc,
924 				 struct ceph_mds_request *req)
925 {
926 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
927 
928 	/* Never leave an unregistered request on an unsafe list! */
929 	list_del_init(&req->r_unsafe_item);
930 
931 	if (req->r_tid == mdsc->oldest_tid) {
932 		struct rb_node *p = rb_next(&req->r_node);
933 		mdsc->oldest_tid = 0;
934 		while (p) {
935 			struct ceph_mds_request *next_req =
936 				rb_entry(p, struct ceph_mds_request, r_node);
937 			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
938 				mdsc->oldest_tid = next_req->r_tid;
939 				break;
940 			}
941 			p = rb_next(p);
942 		}
943 	}
944 
945 	erase_request(&mdsc->request_tree, req);
946 
947 	if (req->r_unsafe_dir) {
948 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
949 		spin_lock(&ci->i_unsafe_lock);
950 		list_del_init(&req->r_unsafe_dir_item);
951 		spin_unlock(&ci->i_unsafe_lock);
952 	}
953 	if (req->r_target_inode &&
954 	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
955 		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
956 		spin_lock(&ci->i_unsafe_lock);
957 		list_del_init(&req->r_unsafe_target_item);
958 		spin_unlock(&ci->i_unsafe_lock);
959 	}
960 
961 	if (req->r_unsafe_dir) {
962 		iput(req->r_unsafe_dir);
963 		req->r_unsafe_dir = NULL;
964 	}
965 
966 	complete_all(&req->r_safe_completion);
967 
968 	ceph_mdsc_put_request(req);
969 }
970 
971 /*
972  * Walk back up the dentry tree until we hit a dentry representing a
973  * non-snapshot inode. We do this using the rcu_read_lock (which must be held
974  * when calling this) to ensure that the objects won't disappear while we're
975  * working with them. Once we hit a candidate dentry, we attempt to take a
976  * reference to it, and return that as the result.
977  */
978 static struct inode *get_nonsnap_parent(struct dentry *dentry)
979 {
980 	struct inode *inode = NULL;
981 
982 	while (dentry && !IS_ROOT(dentry)) {
983 		inode = d_inode_rcu(dentry);
984 		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
985 			break;
986 		dentry = dentry->d_parent;
987 	}
988 	if (inode)
989 		inode = igrab(inode);
990 	return inode;
991 }
992 
993 /*
994  * Choose mds to send request to next.  If there is a hint set in the
995  * request (e.g., due to a prior forward hint from the mds), use that.
996  * Otherwise, consult frag tree and/or caps to identify the
997  * appropriate mds.  If all else fails, choose randomly.
998  *
999  * Called under mdsc->mutex.
1000  */
1001 static int __choose_mds(struct ceph_mds_client *mdsc,
1002 			struct ceph_mds_request *req,
1003 			bool *random)
1004 {
1005 	struct inode *inode;
1006 	struct ceph_inode_info *ci;
1007 	struct ceph_cap *cap;
1008 	int mode = req->r_direct_mode;
1009 	int mds = -1;
1010 	u32 hash = req->r_direct_hash;
1011 	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1012 
1013 	if (random)
1014 		*random = false;
1015 
1016 	/*
1017 	 * is there a specific mds we should try?  ignore hint if we have
1018 	 * no session and the mds is not up (active or recovering).
1019 	 */
1020 	if (req->r_resend_mds >= 0 &&
1021 	    (__have_session(mdsc, req->r_resend_mds) ||
1022 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1023 		dout("%s using resend_mds mds%d\n", __func__,
1024 		     req->r_resend_mds);
1025 		return req->r_resend_mds;
1026 	}
1027 
1028 	if (mode == USE_RANDOM_MDS)
1029 		goto random;
1030 
1031 	inode = NULL;
1032 	if (req->r_inode) {
1033 		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1034 			inode = req->r_inode;
1035 			ihold(inode);
1036 		} else {
1037 			/* req->r_dentry is non-null for LSSNAP request */
1038 			rcu_read_lock();
1039 			inode = get_nonsnap_parent(req->r_dentry);
1040 			rcu_read_unlock();
1041 			dout("%s using snapdir's parent %p\n", __func__, inode);
1042 		}
1043 	} else if (req->r_dentry) {
1044 		/* ignore race with rename; old or new d_parent is okay */
1045 		struct dentry *parent;
1046 		struct inode *dir;
1047 
1048 		rcu_read_lock();
1049 		parent = READ_ONCE(req->r_dentry->d_parent);
1050 		dir = req->r_parent ? : d_inode_rcu(parent);
1051 
1052 		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1053 			/*  not this fs or parent went negative */
1054 			inode = d_inode(req->r_dentry);
1055 			if (inode)
1056 				ihold(inode);
1057 		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1058 			/* direct snapped/virtual snapdir requests
1059 			 * based on parent dir inode */
1060 			inode = get_nonsnap_parent(parent);
1061 			dout("%s using nonsnap parent %p\n", __func__, inode);
1062 		} else {
1063 			/* dentry target */
1064 			inode = d_inode(req->r_dentry);
1065 			if (!inode || mode == USE_AUTH_MDS) {
1066 				/* dir + name */
1067 				inode = igrab(dir);
1068 				hash = ceph_dentry_hash(dir, req->r_dentry);
1069 				is_hash = true;
1070 			} else {
1071 				ihold(inode);
1072 			}
1073 		}
1074 		rcu_read_unlock();
1075 	}
1076 
1077 	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1078 	     hash, mode);
1079 	if (!inode)
1080 		goto random;
1081 	ci = ceph_inode(inode);
1082 
1083 	if (is_hash && S_ISDIR(inode->i_mode)) {
1084 		struct ceph_inode_frag frag;
1085 		int found;
1086 
1087 		ceph_choose_frag(ci, hash, &frag, &found);
1088 		if (found) {
1089 			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1090 				u8 r;
1091 
1092 				/* choose a random replica */
1093 				get_random_bytes(&r, 1);
1094 				r %= frag.ndist;
1095 				mds = frag.dist[r];
1096 				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1097 				     __func__, inode, ceph_vinop(inode),
1098 				     frag.frag, mds, (int)r, frag.ndist);
1099 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1100 				    CEPH_MDS_STATE_ACTIVE &&
1101 				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1102 					goto out;
1103 			}
1104 
1105 			/* since this file/dir wasn't known to be
1106 			 * replicated, we want to look for the
1107 			 * authoritative mds. */
1108 			if (frag.mds >= 0) {
1109 				/* choose auth mds */
1110 				mds = frag.mds;
1111 				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1112 				     __func__, inode, ceph_vinop(inode),
1113 				     frag.frag, mds);
1114 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1115 				    CEPH_MDS_STATE_ACTIVE) {
1116 					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1117 								  mds))
1118 						goto out;
1119 				}
1120 			}
1121 			mode = USE_AUTH_MDS;
1122 		}
1123 	}
1124 
1125 	spin_lock(&ci->i_ceph_lock);
1126 	cap = NULL;
1127 	if (mode == USE_AUTH_MDS)
1128 		cap = ci->i_auth_cap;
1129 	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1130 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1131 	if (!cap) {
1132 		spin_unlock(&ci->i_ceph_lock);
1133 		iput(inode);
1134 		goto random;
1135 	}
1136 	mds = cap->session->s_mds;
1137 	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1138 	     inode, ceph_vinop(inode), mds,
1139 	     cap == ci->i_auth_cap ? "auth " : "", cap);
1140 	spin_unlock(&ci->i_ceph_lock);
1141 out:
1142 	iput(inode);
1143 	return mds;
1144 
1145 random:
1146 	if (random)
1147 		*random = true;
1148 
1149 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1150 	dout("%s chose random mds%d\n", __func__, mds);
1151 	return mds;
1152 }
1153 
1154 
1155 /*
1156  * session messages
1157  */
1158 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1159 {
1160 	struct ceph_msg *msg;
1161 	struct ceph_mds_session_head *h;
1162 
1163 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1164 			   false);
1165 	if (!msg) {
1166 		pr_err("create_session_msg ENOMEM creating msg\n");
1167 		return NULL;
1168 	}
1169 	h = msg->front.iov_base;
1170 	h->op = cpu_to_le32(op);
1171 	h->seq = cpu_to_le64(seq);
1172 
1173 	return msg;
1174 }
1175 
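/*
 * feature_bits[] lists the feature bit numbers this client advertises.
 * FEATURE_BYTES() is the on-wire size of the corresponding bitmap:
 * enough 64-bit words to cover the last (highest) listed bit,
 * expressed in bytes.
 */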
1176 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1177 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1178 static int encode_supported_features(void **p, void *end)
1179 {
1180 	static const size_t count = ARRAY_SIZE(feature_bits);
1181 
1182 	if (count > 0) {
1183 		size_t i;
1184 		size_t size = FEATURE_BYTES(count);
1185 
1186 		if (WARN_ON_ONCE(*p + 4 + size > end))
1187 			return -ERANGE;
1188 
1189 		ceph_encode_32(p, size);
1190 		memset(*p, 0, size);
1191 		for (i = 0; i < count; i++)
1192 			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1193 		*p += size;
1194 	} else {
1195 		if (WARN_ON_ONCE(*p + 4 > end))
1196 			return -ERANGE;
1197 
1198 		ceph_encode_32(p, 0);
1199 	}
1200 
1201 	return 0;
1202 }
1203 
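/* Same idea as feature_bits/FEATURE_BYTES, for the metric spec bitmap. */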
1204 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1205 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1206 static int encode_metric_spec(void **p, void *end)
1207 {
1208 	static const size_t count = ARRAY_SIZE(metric_bits);
1209 
1210 	/* header */
1211 	if (WARN_ON_ONCE(*p + 2 > end))
1212 		return -ERANGE;
1213 
1214 	ceph_encode_8(p, 1); /* version */
1215 	ceph_encode_8(p, 1); /* compat */
1216 
1217 	if (count > 0) {
1218 		size_t i;
1219 		size_t size = METRIC_BYTES(count);
1220 
1221 		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1222 			return -ERANGE;
1223 
1224 		/* metric spec info length */
1225 		ceph_encode_32(p, 4 + size);
1226 
1227 		/* metric spec */
1228 		ceph_encode_32(p, size);
1229 		memset(*p, 0, size);
1230 		for (i = 0; i < count; i++)
1231 			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1232 		*p += size;
1233 	} else {
1234 		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1235 			return -ERANGE;
1236 
1237 		/* metric spec info length */
1238 		ceph_encode_32(p, 4);
1239 		/* metric spec */
1240 		ceph_encode_32(p, 0);
1241 	}
1242 
1243 	return 0;
1244 }
1245 
1246 /*
1247  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1248  * to include additional client metadata fields.
1249  */
1250 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1251 {
1252 	struct ceph_msg *msg;
1253 	struct ceph_mds_session_head *h;
1254 	int i;
1255 	int extra_bytes = 0;
1256 	int metadata_key_count = 0;
1257 	struct ceph_options *opt = mdsc->fsc->client->options;
1258 	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1259 	size_t size, count;
1260 	void *p, *end;
1261 	int ret;
1262 
1263 	const char* metadata[][2] = {
1264 		{"hostname", mdsc->nodename},
1265 		{"kernel_version", init_utsname()->release},
1266 		{"entity_id", opt->name ? : ""},
1267 		{"root", fsopt->server_path ? : "/"},
1268 		{NULL, NULL}
1269 	};
1270 
1271 	/* Calculate serialized length of metadata */
1272 	extra_bytes = 4;  /* map length */
1273 	for (i = 0; metadata[i][0]; ++i) {
1274 		extra_bytes += 8 + strlen(metadata[i][0]) +
1275 			strlen(metadata[i][1]);
1276 		metadata_key_count++;
1277 	}
1278 
1279 	/* supported feature */
1280 	size = 0;
1281 	count = ARRAY_SIZE(feature_bits);
1282 	if (count > 0)
1283 		size = FEATURE_BYTES(count);
1284 	extra_bytes += 4 + size;
1285 
1286 	/* metric spec */
1287 	size = 0;
1288 	count = ARRAY_SIZE(metric_bits);
1289 	if (count > 0)
1290 		size = METRIC_BYTES(count);
1291 	extra_bytes += 2 + 4 + 4 + size;
1292 
1293 	/* Allocate the message */
1294 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1295 			   GFP_NOFS, false);
1296 	if (!msg) {
1297 		pr_err("create_session_open_msg ENOMEM creating msg\n");
1298 		return ERR_PTR(-ENOMEM);
1299 	}
1300 	p = msg->front.iov_base;
1301 	end = p + msg->front.iov_len;
1302 
1303 	h = p;
1304 	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1305 	h->seq = cpu_to_le64(seq);
1306 
1307 	/*
1308 	 * Serialize client metadata into waiting buffer space, using
1309 	 * the format that userspace expects for map<string, string>
1310 	 *
1311 	 * ClientSession messages with metadata are v4
1312 	 */
1313 	msg->hdr.version = cpu_to_le16(4);
1314 	msg->hdr.compat_version = cpu_to_le16(1);
1315 
1316 	/* The write pointer, following the session_head structure */
1317 	p += sizeof(*h);
1318 
1319 	/* Number of entries in the map */
1320 	ceph_encode_32(&p, metadata_key_count);
1321 
1322 	/* Two length-prefixed strings for each entry in the map */
1323 	for (i = 0; metadata[i][0]; ++i) {
1324 		size_t const key_len = strlen(metadata[i][0]);
1325 		size_t const val_len = strlen(metadata[i][1]);
1326 
1327 		ceph_encode_32(&p, key_len);
1328 		memcpy(p, metadata[i][0], key_len);
1329 		p += key_len;
1330 		ceph_encode_32(&p, val_len);
1331 		memcpy(p, metadata[i][1], val_len);
1332 		p += val_len;
1333 	}
1334 
1335 	ret = encode_supported_features(&p, end);
1336 	if (ret) {
1337 		pr_err("encode_supported_features failed!\n");
1338 		ceph_msg_put(msg);
1339 		return ERR_PTR(ret);
1340 	}
1341 
1342 	ret = encode_metric_spec(&p, end);
1343 	if (ret) {
1344 		pr_err("encode_metric_spec failed!\n");
1345 		ceph_msg_put(msg);
1346 		return ERR_PTR(ret);
1347 	}
1348 
1349 	msg->front.iov_len = p - msg->front.iov_base;
1350 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1351 
1352 	return msg;
1353 }
1354 
1355 /*
1356  * send session open request.
1357  *
1358  * called under mdsc->mutex
1359  */
1360 static int __open_session(struct ceph_mds_client *mdsc,
1361 			  struct ceph_mds_session *session)
1362 {
1363 	struct ceph_msg *msg;
1364 	int mstate;
1365 	int mds = session->s_mds;
1366 
1367 	/* wait for mds to go active? */
1368 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1369 	dout("open_session to mds%d (%s)\n", mds,
1370 	     ceph_mds_state_name(mstate));
1371 	session->s_state = CEPH_MDS_SESSION_OPENING;
1372 	session->s_renew_requested = jiffies;
1373 
1374 	/* send connect message */
1375 	msg = create_session_open_msg(mdsc, session->s_seq);
1376 	if (IS_ERR(msg))
1377 		return PTR_ERR(msg);
1378 	ceph_con_send(&session->s_con, msg);
1379 	return 0;
1380 }
1381 
1382 /*
1383  * open sessions for any export targets for the given mds
1384  *
1385  * called under mdsc->mutex
1386  */
1387 static struct ceph_mds_session *
1388 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1389 {
1390 	struct ceph_mds_session *session;
1391 	int ret;
1392 
1393 	session = __ceph_lookup_mds_session(mdsc, target);
1394 	if (!session) {
1395 		session = register_session(mdsc, target);
1396 		if (IS_ERR(session))
1397 			return session;
1398 	}
1399 	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1400 	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1401 		ret = __open_session(mdsc, session);
1402 		if (ret)
1403 			return ERR_PTR(ret);
1404 	}
1405 
1406 	return session;
1407 }
1408 
1409 struct ceph_mds_session *
1410 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1411 {
1412 	struct ceph_mds_session *session;
1413 
1414 	dout("open_export_target_session to mds%d\n", target);
1415 
1416 	mutex_lock(&mdsc->mutex);
1417 	session = __open_export_target_session(mdsc, target);
1418 	mutex_unlock(&mdsc->mutex);
1419 
1420 	return session;
1421 }
1422 
1423 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1424 					  struct ceph_mds_session *session)
1425 {
1426 	struct ceph_mds_info *mi;
1427 	struct ceph_mds_session *ts;
1428 	int i, mds = session->s_mds;
1429 
1430 	if (mds >= mdsc->mdsmap->possible_max_rank)
1431 		return;
1432 
1433 	mi = &mdsc->mdsmap->m_info[mds];
1434 	dout("open_export_target_sessions for mds%d (%d targets)\n",
1435 	     session->s_mds, mi->num_export_targets);
1436 
1437 	for (i = 0; i < mi->num_export_targets; i++) {
1438 		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1439 		ceph_put_mds_session(ts);
1440 	}
1441 }
1442 
1443 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1444 					   struct ceph_mds_session *session)
1445 {
1446 	mutex_lock(&mdsc->mutex);
1447 	__open_export_target_sessions(mdsc, session);
1448 	mutex_unlock(&mdsc->mutex);
1449 }
1450 
1451 /*
1452  * session caps
1453  */
1454 
1455 static void detach_cap_releases(struct ceph_mds_session *session,
1456 				struct list_head *target)
1457 {
1458 	lockdep_assert_held(&session->s_cap_lock);
1459 
1460 	list_splice_init(&session->s_cap_releases, target);
1461 	session->s_num_cap_releases = 0;
1462 	dout("detach_cap_releases mds%d\n", session->s_mds);
1463 }
1464 
1465 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1466 				 struct list_head *dispose)
1467 {
1468 	while (!list_empty(dispose)) {
1469 		struct ceph_cap *cap;
1470 		/* zero out the in-progress message */
1471 		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1472 		list_del(&cap->session_caps);
1473 		ceph_put_cap(mdsc, cap);
1474 	}
1475 }
1476 
1477 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1478 				     struct ceph_mds_session *session)
1479 {
1480 	struct ceph_mds_request *req;
1481 	struct rb_node *p;
1482 	struct ceph_inode_info *ci;
1483 
1484 	dout("cleanup_session_requests mds%d\n", session->s_mds);
1485 	mutex_lock(&mdsc->mutex);
1486 	while (!list_empty(&session->s_unsafe)) {
1487 		req = list_first_entry(&session->s_unsafe,
1488 				       struct ceph_mds_request, r_unsafe_item);
1489 		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1490 				    req->r_tid);
1491 		if (req->r_target_inode) {
1492 			/* dropping unsafe change of inode's attributes */
1493 			ci = ceph_inode(req->r_target_inode);
1494 			errseq_set(&ci->i_meta_err, -EIO);
1495 		}
1496 		if (req->r_unsafe_dir) {
1497 			/* dropping unsafe directory operation */
1498 			ci = ceph_inode(req->r_unsafe_dir);
1499 			errseq_set(&ci->i_meta_err, -EIO);
1500 		}
1501 		__unregister_request(mdsc, req);
1502 	}
1503 	/* zero r_attempts, so kick_requests() will re-send requests */
1504 	p = rb_first(&mdsc->request_tree);
1505 	while (p) {
1506 		req = rb_entry(p, struct ceph_mds_request, r_node);
1507 		p = rb_next(p);
1508 		if (req->r_session &&
1509 		    req->r_session->s_mds == session->s_mds)
1510 			req->r_attempts = 0;
1511 	}
1512 	mutex_unlock(&mdsc->mutex);
1513 }
1514 
1515 /*
1516  * Helper to safely iterate over all caps associated with a session, with
1517  * special care taken to handle a racing __ceph_remove_cap().
1518  *
1519  * Caller must hold session s_mutex.
1520  */
1521 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1522 			      int (*cb)(struct inode *, struct ceph_cap *,
1523 					void *), void *arg)
1524 {
1525 	struct list_head *p;
1526 	struct ceph_cap *cap;
1527 	struct inode *inode, *last_inode = NULL;
1528 	struct ceph_cap *old_cap = NULL;
1529 	int ret;
1530 
1531 	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1532 	spin_lock(&session->s_cap_lock);
1533 	p = session->s_caps.next;
1534 	while (p != &session->s_caps) {
1535 		cap = list_entry(p, struct ceph_cap, session_caps);
1536 		inode = igrab(&cap->ci->vfs_inode);
1537 		if (!inode) {
1538 			p = p->next;
1539 			continue;
1540 		}
1541 		session->s_cap_iterator = cap;
1542 		spin_unlock(&session->s_cap_lock);
1543 
1544 		if (last_inode) {
1545 			iput(last_inode);
1546 			last_inode = NULL;
1547 		}
1548 		if (old_cap) {
1549 			ceph_put_cap(session->s_mdsc, old_cap);
1550 			old_cap = NULL;
1551 		}
1552 
1553 		ret = cb(inode, cap, arg);
1554 		last_inode = inode;
1555 
1556 		spin_lock(&session->s_cap_lock);
1557 		p = p->next;
1558 		if (!cap->ci) {
1559 			dout("iterate_session_caps  finishing cap %p removal\n",
1560 			     cap);
1561 			BUG_ON(cap->session != session);
1562 			cap->session = NULL;
1563 			list_del_init(&cap->session_caps);
1564 			session->s_nr_caps--;
1565 			atomic64_dec(&session->s_mdsc->metric.total_caps);
1566 			if (cap->queue_release)
1567 				__ceph_queue_cap_release(session, cap);
1568 			else
1569 				old_cap = cap;  /* put_cap it w/o locks held */
1570 		}
1571 		if (ret < 0)
1572 			goto out;
1573 	}
1574 	ret = 0;
1575 out:
1576 	session->s_cap_iterator = NULL;
1577 	spin_unlock(&session->s_cap_lock);
1578 
1579 	iput(last_inode);
1580 	if (old_cap)
1581 		ceph_put_cap(session->s_mdsc, old_cap);
1582 
1583 	return ret;
1584 }
1585 
1586 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1587 				  void *arg)
1588 {
1589 	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1590 	struct ceph_inode_info *ci = ceph_inode(inode);
1591 	LIST_HEAD(to_remove);
1592 	bool dirty_dropped = false;
1593 	bool invalidate = false;
1594 
1595 	dout("removing cap %p, ci is %p, inode is %p\n",
1596 	     cap, ci, &ci->vfs_inode);
1597 	spin_lock(&ci->i_ceph_lock);
1598 	__ceph_remove_cap(cap, false);
1599 	if (!ci->i_auth_cap) {
1600 		struct ceph_cap_flush *cf;
1601 		struct ceph_mds_client *mdsc = fsc->mdsc;
1602 
1603 		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
1604 			if (inode->i_data.nrpages > 0)
1605 				invalidate = true;
1606 			if (ci->i_wrbuffer_ref > 0)
1607 				mapping_set_error(&inode->i_data, -EIO);
1608 		}
1609 
1610 		while (!list_empty(&ci->i_cap_flush_list)) {
1611 			cf = list_first_entry(&ci->i_cap_flush_list,
1612 					      struct ceph_cap_flush, i_list);
1613 			list_move(&cf->i_list, &to_remove);
1614 		}
1615 
1616 		spin_lock(&mdsc->cap_dirty_lock);
1617 
1618 		list_for_each_entry(cf, &to_remove, i_list)
1619 			list_del(&cf->g_list);
1620 
1621 		if (!list_empty(&ci->i_dirty_item)) {
1622 			pr_warn_ratelimited(
1623 				" dropping dirty %s state for %p %lld\n",
1624 				ceph_cap_string(ci->i_dirty_caps),
1625 				inode, ceph_ino(inode));
1626 			ci->i_dirty_caps = 0;
1627 			list_del_init(&ci->i_dirty_item);
1628 			dirty_dropped = true;
1629 		}
1630 		if (!list_empty(&ci->i_flushing_item)) {
1631 			pr_warn_ratelimited(
1632 				" dropping dirty+flushing %s state for %p %lld\n",
1633 				ceph_cap_string(ci->i_flushing_caps),
1634 				inode, ceph_ino(inode));
1635 			ci->i_flushing_caps = 0;
1636 			list_del_init(&ci->i_flushing_item);
1637 			mdsc->num_cap_flushing--;
1638 			dirty_dropped = true;
1639 		}
1640 		spin_unlock(&mdsc->cap_dirty_lock);
1641 
1642 		if (dirty_dropped) {
1643 			errseq_set(&ci->i_meta_err, -EIO);
1644 
1645 			if (ci->i_wrbuffer_ref_head == 0 &&
1646 			    ci->i_wr_ref == 0 &&
1647 			    ci->i_dirty_caps == 0 &&
1648 			    ci->i_flushing_caps == 0) {
1649 				ceph_put_snap_context(ci->i_head_snapc);
1650 				ci->i_head_snapc = NULL;
1651 			}
1652 		}
1653 
1654 		if (atomic_read(&ci->i_filelock_ref) > 0) {
1655 			/* make further file lock syscall return -EIO */
1656 			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1657 			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1658 					    inode, ceph_ino(inode));
1659 		}
1660 
1661 		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1662 			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1663 			ci->i_prealloc_cap_flush = NULL;
1664 		}
1665 	}
1666 	spin_unlock(&ci->i_ceph_lock);
1667 	while (!list_empty(&to_remove)) {
1668 		struct ceph_cap_flush *cf;
1669 		cf = list_first_entry(&to_remove,
1670 				      struct ceph_cap_flush, i_list);
1671 		list_del(&cf->i_list);
1672 		ceph_free_cap_flush(cf);
1673 	}
1674 
1675 	wake_up_all(&ci->i_cap_wq);
1676 	if (invalidate)
1677 		ceph_queue_invalidate(inode);
1678 	if (dirty_dropped)
1679 		iput(inode);
1680 	return 0;
1681 }
1682 
1683 /*
1684  * caller must hold session s_mutex
1685  */
1686 static void remove_session_caps(struct ceph_mds_session *session)
1687 {
1688 	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1689 	struct super_block *sb = fsc->sb;
1690 	LIST_HEAD(dispose);
1691 
1692 	dout("remove_session_caps on %p\n", session);
1693 	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1694 
1695 	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1696 
1697 	spin_lock(&session->s_cap_lock);
1698 	if (session->s_nr_caps > 0) {
1699 		struct inode *inode;
1700 		struct ceph_cap *cap, *prev = NULL;
1701 		struct ceph_vino vino;
1702 		/*
1703 		 * iterate_session_caps() skips inodes that are being
1704 		 * deleted; we need to wait until deletions are complete.
1705 		 * __wait_on_freeing_inode() is designed for the job,
1706 		 * but it is not exported, so use the inode lookup function
1707 		 * to access it.
1708 		 */
1709 		while (!list_empty(&session->s_caps)) {
1710 			cap = list_entry(session->s_caps.next,
1711 					 struct ceph_cap, session_caps);
1712 			if (cap == prev)
1713 				break;
1714 			prev = cap;
1715 			vino = cap->ci->i_vino;
1716 			spin_unlock(&session->s_cap_lock);
1717 
1718 			inode = ceph_find_inode(sb, vino);
1719 			iput(inode);
1720 
1721 			spin_lock(&session->s_cap_lock);
1722 		}
1723 	}
1724 
1725 	/* detach remaining cap releases; dispose of them after unlocking */
1726 	detach_cap_releases(session, &dispose);
1727 
1728 	BUG_ON(session->s_nr_caps > 0);
1729 	BUG_ON(!list_empty(&session->s_cap_flushing));
1730 	spin_unlock(&session->s_cap_lock);
1731 	dispose_cap_releases(session->s_mdsc, &dispose);
1732 }
1733 
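/*
 * Events passed to wake_up_session_cb() (via wake_up_session_caps())
 * indicating why the session's caps are being woken.
 */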
1734 enum {
1735 	RECONNECT,
1736 	RENEWCAPS,
1737 	FORCE_RO,
1738 };
1739 
1740 /*
1741  * wake up any threads waiting on this session's caps.  if a cap is
1742  * old (didn't get renewed on the client reconnect), downgrade it to CEPH_CAP_PIN.
1743  *
1744  * caller must hold s_mutex.
1745  */
1746 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1747 			      void *arg)
1748 {
1749 	struct ceph_inode_info *ci = ceph_inode(inode);
1750 	unsigned long ev = (unsigned long)arg;
1751 
1752 	if (ev == RECONNECT) {
1753 		spin_lock(&ci->i_ceph_lock);
1754 		ci->i_wanted_max_size = 0;
1755 		ci->i_requested_max_size = 0;
1756 		spin_unlock(&ci->i_ceph_lock);
1757 	} else if (ev == RENEWCAPS) {
1758 		if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) {
1759 			/* mds did not re-issue stale cap */
1760 			spin_lock(&ci->i_ceph_lock);
1761 			cap->issued = cap->implemented = CEPH_CAP_PIN;
1762 			spin_unlock(&ci->i_ceph_lock);
1763 		}
1764 	} else if (ev == FORCE_RO) {
1765 	}
1766 	wake_up_all(&ci->i_cap_wq);
1767 	return 0;
1768 }
1769 
1770 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1771 {
1772 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1773 	ceph_iterate_session_caps(session, wake_up_session_cb,
1774 				  (void *)(unsigned long)ev);
1775 }
1776 
1777 /*
1778  * Send periodic message to MDS renewing all currently held caps.  The
1779  * ack will reset the expiration for all caps from this session.
1780  *
1781  * caller holds s_mutex
1782  */
1783 static int send_renew_caps(struct ceph_mds_client *mdsc,
1784 			   struct ceph_mds_session *session)
1785 {
1786 	struct ceph_msg *msg;
1787 	int state;
1788 
1789 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1790 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1791 		pr_info("mds%d caps stale\n", session->s_mds);
1792 	session->s_renew_requested = jiffies;
1793 
1794 	/* do not try to renew caps until a recovering mds has reconnected
1795 	 * with its clients. */
1796 	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1797 	if (state < CEPH_MDS_STATE_RECONNECT) {
1798 		dout("send_renew_caps ignoring mds%d (%s)\n",
1799 		     session->s_mds, ceph_mds_state_name(state));
1800 		return 0;
1801 	}
1802 
1803 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1804 		ceph_mds_state_name(state));
1805 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1806 				 ++session->s_renew_seq);
1807 	if (!msg)
1808 		return -ENOMEM;
1809 	ceph_con_send(&session->s_con, msg);
1810 	return 0;
1811 }
1812 
1813 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1814 			     struct ceph_mds_session *session, u64 seq)
1815 {
1816 	struct ceph_msg *msg;
1817 
1818 	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
1819 	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1820 	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1821 	if (!msg)
1822 		return -ENOMEM;
1823 	ceph_con_send(&session->s_con, msg);
1824 	return 0;
1825 }
1826 
1827 
1828 /*
1829  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1830  *
1831  * Called under session->s_mutex
1832  */
1833 static void renewed_caps(struct ceph_mds_client *mdsc,
1834 			 struct ceph_mds_session *session, int is_renew)
1835 {
1836 	int was_stale;
1837 	int wake = 0;
1838 
1839 	spin_lock(&session->s_cap_lock);
1840 	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1841 
1842 	session->s_cap_ttl = session->s_renew_requested +
1843 		mdsc->mdsmap->m_session_timeout*HZ;
1844 
1845 	if (was_stale) {
1846 		if (time_before(jiffies, session->s_cap_ttl)) {
1847 			pr_info("mds%d caps renewed\n", session->s_mds);
1848 			wake = 1;
1849 		} else {
1850 			pr_info("mds%d caps still stale\n", session->s_mds);
1851 		}
1852 	}
1853 	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1854 	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1855 	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1856 	spin_unlock(&session->s_cap_lock);
1857 
1858 	if (wake)
1859 		wake_up_session_caps(session, RENEWCAPS);
1860 }
1861 
1862 /*
1863  * send a session close request
1864  */
1865 static int request_close_session(struct ceph_mds_session *session)
1866 {
1867 	struct ceph_msg *msg;
1868 
1869 	dout("request_close_session mds%d state %s seq %lld\n",
1870 	     session->s_mds, ceph_session_state_name(session->s_state),
1871 	     session->s_seq);
1872 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1873 	if (!msg)
1874 		return -ENOMEM;
1875 	ceph_con_send(&session->s_con, msg);
1876 	return 1;
1877 }
1878 
1879 /*
1880  * Called with s_mutex held.
1881  */
1882 static int __close_session(struct ceph_mds_client *mdsc,
1883 			 struct ceph_mds_session *session)
1884 {
1885 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1886 		return 0;
1887 	session->s_state = CEPH_MDS_SESSION_CLOSING;
1888 	return request_close_session(session);
1889 }
1890 
1891 static bool drop_negative_children(struct dentry *dentry)
1892 {
1893 	struct dentry *child;
1894 	bool all_negative = true;
1895 
1896 	if (!d_is_dir(dentry))
1897 		goto out;
1898 
1899 	spin_lock(&dentry->d_lock);
1900 	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1901 		if (d_really_is_positive(child)) {
1902 			all_negative = false;
1903 			break;
1904 		}
1905 	}
1906 	spin_unlock(&dentry->d_lock);
1907 
1908 	if (all_negative)
1909 		shrink_dcache_parent(dentry);
1910 out:
1911 	return all_negative;
1912 }
1913 
1914 /*
1915  * Trim old(er) caps.
1916  *
1917  * Because we can't cache an inode without one or more caps, we do
1918  * this indirectly: if a cap is unused, we prune its aliases, at which
1919  * point the inode will hopefully get dropped too.
1920  *
1921  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1922  * memory pressure from the MDS, though, so it needn't be perfect.
1923  */
1924 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1925 {
1926 	int *remaining = arg;
1927 	struct ceph_inode_info *ci = ceph_inode(inode);
1928 	int used, wanted, oissued, mine;
1929 
1930 	if (*remaining <= 0)
1931 		return -1;
1932 
1933 	spin_lock(&ci->i_ceph_lock);
1934 	mine = cap->issued | cap->implemented;
1935 	used = __ceph_caps_used(ci);
1936 	wanted = __ceph_caps_file_wanted(ci);
1937 	oissued = __ceph_caps_issued_other(ci, cap);
1938 
1939 	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1940 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1941 	     ceph_cap_string(used), ceph_cap_string(wanted));
1942 	if (cap == ci->i_auth_cap) {
1943 		if (ci->i_dirty_caps || ci->i_flushing_caps ||
1944 		    !list_empty(&ci->i_cap_snaps))
1945 			goto out;
1946 		if ((used | wanted) & CEPH_CAP_ANY_WR)
1947 			goto out;
1948 		/* Note: it's possible that i_filelock_ref becomes non-zero
1949 		 * after dropping auth caps. It doesn't hurt because the reply
1950 		 * to the lock mds request will re-add auth caps. */
1951 		if (atomic_read(&ci->i_filelock_ref) > 0)
1952 			goto out;
1953 	}
1954 	/* The inode has cached pages, but it's no longer used.
1955 	 * We can safely drop it. */
1956 	if (S_ISREG(inode->i_mode) &&
1957 	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1958 	    !(oissued & CEPH_CAP_FILE_CACHE)) {
1959 		used = 0;
1960 		oissued = 0;
1961 	}
1962 	if ((used | wanted) & ~oissued & mine)
1963 		goto out;   /* we need these caps */
1964 
1965 	if (oissued) {
1966 		/* we aren't the only cap.. just remove us */
1967 		__ceph_remove_cap(cap, true);
1968 		(*remaining)--;
1969 	} else {
1970 		struct dentry *dentry;
1971 		/* try dropping referring dentries */
1972 		spin_unlock(&ci->i_ceph_lock);
1973 		dentry = d_find_any_alias(inode);
1974 		if (dentry && drop_negative_children(dentry)) {
1975 			int count;
1976 			dput(dentry);
1977 			d_prune_aliases(inode);
1978 			count = atomic_read(&inode->i_count);
1979 			if (count == 1)
1980 				(*remaining)--;
1981 			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1982 			     inode, cap, count);
1983 		} else {
1984 			dput(dentry);
1985 		}
1986 		return 0;
1987 	}
1988 
1989 out:
1990 	spin_unlock(&ci->i_ceph_lock);
1991 	return 0;
1992 }
1993 
1994 /*
1995  * Trim session cap count down to some max number.
1996  */
1997 int ceph_trim_caps(struct ceph_mds_client *mdsc,
1998 		   struct ceph_mds_session *session,
1999 		   int max_caps)
2000 {
2001 	int trim_caps = session->s_nr_caps - max_caps;
2002 
2003 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2004 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2005 	if (trim_caps > 0) {
2006 		int remaining = trim_caps;
2007 
2008 		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2009 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2010 		     session->s_mds, session->s_nr_caps, max_caps,
2011 			trim_caps - remaining);
2012 	}
2013 
2014 	ceph_flush_cap_releases(mdsc, session);
2015 	return 0;
2016 }
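/*
 * Cap trimming is normally driven by the MDS: when it wants the client to
 * shed state it sends a CEPH_SESSION_RECALL_STATE message, and
 * handle_session() (later in this file) responds with roughly:
 *
 *	case CEPH_SESSION_RECALL_STATE:
 *		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
 *
 * ceph_trim_caps() then walks the session's caps with trim_caps_cb(),
 * trying to drop up to (s_nr_caps - max_caps) of them, and finally pushes
 * any queued releases to the MDS via ceph_flush_cap_releases().
 */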
2017 
2018 static int check_caps_flush(struct ceph_mds_client *mdsc,
2019 			    u64 want_flush_tid)
2020 {
2021 	int ret = 1;
2022 
2023 	spin_lock(&mdsc->cap_dirty_lock);
2024 	if (!list_empty(&mdsc->cap_flush_list)) {
2025 		struct ceph_cap_flush *cf =
2026 			list_first_entry(&mdsc->cap_flush_list,
2027 					 struct ceph_cap_flush, g_list);
2028 		if (cf->tid <= want_flush_tid) {
2029 			dout("check_caps_flush still flushing tid "
2030 			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2031 			ret = 0;
2032 		}
2033 	}
2034 	spin_unlock(&mdsc->cap_dirty_lock);
2035 	return ret;
2036 }
2037 
2038 /*
2039  * wait for all pending dirty-cap flushes to be acknowledged by the MDS.
2040  *
2041  * returns once we've flushed through want_flush_tid
2042  */
2043 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2044 			    u64 want_flush_tid)
2045 {
2046 	dout("check_caps_flush want %llu\n", want_flush_tid);
2047 
2048 	wait_event(mdsc->cap_flushing_wq,
2049 		   check_caps_flush(mdsc, want_flush_tid));
2050 
2051 	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2052 }
2053 
2054 /*
2055  * called under s_mutex
2056  */
2057 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2058 				   struct ceph_mds_session *session)
2059 {
2060 	struct ceph_msg *msg = NULL;
2061 	struct ceph_mds_cap_release *head;
2062 	struct ceph_mds_cap_item *item;
2063 	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2064 	struct ceph_cap *cap;
2065 	LIST_HEAD(tmp_list);
2066 	int num_cap_releases;
2067 	__le32	barrier, *cap_barrier;
2068 
2069 	down_read(&osdc->lock);
2070 	barrier = cpu_to_le32(osdc->epoch_barrier);
2071 	up_read(&osdc->lock);
2072 
2073 	spin_lock(&session->s_cap_lock);
2074 again:
2075 	list_splice_init(&session->s_cap_releases, &tmp_list);
2076 	num_cap_releases = session->s_num_cap_releases;
2077 	session->s_num_cap_releases = 0;
2078 	spin_unlock(&session->s_cap_lock);
2079 
2080 	while (!list_empty(&tmp_list)) {
2081 		if (!msg) {
2082 			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2083 					PAGE_SIZE, GFP_NOFS, false);
2084 			if (!msg)
2085 				goto out_err;
2086 			head = msg->front.iov_base;
2087 			head->num = cpu_to_le32(0);
2088 			msg->front.iov_len = sizeof(*head);
2089 
2090 			msg->hdr.version = cpu_to_le16(2);
2091 			msg->hdr.compat_version = cpu_to_le16(1);
2092 		}
2093 
2094 		cap = list_first_entry(&tmp_list, struct ceph_cap,
2095 					session_caps);
2096 		list_del(&cap->session_caps);
2097 		num_cap_releases--;
2098 
2099 		head = msg->front.iov_base;
2100 		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2101 				   &head->num);
2102 		item = msg->front.iov_base + msg->front.iov_len;
2103 		item->ino = cpu_to_le64(cap->cap_ino);
2104 		item->cap_id = cpu_to_le64(cap->cap_id);
2105 		item->migrate_seq = cpu_to_le32(cap->mseq);
2106 		item->seq = cpu_to_le32(cap->issue_seq);
2107 		msg->front.iov_len += sizeof(*item);
2108 
2109 		ceph_put_cap(mdsc, cap);
2110 
2111 		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2112 			// Append cap_barrier field
2113 			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2114 			*cap_barrier = barrier;
2115 			msg->front.iov_len += sizeof(*cap_barrier);
2116 
2117 			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2118 			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2119 			ceph_con_send(&session->s_con, msg);
2120 			msg = NULL;
2121 		}
2122 	}
2123 
2124 	BUG_ON(num_cap_releases != 0);
2125 
2126 	spin_lock(&session->s_cap_lock);
2127 	if (!list_empty(&session->s_cap_releases))
2128 		goto again;
2129 	spin_unlock(&session->s_cap_lock);
2130 
2131 	if (msg) {
2132 		// Append cap_barrier field
2133 		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2134 		*cap_barrier = barrier;
2135 		msg->front.iov_len += sizeof(*cap_barrier);
2136 
2137 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2138 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2139 		ceph_con_send(&session->s_con, msg);
2140 	}
2141 	return;
2142 out_err:
2143 	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2144 		session->s_mds);
2145 	spin_lock(&session->s_cap_lock);
2146 	list_splice(&tmp_list, &session->s_cap_releases);
2147 	session->s_num_cap_releases += num_cap_releases;
2148 	spin_unlock(&session->s_cap_lock);
2149 }
2150 
2151 static void ceph_cap_release_work(struct work_struct *work)
2152 {
2153 	struct ceph_mds_session *session =
2154 		container_of(work, struct ceph_mds_session, s_cap_release_work);
2155 
2156 	mutex_lock(&session->s_mutex);
2157 	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2158 	    session->s_state == CEPH_MDS_SESSION_HUNG)
2159 		ceph_send_cap_releases(session->s_mdsc, session);
2160 	mutex_unlock(&session->s_mutex);
2161 	ceph_put_mds_session(session);
2162 }
2163 
2164 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2165 		             struct ceph_mds_session *session)
2166 {
2167 	if (mdsc->stopping)
2168 		return;
2169 
2170 	ceph_get_mds_session(session);
2171 	if (queue_work(mdsc->fsc->cap_wq,
2172 		       &session->s_cap_release_work)) {
2173 		dout("cap release work queued\n");
2174 	} else {
2175 		ceph_put_mds_session(session);
2176 		dout("failed to queue cap release work\n");
2177 	}
2178 }
2179 
2180 /*
2181  * caller holds session->s_cap_lock
2182  */
2183 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2184 			      struct ceph_cap *cap)
2185 {
2186 	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2187 	session->s_num_cap_releases++;
2188 
2189 	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2190 		ceph_flush_cap_releases(session->s_mdsc, session);
2191 }
2192 
2193 static void ceph_cap_reclaim_work(struct work_struct *work)
2194 {
2195 	struct ceph_mds_client *mdsc =
2196 		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2197 	int ret = ceph_trim_dentries(mdsc);
2198 	if (ret == -EAGAIN)
2199 		ceph_queue_cap_reclaim_work(mdsc);
2200 }
2201 
2202 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2203 {
2204 	if (mdsc->stopping)
2205 		return;
2206 
2207 	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2208 		dout("caps reclaim work queued\n");
2209 	} else {
2210 		dout("failed to queue caps reclaim work\n");
2211 	}
2212 }
2213 
2214 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2215 {
2216 	int val;
2217 	if (!nr)
2218 		return;
2219 	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2220 	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2221 		atomic_set(&mdsc->cap_reclaim_pending, 0);
2222 		ceph_queue_cap_reclaim_work(mdsc);
2223 	}
2224 }
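/*
 * The modulo test above fires roughly once per CEPH_CAPS_PER_RELEASE caps
 * handed to ceph_reclaim_caps_nr().  A small worked example, assuming (for
 * illustration only) CEPH_CAPS_PER_RELEASE == 128:
 *
 *	pending = 126, nr = 3  ->  val = 129, 129 % 128 = 1  < 3
 *		-> counter reset, reclaim work queued
 *	pending = 10,  nr = 3  ->  val = 13,   13 % 128 = 13 >= 3
 *		-> keep accumulating, nothing queued yet
 *
 * i.e. the condition is true exactly when this call crosses a multiple of
 * CEPH_CAPS_PER_RELEASE, so the work item is queued once per batch rather
 * than on every call.
 */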
2225 
2226 /*
2227  * requests
2228  */
2229 
2230 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2231 				    struct inode *dir)
2232 {
2233 	struct ceph_inode_info *ci = ceph_inode(dir);
2234 	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2235 	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2236 	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2237 	unsigned int num_entries;
2238 	int order;
2239 
2240 	spin_lock(&ci->i_ceph_lock);
2241 	num_entries = ci->i_files + ci->i_subdirs;
2242 	spin_unlock(&ci->i_ceph_lock);
2243 	num_entries = max(num_entries, 1U);
2244 	num_entries = min(num_entries, opt->max_readdir);
2245 
2246 	order = get_order(size * num_entries);
2247 	while (order >= 0) {
2248 		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2249 							     __GFP_NOWARN,
2250 							     order);
2251 		if (rinfo->dir_entries)
2252 			break;
2253 		order--;
2254 	}
2255 	if (!rinfo->dir_entries)
2256 		return -ENOMEM;
2257 
2258 	num_entries = (PAGE_SIZE << order) / size;
2259 	num_entries = min(num_entries, opt->max_readdir);
2260 
2261 	rinfo->dir_buf_size = PAGE_SIZE << order;
2262 	req->r_num_caps = num_entries + 1;
2263 	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2264 	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2265 	return 0;
2266 }
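/*
 * A rough sizing example for the allocation above.  The numbers are for
 * illustration only and assume 4K pages and an entry struct of ~80 bytes
 * (the real sizeof(struct ceph_mds_reply_dir_entry) may differ):
 *
 *	i_files + i_subdirs = 1000, opt->max_readdir large enough
 *	  -> size * num_entries = 80 * 1000 = 80000 bytes
 *	  -> get_order(80000) = 5	(32 pages = 128K >= 80000)
 *	  -> on success, num_entries = 131072 / 80 = 1638, clamped again
 *	     to opt->max_readdir
 *
 * If the order-5 allocation fails, the loop retries with progressively
 * smaller orders, and the readdir request simply asks the MDS for fewer
 * entries per reply.
 */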
2267 
2268 /*
2269  * Create an mds request.
2270  */
2271 struct ceph_mds_request *
2272 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2273 {
2274 	struct ceph_mds_request *req;
2275 
2276 	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2277 	if (!req)
2278 		return ERR_PTR(-ENOMEM);
2279 
2280 	mutex_init(&req->r_fill_mutex);
2281 	req->r_mdsc = mdsc;
2282 	req->r_started = jiffies;
2283 	req->r_start_latency = ktime_get();
2284 	req->r_resend_mds = -1;
2285 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2286 	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2287 	req->r_fmode = -1;
2288 	kref_init(&req->r_kref);
2289 	RB_CLEAR_NODE(&req->r_node);
2290 	INIT_LIST_HEAD(&req->r_wait);
2291 	init_completion(&req->r_completion);
2292 	init_completion(&req->r_safe_completion);
2293 	INIT_LIST_HEAD(&req->r_unsafe_item);
2294 
2295 	ktime_get_coarse_real_ts64(&req->r_stamp);
2296 
2297 	req->r_op = op;
2298 	req->r_direct_mode = mode;
2299 	return req;
2300 }
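/*
 * A minimal sketch of the request life cycle as seen from a caller.  The
 * exact fields set vary per operation; this is illustrative rather than
 * lifted from a specific call site:
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);	// submit + wait
 *	ceph_mdsc_put_request(req);			// drop our kref
 *
 * ceph_mdsc_do_request() (further down in this file) takes care of session
 * lookup/open, sending, forwarding and waiting for the reply.
 */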
2301 
2302 /*
2303  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2304  *
2305  * called under mdsc->mutex.
2306  */
2307 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2308 {
2309 	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2310 		return NULL;
2311 	return rb_entry(rb_first(&mdsc->request_tree),
2312 			struct ceph_mds_request, r_node);
2313 }
2314 
2315 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2316 {
2317 	return mdsc->oldest_tid;
2318 }
2319 
2320 /*
2321  * Build a dentry's path.  Allocate on heap; caller must free it with
2322  * ceph_mdsc_free_path().  Based on build_path_from_dentry in fs/cifs/dir.c.
2323  *
2324  * If @stop_on_nosnap, generate path relative to the first non-snapped
2325  * inode.
2326  *
2327  * Encode hidden .snap dirs as a double /, i.e.
2328  *   foo/.snap/bar -> foo//bar
2329  */
2330 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2331 			   int stop_on_nosnap)
2332 {
2333 	struct dentry *temp;
2334 	char *path;
2335 	int pos;
2336 	unsigned seq;
2337 	u64 base;
2338 
2339 	if (!dentry)
2340 		return ERR_PTR(-EINVAL);
2341 
2342 	path = __getname();
2343 	if (!path)
2344 		return ERR_PTR(-ENOMEM);
2345 retry:
2346 	pos = PATH_MAX - 1;
2347 	path[pos] = '\0';
2348 
2349 	seq = read_seqbegin(&rename_lock);
2350 	rcu_read_lock();
2351 	temp = dentry;
2352 	for (;;) {
2353 		struct inode *inode;
2354 
2355 		spin_lock(&temp->d_lock);
2356 		inode = d_inode(temp);
2357 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2358 			dout("build_path path+%d: %p SNAPDIR\n",
2359 			     pos, temp);
2360 		} else if (stop_on_nosnap && inode && dentry != temp &&
2361 			   ceph_snap(inode) == CEPH_NOSNAP) {
2362 			spin_unlock(&temp->d_lock);
2363 			pos++; /* get rid of any prepended '/' */
2364 			break;
2365 		} else {
2366 			pos -= temp->d_name.len;
2367 			if (pos < 0) {
2368 				spin_unlock(&temp->d_lock);
2369 				break;
2370 			}
2371 			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2372 		}
2373 		spin_unlock(&temp->d_lock);
2374 		temp = READ_ONCE(temp->d_parent);
2375 
2376 		/* Are we at the root? */
2377 		if (IS_ROOT(temp))
2378 			break;
2379 
2380 		/* Are we out of buffer? */
2381 		if (--pos < 0)
2382 			break;
2383 
2384 		path[pos] = '/';
2385 	}
2386 	base = ceph_ino(d_inode(temp));
2387 	rcu_read_unlock();
2388 
2389 	if (read_seqretry(&rename_lock, seq))
2390 		goto retry;
2391 
2392 	if (pos < 0) {
2393 		/*
2394 		 * A rename didn't occur, but somehow we didn't end up where
2395 		 * we thought we would. Throw a warning and try again.
2396 		 */
2397 		pr_warn("build_path did not end path lookup where "
2398 			"expected, pos is %d\n", pos);
2399 		goto retry;
2400 	}
2401 
2402 	*pbase = base;
2403 	*plen = PATH_MAX - 1 - pos;
2404 	dout("build_path on %p %d built %llx '%.*s'\n",
2405 	     dentry, d_count(dentry), base, *plen, path + pos);
2406 	return path + pos;
2407 }
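/*
 * Illustration of the backwards fill above.  With stop_on_nosnap == 0 the
 * walk continues to the root, so for a dentry at a/b/c the loop copies
 * "c", then '/', then "b", then '/', then "a" into the tail of the
 * PATH_MAX-sized buffer:
 *
 *	path buffer:  [ ...unused... ]a/b/c\0
 *	              the return value points at the 'a'; *plen = 5 and
 *	              *pbase is the root inode's ino
 *
 * With stop_on_nosnap != 0 the walk stops at the first ancestor whose
 * inode is not snapped, and *pbase is that ancestor's ino, so the result
 * is a path relative to it.  A hidden .snap directory contributes an empty
 * component, which is how foo/.snap/bar becomes "foo//bar".  Because the
 * returned pointer is not the start of the allocation, it must be freed
 * with ceph_mdsc_free_path() rather than a bare __putname().
 */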
2408 
2409 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2410 			     const char **ppath, int *ppathlen, u64 *pino,
2411 			     bool *pfreepath, bool parent_locked)
2412 {
2413 	char *path;
2414 
2415 	rcu_read_lock();
2416 	if (!dir)
2417 		dir = d_inode_rcu(dentry->d_parent);
2418 	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2419 		*pino = ceph_ino(dir);
2420 		rcu_read_unlock();
2421 		*ppath = dentry->d_name.name;
2422 		*ppathlen = dentry->d_name.len;
2423 		return 0;
2424 	}
2425 	rcu_read_unlock();
2426 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2427 	if (IS_ERR(path))
2428 		return PTR_ERR(path);
2429 	*ppath = path;
2430 	*pfreepath = true;
2431 	return 0;
2432 }
2433 
2434 static int build_inode_path(struct inode *inode,
2435 			    const char **ppath, int *ppathlen, u64 *pino,
2436 			    bool *pfreepath)
2437 {
2438 	struct dentry *dentry;
2439 	char *path;
2440 
2441 	if (ceph_snap(inode) == CEPH_NOSNAP) {
2442 		*pino = ceph_ino(inode);
2443 		*ppathlen = 0;
2444 		return 0;
2445 	}
2446 	dentry = d_find_alias(inode);
2447 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2448 	dput(dentry);
2449 	if (IS_ERR(path))
2450 		return PTR_ERR(path);
2451 	*ppath = path;
2452 	*pfreepath = true;
2453 	return 0;
2454 }
2455 
2456 /*
2457  * request arguments may be specified via an inode *, a dentry *, or
2458  * an explicit ino+path.
2459  */
2460 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2461 				  struct inode *rdiri, const char *rpath,
2462 				  u64 rino, const char **ppath, int *pathlen,
2463 				  u64 *ino, bool *freepath, bool parent_locked)
2464 {
2465 	int r = 0;
2466 
2467 	if (rinode) {
2468 		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2469 		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2470 		     ceph_snap(rinode));
2471 	} else if (rdentry) {
2472 		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2473 					freepath, parent_locked);
2474 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2475 		     *ppath);
2476 	} else if (rpath || rino) {
2477 		*ino = rino;
2478 		*ppath = rpath;
2479 		*pathlen = rpath ? strlen(rpath) : 0;
2480 		dout(" path %.*s\n", *pathlen, rpath);
2481 	}
2482 
2483 	return r;
2484 }
2485 
2486 static void encode_timestamp_and_gids(void **p,
2487 				      const struct ceph_mds_request *req)
2488 {
2489 	struct ceph_timespec ts;
2490 	int i;
2491 
2492 	ceph_encode_timespec64(&ts, &req->r_stamp);
2493 	ceph_encode_copy(p, &ts, sizeof(ts));
2494 
2495 	/* gid_list */
2496 	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2497 	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2498 		ceph_encode_64(p, from_kgid(&init_user_ns,
2499 					    req->r_cred->group_info->gid[i]));
2500 }
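/*
 * The blob appended by encode_timestamp_and_gids() is laid out as follows
 * (a sketch of the on-wire order implied by the code above):
 *
 *	struct ceph_timespec	r_stamp
 *	__le32			ngroups
 *	__le64			gid[0] .. gid[ngroups - 1]
 *
 * create_request_message() below reserves exactly this much room when it
 * computes the front length:
 *	sizeof(struct ceph_timespec) + sizeof(u32) + ngroups * sizeof(u64)
 */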
2501 
2502 /*
2503  * called under mdsc->mutex
2504  */
2505 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2506 					       struct ceph_mds_request *req,
2507 					       bool drop_cap_releases)
2508 {
2509 	int mds = session->s_mds;
2510 	struct ceph_mds_client *mdsc = session->s_mdsc;
2511 	struct ceph_msg *msg;
2512 	struct ceph_mds_request_head_old *head;
2513 	const char *path1 = NULL;
2514 	const char *path2 = NULL;
2515 	u64 ino1 = 0, ino2 = 0;
2516 	int pathlen1 = 0, pathlen2 = 0;
2517 	bool freepath1 = false, freepath2 = false;
2518 	int len;
2519 	u16 releases;
2520 	void *p, *end;
2521 	int ret;
2522 	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2523 
2524 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2525 			      req->r_parent, req->r_path1, req->r_ino1.ino,
2526 			      &path1, &pathlen1, &ino1, &freepath1,
2527 			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2528 					&req->r_req_flags));
2529 	if (ret < 0) {
2530 		msg = ERR_PTR(ret);
2531 		goto out;
2532 	}
2533 
2534 	/* If r_old_dentry is set, then assume that its parent is locked */
2535 	ret = set_request_path_attr(NULL, req->r_old_dentry,
2536 			      req->r_old_dentry_dir,
2537 			      req->r_path2, req->r_ino2.ino,
2538 			      &path2, &pathlen2, &ino2, &freepath2, true);
2539 	if (ret < 0) {
2540 		msg = ERR_PTR(ret);
2541 		goto out_free1;
2542 	}
2543 
2544 	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2545 	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2546 		sizeof(struct ceph_timespec);
2547 	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2548 
2549 	/* calculate (max) length for cap releases */
2550 	len += sizeof(struct ceph_mds_request_release) *
2551 		(!!req->r_inode_drop + !!req->r_dentry_drop +
2552 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2553 
2554 	if (req->r_dentry_drop)
2555 		len += pathlen1;
2556 	if (req->r_old_dentry_drop)
2557 		len += pathlen2;
2558 
2559 	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2560 	if (!msg) {
2561 		msg = ERR_PTR(-ENOMEM);
2562 		goto out_free2;
2563 	}
2564 
2565 	msg->hdr.tid = cpu_to_le64(req->r_tid);
2566 
2567 	/*
2568 	 * The old ceph_mds_request_head didn't contain a version field, and
2569 	 * one was added when we moved the message version from 3->4.
2570 	 */
2571 	if (legacy) {
2572 		msg->hdr.version = cpu_to_le16(3);
2573 		head = msg->front.iov_base;
2574 		p = msg->front.iov_base + sizeof(*head);
2575 	} else {
2576 		struct ceph_mds_request_head *new_head = msg->front.iov_base;
2577 
2578 		msg->hdr.version = cpu_to_le16(4);
2579 		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2580 		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2581 		p = msg->front.iov_base + sizeof(*new_head);
2582 	}
2583 
2584 	end = msg->front.iov_base + msg->front.iov_len;
2585 
2586 	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2587 	head->op = cpu_to_le32(req->r_op);
2588 	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2589 						 req->r_cred->fsuid));
2590 	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2591 						 req->r_cred->fsgid));
2592 	head->ino = cpu_to_le64(req->r_deleg_ino);
2593 	head->args = req->r_args;
2594 
2595 	ceph_encode_filepath(&p, end, ino1, path1);
2596 	ceph_encode_filepath(&p, end, ino2, path2);
2597 
2598 	/* make note of release offset, in case we need to replay */
2599 	req->r_request_release_offset = p - msg->front.iov_base;
2600 
2601 	/* cap releases */
2602 	releases = 0;
2603 	if (req->r_inode_drop)
2604 		releases += ceph_encode_inode_release(&p,
2605 		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2606 		      mds, req->r_inode_drop, req->r_inode_unless,
2607 		      req->r_op == CEPH_MDS_OP_READDIR);
2608 	if (req->r_dentry_drop)
2609 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2610 				req->r_parent, mds, req->r_dentry_drop,
2611 				req->r_dentry_unless);
2612 	if (req->r_old_dentry_drop)
2613 		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2614 				req->r_old_dentry_dir, mds,
2615 				req->r_old_dentry_drop,
2616 				req->r_old_dentry_unless);
2617 	if (req->r_old_inode_drop)
2618 		releases += ceph_encode_inode_release(&p,
2619 		      d_inode(req->r_old_dentry),
2620 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2621 
2622 	if (drop_cap_releases) {
2623 		releases = 0;
2624 		p = msg->front.iov_base + req->r_request_release_offset;
2625 	}
2626 
2627 	head->num_releases = cpu_to_le16(releases);
2628 
2629 	encode_timestamp_and_gids(&p, req);
2630 
2631 	if (WARN_ON_ONCE(p > end)) {
2632 		ceph_msg_put(msg);
2633 		msg = ERR_PTR(-ERANGE);
2634 		goto out_free2;
2635 	}
2636 
2637 	msg->front.iov_len = p - msg->front.iov_base;
2638 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2639 
2640 	if (req->r_pagelist) {
2641 		struct ceph_pagelist *pagelist = req->r_pagelist;
2642 		ceph_msg_data_add_pagelist(msg, pagelist);
2643 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2644 	} else {
2645 		msg->hdr.data_len = 0;
2646 	}
2647 
2648 	msg->hdr.data_off = cpu_to_le16(0);
2649 
2650 out_free2:
2651 	if (freepath2)
2652 		ceph_mdsc_free_path((char *)path2, pathlen2);
2653 out_free1:
2654 	if (freepath1)
2655 		ceph_mdsc_free_path((char *)path1, pathlen1);
2656 out:
2657 	return msg;
2658 }
2659 
2660 /*
2661  * called under mdsc->mutex if error, under no mutex if
2662  * success.
2663  */
2664 static void complete_request(struct ceph_mds_client *mdsc,
2665 			     struct ceph_mds_request *req)
2666 {
2667 	req->r_end_latency = ktime_get();
2668 
2669 	if (req->r_callback)
2670 		req->r_callback(mdsc, req);
2671 	complete_all(&req->r_completion);
2672 }
2673 
2674 static struct ceph_mds_request_head_old *
2675 find_old_request_head(void *p, u64 features)
2676 {
2677 	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2678 	struct ceph_mds_request_head *new_head;
2679 
2680 	if (legacy)
2681 		return (struct ceph_mds_request_head_old *)p;
2682 	new_head = (struct ceph_mds_request_head *)p;
2683 	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2684 }
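/*
 * Note on the cast above: struct ceph_mds_request_head is the old header
 * with a version field prepended, so &new_head->oldest_client_tid is where
 * the old-format layout begins.  This lets the rest of this file fill in a
 * struct ceph_mds_request_head_old regardless of which header format the
 * peer expects.
 */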
2685 
2686 /*
2687  * called under mdsc->mutex
2688  */
2689 static int __prepare_send_request(struct ceph_mds_session *session,
2690 				  struct ceph_mds_request *req,
2691 				  bool drop_cap_releases)
2692 {
2693 	int mds = session->s_mds;
2694 	struct ceph_mds_client *mdsc = session->s_mdsc;
2695 	struct ceph_mds_request_head_old *rhead;
2696 	struct ceph_msg *msg;
2697 	int flags = 0;
2698 
2699 	req->r_attempts++;
2700 	if (req->r_inode) {
2701 		struct ceph_cap *cap =
2702 			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2703 
2704 		if (cap)
2705 			req->r_sent_on_mseq = cap->mseq;
2706 		else
2707 			req->r_sent_on_mseq = -1;
2708 	}
2709 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2710 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2711 
2712 	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2713 		void *p;
2714 
2715 		/*
2716 		 * Replay.  Do not regenerate message (and rebuild
2717 		 * paths, etc.); just use the original message.
2718 		 * Rebuilding paths will break for renames because
2719 		 * d_move mangles the src name.
2720 		 */
2721 		msg = req->r_request;
2722 		rhead = find_old_request_head(msg->front.iov_base,
2723 					      session->s_con.peer_features);
2724 
2725 		flags = le32_to_cpu(rhead->flags);
2726 		flags |= CEPH_MDS_FLAG_REPLAY;
2727 		rhead->flags = cpu_to_le32(flags);
2728 
2729 		if (req->r_target_inode)
2730 			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2731 
2732 		rhead->num_retry = req->r_attempts - 1;
2733 
2734 		/* remove cap/dentry releases from message */
2735 		rhead->num_releases = 0;
2736 
2737 		p = msg->front.iov_base + req->r_request_release_offset;
2738 		encode_timestamp_and_gids(&p, req);
2739 
2740 		msg->front.iov_len = p - msg->front.iov_base;
2741 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2742 		return 0;
2743 	}
2744 
2745 	if (req->r_request) {
2746 		ceph_msg_put(req->r_request);
2747 		req->r_request = NULL;
2748 	}
2749 	msg = create_request_message(session, req, drop_cap_releases);
2750 	if (IS_ERR(msg)) {
2751 		req->r_err = PTR_ERR(msg);
2752 		return PTR_ERR(msg);
2753 	}
2754 	req->r_request = msg;
2755 
2756 	rhead = find_old_request_head(msg->front.iov_base,
2757 				      session->s_con.peer_features);
2758 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2759 	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2760 		flags |= CEPH_MDS_FLAG_REPLAY;
2761 	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2762 		flags |= CEPH_MDS_FLAG_ASYNC;
2763 	if (req->r_parent)
2764 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2765 	rhead->flags = cpu_to_le32(flags);
2766 	rhead->num_fwd = req->r_num_fwd;
2767 	rhead->num_retry = req->r_attempts - 1;
2768 
2769 	dout(" r_parent = %p\n", req->r_parent);
2770 	return 0;
2771 }
2772 
2773 /*
2774  * called under mdsc->mutex
2775  */
2776 static int __send_request(struct ceph_mds_session *session,
2777 			  struct ceph_mds_request *req,
2778 			  bool drop_cap_releases)
2779 {
2780 	int err;
2781 
2782 	err = __prepare_send_request(session, req, drop_cap_releases);
2783 	if (!err) {
2784 		ceph_msg_get(req->r_request);
2785 		ceph_con_send(&session->s_con, req->r_request);
2786 	}
2787 
2788 	return err;
2789 }
2790 
2791 /*
2792  * send request, or put it on the appropriate wait list.
2793  */
2794 static void __do_request(struct ceph_mds_client *mdsc,
2795 			struct ceph_mds_request *req)
2796 {
2797 	struct ceph_mds_session *session = NULL;
2798 	int mds = -1;
2799 	int err = 0;
2800 	bool random;
2801 
2802 	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2803 		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2804 			__unregister_request(mdsc, req);
2805 		return;
2806 	}
2807 
2808 	if (req->r_timeout &&
2809 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2810 		dout("do_request timed out\n");
2811 		err = -ETIMEDOUT;
2812 		goto finish;
2813 	}
2814 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2815 		dout("do_request forced umount\n");
2816 		err = -EIO;
2817 		goto finish;
2818 	}
2819 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2820 		if (mdsc->mdsmap_err) {
2821 			err = mdsc->mdsmap_err;
2822 			dout("do_request mdsmap err %d\n", err);
2823 			goto finish;
2824 		}
2825 		if (mdsc->mdsmap->m_epoch == 0) {
2826 			dout("do_request no mdsmap, waiting for map\n");
2827 			list_add(&req->r_wait, &mdsc->waiting_for_map);
2828 			return;
2829 		}
2830 		if (!(mdsc->fsc->mount_options->flags &
2831 		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2832 		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2833 			err = -EHOSTUNREACH;
2834 			goto finish;
2835 		}
2836 	}
2837 
2838 	put_request_session(req);
2839 
2840 	mds = __choose_mds(mdsc, req, &random);
2841 	if (mds < 0 ||
2842 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2843 		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2844 			err = -EJUKEBOX;
2845 			goto finish;
2846 		}
2847 		dout("do_request no mds or not active, waiting for map\n");
2848 		list_add(&req->r_wait, &mdsc->waiting_for_map);
2849 		return;
2850 	}
2851 
2852 	/* get, open session */
2853 	session = __ceph_lookup_mds_session(mdsc, mds);
2854 	if (!session) {
2855 		session = register_session(mdsc, mds);
2856 		if (IS_ERR(session)) {
2857 			err = PTR_ERR(session);
2858 			goto finish;
2859 		}
2860 	}
2861 	req->r_session = ceph_get_mds_session(session);
2862 
2863 	dout("do_request mds%d session %p state %s\n", mds, session,
2864 	     ceph_session_state_name(session->s_state));
2865 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2866 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2867 		/*
2868 		 * We cannot queue async requests since the caps and delegated
2869 		 * inodes are bound to the session. Just return -EJUKEBOX and
2870 		 * let the caller retry a sync request in that case.
2871 		 */
2872 		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2873 			err = -EJUKEBOX;
2874 			goto out_session;
2875 		}
2876 
2877 		/*
2878 		 * If the session has been REJECTED, then return a hard error,
2879 		 * unless it's a CLEANRECOVER mount, in which case we'll queue
2880 		 * it to the mdsc queue.
2881 		 */
2882 		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2883 			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2884 				list_add(&req->r_wait, &mdsc->waiting_for_map);
2885 			else
2886 				err = -EACCES;
2887 			goto out_session;
2888 		}
2889 
2890 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2891 		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2892 			err = __open_session(mdsc, session);
2893 			if (err)
2894 				goto out_session;
2895 			/* retry the same mds later */
2896 			if (random)
2897 				req->r_resend_mds = mds;
2898 		}
2899 		list_add(&req->r_wait, &session->s_waiting);
2900 		goto out_session;
2901 	}
2902 
2903 	/* send request */
2904 	req->r_resend_mds = -1;   /* forget any previous mds hint */
2905 
2906 	if (req->r_request_started == 0)   /* note request start time */
2907 		req->r_request_started = jiffies;
2908 
2909 	err = __send_request(session, req, false);
2910 
2911 out_session:
2912 	ceph_put_mds_session(session);
2913 finish:
2914 	if (err) {
2915 		dout("__do_request early error %d\n", err);
2916 		req->r_err = err;
2917 		complete_request(mdsc, req);
2918 		__unregister_request(mdsc, req);
2919 	}
2920 	return;
2921 }
2922 
2923 /*
2924  * called under mdsc->mutex
2925  */
2926 static void __wake_requests(struct ceph_mds_client *mdsc,
2927 			    struct list_head *head)
2928 {
2929 	struct ceph_mds_request *req;
2930 	LIST_HEAD(tmp_list);
2931 
2932 	list_splice_init(head, &tmp_list);
2933 
2934 	while (!list_empty(&tmp_list)) {
2935 		req = list_entry(tmp_list.next,
2936 				 struct ceph_mds_request, r_wait);
2937 		list_del_init(&req->r_wait);
2938 		dout(" wake request %p tid %llu\n", req, req->r_tid);
2939 		__do_request(mdsc, req);
2940 	}
2941 }
2942 
2943 /*
2944  * Wake up threads with requests pending for @mds, so that they can
2945  * resubmit their requests to a possibly different mds.
2946  */
2947 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2948 {
2949 	struct ceph_mds_request *req;
2950 	struct rb_node *p = rb_first(&mdsc->request_tree);
2951 
2952 	dout("kick_requests mds%d\n", mds);
2953 	while (p) {
2954 		req = rb_entry(p, struct ceph_mds_request, r_node);
2955 		p = rb_next(p);
2956 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2957 			continue;
2958 		if (req->r_attempts > 0)
2959 			continue; /* only new requests */
2960 		if (req->r_session &&
2961 		    req->r_session->s_mds == mds) {
2962 			dout(" kicking tid %llu\n", req->r_tid);
2963 			list_del_init(&req->r_wait);
2964 			__do_request(mdsc, req);
2965 		}
2966 	}
2967 }
2968 
2969 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2970 			      struct ceph_mds_request *req)
2971 {
2972 	int err = 0;
2973 
2974 	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2975 	if (req->r_inode)
2976 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2977 	if (req->r_parent) {
2978 		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2979 		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2980 			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2981 		spin_lock(&ci->i_ceph_lock);
2982 		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2983 		__ceph_touch_fmode(ci, mdsc, fmode);
2984 		spin_unlock(&ci->i_ceph_lock);
2985 	}
2986 	if (req->r_old_dentry_dir)
2987 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2988 				  CEPH_CAP_PIN);
2989 
2990 	if (req->r_inode) {
2991 		err = ceph_wait_on_async_create(req->r_inode);
2992 		if (err) {
2993 			dout("%s: wait for async create returned: %d\n",
2994 			     __func__, err);
2995 			return err;
2996 		}
2997 	}
2998 
2999 	if (!err && req->r_old_inode) {
3000 		err = ceph_wait_on_async_create(req->r_old_inode);
3001 		if (err) {
3002 			dout("%s: wait for async create returned: %d\n",
3003 			     __func__, err);
3004 			return err;
3005 		}
3006 	}
3007 
3008 	dout("submit_request on %p for inode %p\n", req, dir);
3009 	mutex_lock(&mdsc->mutex);
3010 	__register_request(mdsc, req, dir);
3011 	__do_request(mdsc, req);
3012 	err = req->r_err;
3013 	mutex_unlock(&mdsc->mutex);
3014 	return err;
3015 }
3016 
3017 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3018 				  struct ceph_mds_request *req)
3019 {
3020 	int err;
3021 
3022 	/* wait */
3023 	dout("do_request waiting\n");
3024 	if (!req->r_timeout && req->r_wait_for_completion) {
3025 		err = req->r_wait_for_completion(mdsc, req);
3026 	} else {
3027 		long timeleft = wait_for_completion_killable_timeout(
3028 					&req->r_completion,
3029 					ceph_timeout_jiffies(req->r_timeout));
3030 		if (timeleft > 0)
3031 			err = 0;
3032 		else if (!timeleft)
3033 			err = -ETIMEDOUT;  /* timed out */
3034 		else
3035 			err = timeleft;  /* killed */
3036 	}
3037 	dout("do_request waited, got %d\n", err);
3038 	mutex_lock(&mdsc->mutex);
3039 
3040 	/* only abort if we didn't race with a real reply */
3041 	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3042 		err = le32_to_cpu(req->r_reply_info.head->result);
3043 	} else if (err < 0) {
3044 		dout("aborted request %lld with %d\n", req->r_tid, err);
3045 
3046 		/*
3047 		 * ensure we aren't running concurrently with
3048 		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3049 		 * rely on locks (dir mutex) held by our caller.
3050 		 */
3051 		mutex_lock(&req->r_fill_mutex);
3052 		req->r_err = err;
3053 		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3054 		mutex_unlock(&req->r_fill_mutex);
3055 
3056 		if (req->r_parent &&
3057 		    (req->r_op & CEPH_MDS_OP_WRITE))
3058 			ceph_invalidate_dir_request(req);
3059 	} else {
3060 		err = req->r_err;
3061 	}
3062 
3063 	mutex_unlock(&mdsc->mutex);
3064 	return err;
3065 }
3066 
3067 /*
3068  * Synchronously perform an mds request.  Take care of all of the
3069  * session setup, forwarding, and retry details.
3070  */
3071 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3072 			 struct inode *dir,
3073 			 struct ceph_mds_request *req)
3074 {
3075 	int err;
3076 
3077 	dout("do_request on %p\n", req);
3078 
3079 	/* issue */
3080 	err = ceph_mdsc_submit_request(mdsc, dir, req);
3081 	if (!err)
3082 		err = ceph_mdsc_wait_request(mdsc, req);
3083 	dout("do_request %p done, result %d\n", req, err);
3084 	return err;
3085 }
3086 
3087 /*
3088  * Invalidate dir's completeness, dentry lease state on an aborted MDS
3089  * namespace request.
3090  */
3091 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3092 {
3093 	struct inode *dir = req->r_parent;
3094 	struct inode *old_dir = req->r_old_dentry_dir;
3095 
3096 	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3097 
3098 	ceph_dir_clear_complete(dir);
3099 	if (old_dir)
3100 		ceph_dir_clear_complete(old_dir);
3101 	if (req->r_dentry)
3102 		ceph_invalidate_dentry_lease(req->r_dentry);
3103 	if (req->r_old_dentry)
3104 		ceph_invalidate_dentry_lease(req->r_old_dentry);
3105 }
3106 
3107 /*
3108  * Handle mds reply.
3109  *
3110  * We take the session mutex and parse and process the reply immediately.
3111  * This preserves the logical ordering of replies, capabilities, etc., sent
3112  * by the MDS as they are applied to our local cache.
3113  */
3114 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3115 {
3116 	struct ceph_mds_client *mdsc = session->s_mdsc;
3117 	struct ceph_mds_request *req;
3118 	struct ceph_mds_reply_head *head = msg->front.iov_base;
3119 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3120 	struct ceph_snap_realm *realm;
3121 	u64 tid;
3122 	int err, result;
3123 	int mds = session->s_mds;
3124 
3125 	if (msg->front.iov_len < sizeof(*head)) {
3126 		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3127 		ceph_msg_dump(msg);
3128 		return;
3129 	}
3130 
3131 	/* get request, session */
3132 	tid = le64_to_cpu(msg->hdr.tid);
3133 	mutex_lock(&mdsc->mutex);
3134 	req = lookup_get_request(mdsc, tid);
3135 	if (!req) {
3136 		dout("handle_reply on unknown tid %llu\n", tid);
3137 		mutex_unlock(&mdsc->mutex);
3138 		return;
3139 	}
3140 	dout("handle_reply %p\n", req);
3141 
3142 	/* correct session? */
3143 	if (req->r_session != session) {
3144 		pr_err("mdsc_handle_reply got %llu on session mds%d"
3145 		       " not mds%d\n", tid, session->s_mds,
3146 		       req->r_session ? req->r_session->s_mds : -1);
3147 		mutex_unlock(&mdsc->mutex);
3148 		goto out;
3149 	}
3150 
3151 	/* dup? */
3152 	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3153 	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3154 		pr_warn("got a dup %s reply on %llu from mds%d\n",
3155 			   head->safe ? "safe" : "unsafe", tid, mds);
3156 		mutex_unlock(&mdsc->mutex);
3157 		goto out;
3158 	}
3159 	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3160 		pr_warn("got unsafe after safe on %llu from mds%d\n",
3161 			   tid, mds);
3162 		mutex_unlock(&mdsc->mutex);
3163 		goto out;
3164 	}
3165 
3166 	result = le32_to_cpu(head->result);
3167 
3168 	/*
3169 	 * Handle an ESTALE:
3170 	 * - if we're not talking to the authority, send to them
3171 	 * - if the authority has changed while we weren't looking,
3172 	 *   send to the new authority
3173 	 * - otherwise we just have to return an ESTALE
3174 	 */
3175 	if (result == -ESTALE) {
3176 		dout("got ESTALE on request %llu\n", req->r_tid);
3177 		req->r_resend_mds = -1;
3178 		if (req->r_direct_mode != USE_AUTH_MDS) {
3179 			dout("not using auth, setting for that now\n");
3180 			req->r_direct_mode = USE_AUTH_MDS;
3181 			__do_request(mdsc, req);
3182 			mutex_unlock(&mdsc->mutex);
3183 			goto out;
3184 		} else  {
3185 			int mds = __choose_mds(mdsc, req, NULL);
3186 			if (mds >= 0 && mds != req->r_session->s_mds) {
3187 				dout("but auth changed, so resending\n");
3188 				__do_request(mdsc, req);
3189 				mutex_unlock(&mdsc->mutex);
3190 				goto out;
3191 			}
3192 		}
3193 		dout("have to return ESTALE on request %llu\n", req->r_tid);
3194 	}
3195 
3196 
3197 	if (head->safe) {
3198 		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3199 		__unregister_request(mdsc, req);
3200 
3201 		/* last request during umount? */
3202 		if (mdsc->stopping && !__get_oldest_req(mdsc))
3203 			complete_all(&mdsc->safe_umount_waiters);
3204 
3205 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3206 			/*
3207 			 * We already handled the unsafe response, now do the
3208 			 * cleanup.  No need to examine the response; the MDS
3209 			 * doesn't include any result info in the safe
3210 			 * response.  And even if it did, there is nothing
3211 			 * useful we could do with a revised return value.
3212 			 */
3213 			dout("got safe reply %llu, mds%d\n", tid, mds);
3214 
3215 			mutex_unlock(&mdsc->mutex);
3216 			goto out;
3217 		}
3218 	} else {
3219 		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3220 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3221 	}
3222 
3223 	dout("handle_reply tid %lld result %d\n", tid, result);
3224 	rinfo = &req->r_reply_info;
3225 	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3226 		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3227 	else
3228 		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3229 	mutex_unlock(&mdsc->mutex);
3230 
3231 	/* Must find target inode outside of mutexes to avoid deadlocks */
3232 	if ((err >= 0) && rinfo->head->is_target) {
3233 		struct inode *in;
3234 		struct ceph_vino tvino = {
3235 			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3236 			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3237 		};
3238 
3239 		in = ceph_get_inode(mdsc->fsc->sb, tvino);
3240 		if (IS_ERR(in)) {
3241 			err = PTR_ERR(in);
3242 			mutex_lock(&session->s_mutex);
3243 			goto out_err;
3244 		}
3245 		req->r_target_inode = in;
3246 	}
3247 
3248 	mutex_lock(&session->s_mutex);
3249 	if (err < 0) {
3250 		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3251 		ceph_msg_dump(msg);
3252 		goto out_err;
3253 	}
3254 
3255 	/* snap trace */
3256 	realm = NULL;
3257 	if (rinfo->snapblob_len) {
3258 		down_write(&mdsc->snap_rwsem);
3259 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3260 				rinfo->snapblob + rinfo->snapblob_len,
3261 				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3262 				&realm);
3263 		downgrade_write(&mdsc->snap_rwsem);
3264 	} else {
3265 		down_read(&mdsc->snap_rwsem);
3266 	}
3267 
3268 	/* insert trace into our cache */
3269 	mutex_lock(&req->r_fill_mutex);
3270 	current->journal_info = req;
3271 	err = ceph_fill_trace(mdsc->fsc->sb, req);
3272 	if (err == 0) {
3273 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3274 				    req->r_op == CEPH_MDS_OP_LSSNAP))
3275 			ceph_readdir_prepopulate(req, req->r_session);
3276 	}
3277 	current->journal_info = NULL;
3278 	mutex_unlock(&req->r_fill_mutex);
3279 
3280 	up_read(&mdsc->snap_rwsem);
3281 	if (realm)
3282 		ceph_put_snap_realm(mdsc, realm);
3283 
3284 	if (err == 0) {
3285 		if (req->r_target_inode &&
3286 		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3287 			struct ceph_inode_info *ci =
3288 				ceph_inode(req->r_target_inode);
3289 			spin_lock(&ci->i_unsafe_lock);
3290 			list_add_tail(&req->r_unsafe_target_item,
3291 				      &ci->i_unsafe_iops);
3292 			spin_unlock(&ci->i_unsafe_lock);
3293 		}
3294 
3295 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3296 	}
3297 out_err:
3298 	mutex_lock(&mdsc->mutex);
3299 	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3300 		if (err) {
3301 			req->r_err = err;
3302 		} else {
3303 			req->r_reply = ceph_msg_get(msg);
3304 			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3305 		}
3306 	} else {
3307 		dout("reply arrived after request %lld was aborted\n", tid);
3308 	}
3309 	mutex_unlock(&mdsc->mutex);
3310 
3311 	mutex_unlock(&session->s_mutex);
3312 
3313 	/* kick calling process */
3314 	complete_request(mdsc, req);
3315 
3316 	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3317 				     req->r_end_latency, err);
3318 out:
3319 	ceph_mdsc_put_request(req);
3320 	return;
3321 }
3322 
3323 
3324 
3325 /*
3326  * handle mds notification that our request has been forwarded.
3327  */
3328 static void handle_forward(struct ceph_mds_client *mdsc,
3329 			   struct ceph_mds_session *session,
3330 			   struct ceph_msg *msg)
3331 {
3332 	struct ceph_mds_request *req;
3333 	u64 tid = le64_to_cpu(msg->hdr.tid);
3334 	u32 next_mds;
3335 	u32 fwd_seq;
3336 	int err = -EINVAL;
3337 	void *p = msg->front.iov_base;
3338 	void *end = p + msg->front.iov_len;
3339 
3340 	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3341 	next_mds = ceph_decode_32(&p);
3342 	fwd_seq = ceph_decode_32(&p);
3343 
3344 	mutex_lock(&mdsc->mutex);
3345 	req = lookup_get_request(mdsc, tid);
3346 	if (!req) {
3347 		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3348 		goto out;  /* dup reply? */
3349 	}
3350 
3351 	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3352 		dout("forward tid %llu aborted, unregistering\n", tid);
3353 		__unregister_request(mdsc, req);
3354 	} else if (fwd_seq <= req->r_num_fwd) {
3355 		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3356 		     tid, next_mds, req->r_num_fwd, fwd_seq);
3357 	} else {
3358 		/* resend. forward race not possible; mds would drop */
3359 		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3360 		BUG_ON(req->r_err);
3361 		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3362 		req->r_attempts = 0;
3363 		req->r_num_fwd = fwd_seq;
3364 		req->r_resend_mds = next_mds;
3365 		put_request_session(req);
3366 		__do_request(mdsc, req);
3367 	}
3368 	ceph_mdsc_put_request(req);
3369 out:
3370 	mutex_unlock(&mdsc->mutex);
3371 	return;
3372 
3373 bad:
3374 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3375 }
3376 
3377 static int __decode_session_metadata(void **p, void *end,
3378 				     bool *blocklisted)
3379 {
3380 	/* map<string,string> */
3381 	u32 n;
3382 	bool err_str;
3383 	ceph_decode_32_safe(p, end, n, bad);
3384 	while (n-- > 0) {
3385 		u32 len;
3386 		ceph_decode_32_safe(p, end, len, bad);
3387 		ceph_decode_need(p, end, len, bad);
3388 		err_str = !strncmp(*p, "error_string", len);
3389 		*p += len;
3390 		ceph_decode_32_safe(p, end, len, bad);
3391 		ceph_decode_need(p, end, len, bad);
3392 		/*
3393 		 * Match "blocklisted (blacklisted)" from newer MDSes,
3394 		 * or "blacklisted" from older MDSes.
3395 		 */
3396 		if (err_str && strnstr(*p, "blacklisted", len))
3397 			*blocklisted = true;
3398 		*p += len;
3399 	}
3400 	return 0;
3401 bad:
3402 	return -1;
3403 }
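/*
 * The session metadata decoded above is a map<string,string>, laid out on
 * the wire as implied by the decode loop (shown here for reference):
 *
 *	__le32 n			number of key/value pairs
 *	repeated n times:
 *		__le32 len, key bytes	e.g. "error_string"
 *		__le32 len, value bytes	e.g. "...blocklisted..."
 *
 * Only the error_string value is inspected, and only to detect whether the
 * MDS rejected this client because it has been blocklisted.
 */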
3404 
3405 /*
3406  * handle a mds session control message
3407  */
3408 static void handle_session(struct ceph_mds_session *session,
3409 			   struct ceph_msg *msg)
3410 {
3411 	struct ceph_mds_client *mdsc = session->s_mdsc;
3412 	int mds = session->s_mds;
3413 	int msg_version = le16_to_cpu(msg->hdr.version);
3414 	void *p = msg->front.iov_base;
3415 	void *end = p + msg->front.iov_len;
3416 	struct ceph_mds_session_head *h;
3417 	u32 op;
3418 	u64 seq, features = 0;
3419 	int wake = 0;
3420 	bool blocklisted = false;
3421 
3422 	/* decode */
3423 	ceph_decode_need(&p, end, sizeof(*h), bad);
3424 	h = p;
3425 	p += sizeof(*h);
3426 
3427 	op = le32_to_cpu(h->op);
3428 	seq = le64_to_cpu(h->seq);
3429 
3430 	if (msg_version >= 3) {
3431 		u32 len;
3432 		/* version >= 2, metadata */
3433 		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3434 			goto bad;
3435 		/* version >= 3, feature bits */
3436 		ceph_decode_32_safe(&p, end, len, bad);
3437 		if (len) {
3438 			ceph_decode_64_safe(&p, end, features, bad);
3439 			p += len - sizeof(features);
3440 		}
3441 	}
3442 
3443 	mutex_lock(&mdsc->mutex);
3444 	if (op == CEPH_SESSION_CLOSE) {
3445 		ceph_get_mds_session(session);
3446 		__unregister_session(mdsc, session);
3447 	}
3448 	/* FIXME: this ttl calculation is generous */
3449 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3450 	mutex_unlock(&mdsc->mutex);
3451 
3452 	mutex_lock(&session->s_mutex);
3453 
3454 	dout("handle_session mds%d %s %p state %s seq %llu\n",
3455 	     mds, ceph_session_op_name(op), session,
3456 	     ceph_session_state_name(session->s_state), seq);
3457 
3458 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3459 		session->s_state = CEPH_MDS_SESSION_OPEN;
3460 		pr_info("mds%d came back\n", session->s_mds);
3461 	}
3462 
3463 	switch (op) {
3464 	case CEPH_SESSION_OPEN:
3465 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3466 			pr_info("mds%d reconnect success\n", session->s_mds);
3467 		session->s_state = CEPH_MDS_SESSION_OPEN;
3468 		session->s_features = features;
3469 		renewed_caps(mdsc, session, 0);
3470 		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3471 			metric_schedule_delayed(&mdsc->metric);
3472 		wake = 1;
3473 		if (mdsc->stopping)
3474 			__close_session(mdsc, session);
3475 		break;
3476 
3477 	case CEPH_SESSION_RENEWCAPS:
3478 		if (session->s_renew_seq == seq)
3479 			renewed_caps(mdsc, session, 1);
3480 		break;
3481 
3482 	case CEPH_SESSION_CLOSE:
3483 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3484 			pr_info("mds%d reconnect denied\n", session->s_mds);
3485 		session->s_state = CEPH_MDS_SESSION_CLOSED;
3486 		cleanup_session_requests(mdsc, session);
3487 		remove_session_caps(session);
3488 		wake = 2; /* for good measure */
3489 		wake_up_all(&mdsc->session_close_wq);
3490 		break;
3491 
3492 	case CEPH_SESSION_STALE:
3493 		pr_info("mds%d caps went stale, renewing\n",
3494 			session->s_mds);
3495 		atomic_inc(&session->s_cap_gen);
3496 		session->s_cap_ttl = jiffies - 1;
3497 		send_renew_caps(mdsc, session);
3498 		break;
3499 
3500 	case CEPH_SESSION_RECALL_STATE:
3501 		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3502 		break;
3503 
3504 	case CEPH_SESSION_FLUSHMSG:
3505 		send_flushmsg_ack(mdsc, session, seq);
3506 		break;
3507 
3508 	case CEPH_SESSION_FORCE_RO:
3509 		dout("force_session_readonly %p\n", session);
3510 		spin_lock(&session->s_cap_lock);
3511 		session->s_readonly = true;
3512 		spin_unlock(&session->s_cap_lock);
3513 		wake_up_session_caps(session, FORCE_RO);
3514 		break;
3515 
3516 	case CEPH_SESSION_REJECT:
3517 		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3518 		pr_info("mds%d rejected session\n", session->s_mds);
3519 		session->s_state = CEPH_MDS_SESSION_REJECTED;
3520 		cleanup_session_requests(mdsc, session);
3521 		remove_session_caps(session);
3522 		if (blocklisted)
3523 			mdsc->fsc->blocklisted = true;
3524 		wake = 2; /* for good measure */
3525 		break;
3526 
3527 	default:
3528 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3529 		WARN_ON(1);
3530 	}
3531 
3532 	mutex_unlock(&session->s_mutex);
3533 	if (wake) {
3534 		mutex_lock(&mdsc->mutex);
3535 		__wake_requests(mdsc, &session->s_waiting);
3536 		if (wake == 2)
3537 			kick_requests(mdsc, mds);
3538 		mutex_unlock(&mdsc->mutex);
3539 	}
3540 	if (op == CEPH_SESSION_CLOSE)
3541 		ceph_put_mds_session(session);
3542 	return;
3543 
3544 bad:
3545 	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3546 	       (int)msg->front.iov_len);
3547 	ceph_msg_dump(msg);
3548 	return;
3549 }
3550 
3551 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3552 {
3553 	int dcaps;
3554 
3555 	dcaps = xchg(&req->r_dir_caps, 0);
3556 	if (dcaps) {
3557 		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3558 		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3559 	}
3560 }
3561 
3562 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3563 {
3564 	int dcaps;
3565 
3566 	dcaps = xchg(&req->r_dir_caps, 0);
3567 	if (dcaps) {
3568 		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3569 		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3570 						dcaps);
3571 	}
3572 }
3573 
3574 /*
3575  * called under session->mutex.
3576  */
3577 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3578 				   struct ceph_mds_session *session)
3579 {
3580 	struct ceph_mds_request *req, *nreq;
3581 	struct rb_node *p;
3582 
3583 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3584 
3585 	mutex_lock(&mdsc->mutex);
3586 	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3587 		__send_request(session, req, true);
3588 
3589 	/*
3590 	 * also re-send old requests when the MDS enters the reconnect stage, so
3591 	 * that the MDS can process completed requests in the clientreplay stage.
3592 	 */
3593 	p = rb_first(&mdsc->request_tree);
3594 	while (p) {
3595 		req = rb_entry(p, struct ceph_mds_request, r_node);
3596 		p = rb_next(p);
3597 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3598 			continue;
3599 		if (req->r_attempts == 0)
3600 			continue; /* only old requests */
3601 		if (!req->r_session)
3602 			continue;
3603 		if (req->r_session->s_mds != session->s_mds)
3604 			continue;
3605 
3606 		ceph_mdsc_release_dir_caps_no_check(req);
3607 
3608 		__send_request(session, req, true);
3609 	}
3610 	mutex_unlock(&mdsc->mutex);
3611 }
3612 
3613 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3614 {
3615 	struct ceph_msg *reply;
3616 	struct ceph_pagelist *_pagelist;
3617 	struct page *page;
3618 	__le32 *addr;
3619 	int err = -ENOMEM;
3620 
3621 	if (!recon_state->allow_multi)
3622 		return -ENOSPC;
3623 
3624 	/* can't handle message that contains both caps and realm */
3625 	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3626 
3627 	/* pre-allocate new pagelist */
3628 	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3629 	if (!_pagelist)
3630 		return -ENOMEM;
3631 
3632 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3633 	if (!reply)
3634 		goto fail_msg;
3635 
3636 	/* placeholder for nr_caps */
3637 	err = ceph_pagelist_encode_32(_pagelist, 0);
3638 	if (err < 0)
3639 		goto fail;
3640 
3641 	if (recon_state->nr_caps) {
3642 		/* currently encoding caps */
3643 		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3644 		if (err)
3645 			goto fail;
3646 	} else {
3647 		/* placeholder for nr_realms (currently encoding relams) */
3648 		/* placeholder for nr_realms (currently encoding realms) */
3649 		if (err < 0)
3650 			goto fail;
3651 	}
3652 
3653 	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3654 	if (err)
3655 		goto fail;
3656 
3657 	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3658 	addr = kmap_atomic(page);
3659 	if (recon_state->nr_caps) {
3660 		/* currently encoding caps */
3661 		*addr = cpu_to_le32(recon_state->nr_caps);
3662 	} else {
3663 		/* currently encoding relams */
3664 		/* currently encoding realms */
3665 	}
3666 	kunmap_atomic(addr);
3667 
3668 	reply->hdr.version = cpu_to_le16(5);
3669 	reply->hdr.compat_version = cpu_to_le16(4);
3670 
3671 	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3672 	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3673 
3674 	ceph_con_send(&recon_state->session->s_con, reply);
3675 	ceph_pagelist_release(recon_state->pagelist);
3676 
3677 	recon_state->pagelist = _pagelist;
3678 	recon_state->nr_caps = 0;
3679 	recon_state->nr_realms = 0;
3680 	recon_state->msg_version = 5;
3681 	return 0;
3682 fail:
3683 	ceph_msg_put(reply);
3684 fail_msg:
3685 	ceph_pagelist_release(_pagelist);
3686 	return err;
3687 }
3688 
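/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * placeholder-then-patch idiom used by send_reconnect_partial() above.  A
 * 32-bit count is encoded as 0 before the entries are known, records are
 * appended, and the count at the start of the first page is patched once
 * the final number is available.  The "example_" name is hypothetical.
 */
static int example_encode_counted(struct ceph_pagelist *pl, u32 nr_items)
{
	struct page *page;
	__le32 *addr;
	int err;

	err = ceph_pagelist_encode_32(pl, 0);	/* placeholder for the count */
	if (err)
		return err;

	/* ... append nr_items records to the pagelist here ... */

	/* patch the placeholder now that the real count is known */
	page = list_first_entry(&pl->head, struct page, lru);
	addr = kmap_atomic(page);
	*addr = cpu_to_le32(nr_items);
	kunmap_atomic(addr);
	return 0;
}
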
3689 static struct dentry *d_find_primary(struct inode *inode)
3690 {
3691 	struct dentry *alias, *dn = NULL;
3692 
3693 	if (hlist_empty(&inode->i_dentry))
3694 		return NULL;
3695 
3696 	spin_lock(&inode->i_lock);
3697 	if (hlist_empty(&inode->i_dentry))
3698 		goto out_unlock;
3699 
3700 	if (S_ISDIR(inode->i_mode)) {
3701 		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3702 		if (!IS_ROOT(alias))
3703 			dn = dget(alias);
3704 		goto out_unlock;
3705 	}
3706 
3707 	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3708 		spin_lock(&alias->d_lock);
3709 		if (!d_unhashed(alias) &&
3710 		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3711 			dn = dget_dlock(alias);
3712 		}
3713 		spin_unlock(&alias->d_lock);
3714 		if (dn)
3715 			break;
3716 	}
3717 out_unlock:
3718 	spin_unlock(&inode->i_lock);
3719 	return dn;
3720 }
3721 
3722 /*
3723  * Encode information about a cap for a reconnect with the MDS.
3724  */
3725 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3726 			  void *arg)
3727 {
3728 	union {
3729 		struct ceph_mds_cap_reconnect v2;
3730 		struct ceph_mds_cap_reconnect_v1 v1;
3731 	} rec;
3732 	struct ceph_inode_info *ci = cap->ci;
3733 	struct ceph_reconnect_state *recon_state = arg;
3734 	struct ceph_pagelist *pagelist = recon_state->pagelist;
3735 	struct dentry *dentry;
3736 	char *path;
3737 	int pathlen, err;
3738 	u64 pathbase;
3739 	u64 snap_follows;
3740 
3741 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3742 	     inode, ceph_vinop(inode), cap, cap->cap_id,
3743 	     ceph_cap_string(cap->issued));
3744 
3745 	dentry = d_find_primary(inode);
3746 	if (dentry) {
3747 		/* set pathbase to parent dir when msg_version >= 2 */
3748 		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3749 					    recon_state->msg_version >= 2);
3750 		dput(dentry);
3751 		if (IS_ERR(path)) {
3752 			err = PTR_ERR(path);
3753 			goto out_err;
3754 		}
3755 	} else {
3756 		path = NULL;
3757 		pathlen = 0;
3758 		pathbase = 0;
3759 	}
3760 
3761 	spin_lock(&ci->i_ceph_lock);
3762 	cap->seq = 0;        /* reset cap seq */
3763 	cap->issue_seq = 0;  /* and issue_seq */
3764 	cap->mseq = 0;       /* and migrate_seq */
3765 	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
3766 
3767 	/* These are lost when the session goes away */
3768 	if (S_ISDIR(inode->i_mode)) {
3769 		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3770 			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3771 			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3772 		}
3773 		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3774 	}
3775 
3776 	if (recon_state->msg_version >= 2) {
3777 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3778 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3779 		rec.v2.issued = cpu_to_le32(cap->issued);
3780 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3781 		rec.v2.pathbase = cpu_to_le64(pathbase);
3782 		rec.v2.flock_len = (__force __le32)
3783 			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3784 	} else {
3785 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3786 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3787 		rec.v1.issued = cpu_to_le32(cap->issued);
3788 		rec.v1.size = cpu_to_le64(i_size_read(inode));
3789 		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3790 		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3791 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3792 		rec.v1.pathbase = cpu_to_le64(pathbase);
3793 	}
3794 
3795 	if (list_empty(&ci->i_cap_snaps)) {
3796 		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3797 	} else {
3798 		struct ceph_cap_snap *capsnap =
3799 			list_first_entry(&ci->i_cap_snaps,
3800 					 struct ceph_cap_snap, ci_item);
3801 		snap_follows = capsnap->follows;
3802 	}
3803 	spin_unlock(&ci->i_ceph_lock);
3804 
3805 	if (recon_state->msg_version >= 2) {
3806 		int num_fcntl_locks, num_flock_locks;
3807 		struct ceph_filelock *flocks = NULL;
3808 		size_t struct_len, total_len = sizeof(u64);
3809 		u8 struct_v = 0;
3810 
3811 encode_again:
3812 		if (rec.v2.flock_len) {
3813 			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3814 		} else {
3815 			num_fcntl_locks = 0;
3816 			num_flock_locks = 0;
3817 		}
3818 		if (num_fcntl_locks + num_flock_locks > 0) {
3819 			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3820 					       sizeof(struct ceph_filelock),
3821 					       GFP_NOFS);
3822 			if (!flocks) {
3823 				err = -ENOMEM;
3824 				goto out_err;
3825 			}
3826 			err = ceph_encode_locks_to_buffer(inode, flocks,
3827 							  num_fcntl_locks,
3828 							  num_flock_locks);
3829 			if (err) {
3830 				kfree(flocks);
3831 				flocks = NULL;
3832 				if (err == -ENOSPC)
3833 					goto encode_again;
3834 				goto out_err;
3835 			}
3836 		} else {
3837 			kfree(flocks);
3838 			flocks = NULL;
3839 		}
3840 
3841 		if (recon_state->msg_version >= 3) {
3842 			/* version, compat_version and struct_len */
3843 			total_len += 2 * sizeof(u8) + sizeof(u32);
3844 			struct_v = 2;
3845 		}
3846 		/*
3847 		 * number of encoded locks is stable, so copy to pagelist
3848 		 */
3849 		struct_len = 2 * sizeof(u32) +
3850 			    (num_fcntl_locks + num_flock_locks) *
3851 			    sizeof(struct ceph_filelock);
3852 		rec.v2.flock_len = cpu_to_le32(struct_len);
3853 
3854 		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3855 
3856 		if (struct_v >= 2)
3857 			struct_len += sizeof(u64); /* snap_follows */
3858 
3859 		total_len += struct_len;
3860 
3861 		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3862 			err = send_reconnect_partial(recon_state);
3863 			if (err)
3864 				goto out_freeflocks;
3865 			pagelist = recon_state->pagelist;
3866 		}
3867 
3868 		err = ceph_pagelist_reserve(pagelist, total_len);
3869 		if (err)
3870 			goto out_freeflocks;
3871 
3872 		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3873 		if (recon_state->msg_version >= 3) {
3874 			ceph_pagelist_encode_8(pagelist, struct_v);
3875 			ceph_pagelist_encode_8(pagelist, 1);
3876 			ceph_pagelist_encode_32(pagelist, struct_len);
3877 		}
3878 		ceph_pagelist_encode_string(pagelist, path, pathlen);
3879 		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3880 		ceph_locks_to_pagelist(flocks, pagelist,
3881 				       num_fcntl_locks, num_flock_locks);
3882 		if (struct_v >= 2)
3883 			ceph_pagelist_encode_64(pagelist, snap_follows);
3884 out_freeflocks:
3885 		kfree(flocks);
3886 	} else {
3887 		err = ceph_pagelist_reserve(pagelist,
3888 					    sizeof(u64) + sizeof(u32) +
3889 					    pathlen + sizeof(rec.v1));
3890 		if (err)
3891 			goto out_err;
3892 
3893 		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3894 		ceph_pagelist_encode_string(pagelist, path, pathlen);
3895 		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3896 	}
3897 
3898 out_err:
3899 	ceph_mdsc_free_path(path, pathlen);
3900 	if (!err)
3901 		recon_state->nr_caps++;
3902 	return err;
3903 }
3904 
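/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * count / allocate / encode / retry pattern used around the encode_again
 * label in reconnect_caps_cb() above.  Locks can be added between counting
 * and encoding, so -ENOSPC simply restarts with a fresh count.  The
 * "example_" name is hypothetical.
 */
static struct ceph_filelock *example_snapshot_locks(struct inode *inode,
						    int *num_fcntl_locks,
						    int *num_flock_locks)
{
	struct ceph_filelock *flocks;
	int err;

retry:
	ceph_count_locks(inode, num_fcntl_locks, num_flock_locks);
	if (*num_fcntl_locks + *num_flock_locks == 0)
		return NULL;		/* nothing to encode */

	flocks = kmalloc_array(*num_fcntl_locks + *num_flock_locks,
			       sizeof(*flocks), GFP_NOFS);
	if (!flocks)
		return ERR_PTR(-ENOMEM);

	err = ceph_encode_locks_to_buffer(inode, flocks,
					  *num_fcntl_locks, *num_flock_locks);
	if (err) {
		kfree(flocks);
		if (err == -ENOSPC)
			goto retry;	/* lock count changed; re-count */
		return ERR_PTR(err);
	}
	return flocks;
}
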
3905 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3906 			      struct ceph_reconnect_state *recon_state)
3907 {
3908 	struct rb_node *p;
3909 	struct ceph_pagelist *pagelist = recon_state->pagelist;
3910 	int err = 0;
3911 
3912 	if (recon_state->msg_version >= 4) {
3913 		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3914 		if (err < 0)
3915 			goto fail;
3916 	}
3917 
3918 	/*
3919 	 * snaprealms.  we provide mds with the ino, seq (version), and
3920 	 * parent for all of our realms.  If the mds has any newer info,
3921 	 * it will tell us.
3922 	 */
3923 	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3924 		struct ceph_snap_realm *realm =
3925 		       rb_entry(p, struct ceph_snap_realm, node);
3926 		struct ceph_mds_snaprealm_reconnect sr_rec;
3927 
3928 		if (recon_state->msg_version >= 4) {
3929 			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3930 				      sizeof(sr_rec);
3931 
3932 			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3933 				err = send_reconnect_partial(recon_state);
3934 				if (err)
3935 					goto fail;
3936 				pagelist = recon_state->pagelist;
3937 			}
3938 
3939 			err = ceph_pagelist_reserve(pagelist, need);
3940 			if (err)
3941 				goto fail;
3942 
3943 			ceph_pagelist_encode_8(pagelist, 1);
3944 			ceph_pagelist_encode_8(pagelist, 1);
3945 			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3946 		}
3947 
3948 		dout(" adding snap realm %llx seq %lld parent %llx\n",
3949 		     realm->ino, realm->seq, realm->parent_ino);
3950 		sr_rec.ino = cpu_to_le64(realm->ino);
3951 		sr_rec.seq = cpu_to_le64(realm->seq);
3952 		sr_rec.parent = cpu_to_le64(realm->parent_ino);
3953 
3954 		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3955 		if (err)
3956 			goto fail;
3957 
3958 		recon_state->nr_realms++;
3959 	}
3960 fail:
3961 	return err;
3962 }
3963 
3964 
3965 /*
3966  * If an MDS fails and recovers, clients need to reconnect in order to
3967  * reestablish shared state.  This includes all caps issued through
3968  * this session _and_ the snap_realm hierarchy.  Because it's not
3969  * clear which snap realms the mds cares about, we send everything we
3970  * know about; that ensures we'll then get any new info the
3971  * recovering MDS might have.
3972  *
3973  * This is a relatively heavyweight operation, but it's rare.
3974  */
3975 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3976 			       struct ceph_mds_session *session)
3977 {
3978 	struct ceph_msg *reply;
3979 	int mds = session->s_mds;
3980 	int err = -ENOMEM;
3981 	struct ceph_reconnect_state recon_state = {
3982 		.session = session,
3983 	};
3984 	LIST_HEAD(dispose);
3985 
3986 	pr_info("mds%d reconnect start\n", mds);
3987 
3988 	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3989 	if (!recon_state.pagelist)
3990 		goto fail_nopagelist;
3991 
3992 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3993 	if (!reply)
3994 		goto fail_nomsg;
3995 
3996 	xa_destroy(&session->s_delegated_inos);
3997 
3998 	mutex_lock(&session->s_mutex);
3999 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4000 	session->s_seq = 0;
4001 
4002 	dout("session %p state %s\n", session,
4003 	     ceph_session_state_name(session->s_state));
4004 
4005 	atomic_inc(&session->s_cap_gen);
4006 
4007 	spin_lock(&session->s_cap_lock);
4008 	/* don't know if session is readonly */
4009 	session->s_readonly = 0;
4010 	/*
4011 	 * Notify __ceph_remove_cap() that we are composing a cap reconnect.
4012 	 * If a cap gets released before being added to the cap reconnect,
4013 	 * __ceph_remove_cap() should skip queueing the cap release.
4014 	 */
4015 	session->s_cap_reconnect = 1;
4016 	/* drop old cap expires; we're about to reestablish that state */
4017 	detach_cap_releases(session, &dispose);
4018 	spin_unlock(&session->s_cap_lock);
4019 	dispose_cap_releases(mdsc, &dispose);
4020 
4021 	/* trim unused caps to reduce MDS's cache rejoin time */
4022 	if (mdsc->fsc->sb->s_root)
4023 		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4024 
4025 	ceph_con_close(&session->s_con);
4026 	ceph_con_open(&session->s_con,
4027 		      CEPH_ENTITY_TYPE_MDS, mds,
4028 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4029 
4030 	/* replay unsafe requests */
4031 	replay_unsafe_requests(mdsc, session);
4032 
4033 	ceph_early_kick_flushing_caps(mdsc, session);
4034 
4035 	down_read(&mdsc->snap_rwsem);
4036 
4037 	/* placeholder for nr_caps */
4038 	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4039 	if (err)
4040 		goto fail;
4041 
4042 	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4043 		recon_state.msg_version = 3;
4044 		recon_state.allow_multi = true;
4045 	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4046 		recon_state.msg_version = 3;
4047 	} else {
4048 		recon_state.msg_version = 2;
4049 	}
4050 	/* traverse this session's caps */
4051 	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4052 
4053 	spin_lock(&session->s_cap_lock);
4054 	session->s_cap_reconnect = 0;
4055 	spin_unlock(&session->s_cap_lock);
4056 
4057 	if (err < 0)
4058 		goto fail;
4059 
4060 	/* check if all realms can be encoded into current message */
4061 	if (mdsc->num_snap_realms) {
4062 		size_t total_len =
4063 			recon_state.pagelist->length +
4064 			mdsc->num_snap_realms *
4065 			sizeof(struct ceph_mds_snaprealm_reconnect);
4066 		if (recon_state.msg_version >= 4) {
4067 			/* number of realms */
4068 			total_len += sizeof(u32);
4069 			/* version, compat_version and struct_len */
4070 			total_len += mdsc->num_snap_realms *
4071 				     (2 * sizeof(u8) + sizeof(u32));
4072 		}
4073 		if (total_len > RECONNECT_MAX_SIZE) {
4074 			if (!recon_state.allow_multi) {
4075 				err = -ENOSPC;
4076 				goto fail;
4077 			}
4078 			if (recon_state.nr_caps) {
4079 				err = send_reconnect_partial(&recon_state);
4080 				if (err)
4081 					goto fail;
4082 			}
4083 			recon_state.msg_version = 5;
4084 		}
4085 	}
4086 
4087 	err = encode_snap_realms(mdsc, &recon_state);
4088 	if (err < 0)
4089 		goto fail;
4090 
4091 	if (recon_state.msg_version >= 5) {
4092 		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4093 		if (err < 0)
4094 			goto fail;
4095 	}
4096 
4097 	if (recon_state.nr_caps || recon_state.nr_realms) {
4098 		struct page *page =
4099 			list_first_entry(&recon_state.pagelist->head,
4100 					struct page, lru);
4101 		__le32 *addr = kmap_atomic(page);
4102 		if (recon_state.nr_caps) {
4103 			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4104 			*addr = cpu_to_le32(recon_state.nr_caps);
4105 		} else if (recon_state.msg_version >= 4) {
4106 			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4107 		}
4108 		kunmap_atomic(addr);
4109 	}
4110 
4111 	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4112 	if (recon_state.msg_version >= 4)
4113 		reply->hdr.compat_version = cpu_to_le16(4);
4114 
4115 	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4116 	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4117 
4118 	ceph_con_send(&session->s_con, reply);
4119 
4120 	mutex_unlock(&session->s_mutex);
4121 
4122 	mutex_lock(&mdsc->mutex);
4123 	__wake_requests(mdsc, &session->s_waiting);
4124 	mutex_unlock(&mdsc->mutex);
4125 
4126 	up_read(&mdsc->snap_rwsem);
4127 	ceph_pagelist_release(recon_state.pagelist);
4128 	return;
4129 
4130 fail:
4131 	ceph_msg_put(reply);
4132 	up_read(&mdsc->snap_rwsem);
4133 	mutex_unlock(&session->s_mutex);
4134 fail_nomsg:
4135 	ceph_pagelist_release(recon_state.pagelist);
4136 fail_nopagelist:
4137 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4138 	return;
4139 }
4140 
4141 
4142 /*
4143  * compare old and new mdsmaps, kicking requests
4144  * and closing out old connections as necessary
4145  *
4146  * called under mdsc->mutex.
4147  */
4148 static void check_new_map(struct ceph_mds_client *mdsc,
4149 			  struct ceph_mdsmap *newmap,
4150 			  struct ceph_mdsmap *oldmap)
4151 {
4152 	int i;
4153 	int oldstate, newstate;
4154 	struct ceph_mds_session *s;
4155 
4156 	dout("check_new_map new %u old %u\n",
4157 	     newmap->m_epoch, oldmap->m_epoch);
4158 
4159 	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4160 		if (!mdsc->sessions[i])
4161 			continue;
4162 		s = mdsc->sessions[i];
4163 		oldstate = ceph_mdsmap_get_state(oldmap, i);
4164 		newstate = ceph_mdsmap_get_state(newmap, i);
4165 
4166 		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4167 		     i, ceph_mds_state_name(oldstate),
4168 		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4169 		     ceph_mds_state_name(newstate),
4170 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4171 		     ceph_session_state_name(s->s_state));
4172 
4173 		if (i >= newmap->possible_max_rank) {
4174 			/* force close session for stopped mds */
4175 			ceph_get_mds_session(s);
4176 			__unregister_session(mdsc, s);
4177 			__wake_requests(mdsc, &s->s_waiting);
4178 			mutex_unlock(&mdsc->mutex);
4179 
4180 			mutex_lock(&s->s_mutex);
4181 			cleanup_session_requests(mdsc, s);
4182 			remove_session_caps(s);
4183 			mutex_unlock(&s->s_mutex);
4184 
4185 			ceph_put_mds_session(s);
4186 
4187 			mutex_lock(&mdsc->mutex);
4188 			kick_requests(mdsc, i);
4189 			continue;
4190 		}
4191 
4192 		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4193 			   ceph_mdsmap_get_addr(newmap, i),
4194 			   sizeof(struct ceph_entity_addr))) {
4195 			/* just close it */
4196 			mutex_unlock(&mdsc->mutex);
4197 			mutex_lock(&s->s_mutex);
4198 			mutex_lock(&mdsc->mutex);
4199 			ceph_con_close(&s->s_con);
4200 			mutex_unlock(&s->s_mutex);
4201 			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4202 		} else if (oldstate == newstate) {
4203 			continue;  /* nothing new with this mds */
4204 		}
4205 
4206 		/*
4207 		 * send reconnect?
4208 		 */
4209 		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4210 		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4211 			mutex_unlock(&mdsc->mutex);
4212 			send_mds_reconnect(mdsc, s);
4213 			mutex_lock(&mdsc->mutex);
4214 		}
4215 
4216 		/*
4217 		 * kick requests on any mds that has gone active.
4218 		 */
4219 		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4220 		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4221 			if (oldstate != CEPH_MDS_STATE_CREATING &&
4222 			    oldstate != CEPH_MDS_STATE_STARTING)
4223 				pr_info("mds%d recovery completed\n", s->s_mds);
4224 			kick_requests(mdsc, i);
4225 			mutex_unlock(&mdsc->mutex);
4226 			mutex_lock(&s->s_mutex);
4227 			mutex_lock(&mdsc->mutex);
4228 			ceph_kick_flushing_caps(mdsc, s);
4229 			mutex_unlock(&s->s_mutex);
4230 			wake_up_session_caps(s, RECONNECT);
4231 		}
4232 	}
4233 
4234 	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4235 		s = mdsc->sessions[i];
4236 		if (!s)
4237 			continue;
4238 		if (!ceph_mdsmap_is_laggy(newmap, i))
4239 			continue;
4240 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4241 		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4242 		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4243 			dout(" connecting to export targets of laggy mds%d\n",
4244 			     i);
4245 			__open_export_target_sessions(mdsc, s);
4246 		}
4247 	}
4248 }
4249 
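/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * drop-and-reacquire dance check_new_map() above uses whenever it needs a
 * session mutex while scanning under mdsc->mutex: mdsc->mutex is released,
 * the session mutex taken, and mdsc->mutex re-taken, so the session mutex
 * is never acquired while mdsc->mutex is held.  The "example_" name is
 * hypothetical.
 */
static void example_take_session_mutex(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	mutex_unlock(&mdsc->mutex);
	mutex_lock(&s->s_mutex);
	mutex_lock(&mdsc->mutex);
}
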
4250 
4251 
4252 /*
4253  * leases
4254  */
4255 
4256 /*
4257  * caller must hold session s_mutex, dentry->d_lock
4258  */
4259 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4260 {
4261 	struct ceph_dentry_info *di = ceph_dentry(dentry);
4262 
4263 	ceph_put_mds_session(di->lease_session);
4264 	di->lease_session = NULL;
4265 }
4266 
4267 static void handle_lease(struct ceph_mds_client *mdsc,
4268 			 struct ceph_mds_session *session,
4269 			 struct ceph_msg *msg)
4270 {
4271 	struct super_block *sb = mdsc->fsc->sb;
4272 	struct inode *inode;
4273 	struct dentry *parent, *dentry;
4274 	struct ceph_dentry_info *di;
4275 	int mds = session->s_mds;
4276 	struct ceph_mds_lease *h = msg->front.iov_base;
4277 	u32 seq;
4278 	struct ceph_vino vino;
4279 	struct qstr dname;
4280 	int release = 0;
4281 
4282 	dout("handle_lease from mds%d\n", mds);
4283 
4284 	/* decode */
4285 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4286 		goto bad;
4287 	vino.ino = le64_to_cpu(h->ino);
4288 	vino.snap = CEPH_NOSNAP;
4289 	seq = le32_to_cpu(h->seq);
4290 	dname.len = get_unaligned_le32(h + 1);
4291 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4292 		goto bad;
4293 	dname.name = (void *)(h + 1) + sizeof(u32);
4294 
4295 	/* lookup inode */
4296 	inode = ceph_find_inode(sb, vino);
4297 	dout("handle_lease %s, ino %llx %p %.*s\n",
4298 	     ceph_lease_op_name(h->action), vino.ino, inode,
4299 	     dname.len, dname.name);
4300 
4301 	mutex_lock(&session->s_mutex);
4302 	inc_session_sequence(session);
4303 
4304 	if (!inode) {
4305 		dout("handle_lease no inode %llx\n", vino.ino);
4306 		goto release;
4307 	}
4308 
4309 	/* dentry */
4310 	parent = d_find_alias(inode);
4311 	if (!parent) {
4312 		dout("no parent dentry on inode %p\n", inode);
4313 		WARN_ON(1);
4314 		goto release;  /* hrm... */
4315 	}
4316 	dname.hash = full_name_hash(parent, dname.name, dname.len);
4317 	dentry = d_lookup(parent, &dname);
4318 	dput(parent);
4319 	if (!dentry)
4320 		goto release;
4321 
4322 	spin_lock(&dentry->d_lock);
4323 	di = ceph_dentry(dentry);
4324 	switch (h->action) {
4325 	case CEPH_MDS_LEASE_REVOKE:
4326 		if (di->lease_session == session) {
4327 			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4328 				h->seq = cpu_to_le32(di->lease_seq);
4329 			__ceph_mdsc_drop_dentry_lease(dentry);
4330 		}
4331 		release = 1;
4332 		break;
4333 
4334 	case CEPH_MDS_LEASE_RENEW:
4335 		if (di->lease_session == session &&
4336 		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
4337 		    di->lease_renew_from &&
4338 		    di->lease_renew_after == 0) {
4339 			unsigned long duration =
4340 				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4341 
4342 			di->lease_seq = seq;
4343 			di->time = di->lease_renew_from + duration;
4344 			di->lease_renew_after = di->lease_renew_from +
4345 				(duration >> 1);
4346 			di->lease_renew_from = 0;
4347 		}
4348 		break;
4349 	}
4350 	spin_unlock(&dentry->d_lock);
4351 	dput(dentry);
4352 
4353 	if (!release)
4354 		goto out;
4355 
4356 release:
4357 	/* let's just reuse the same message */
4358 	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4359 	ceph_msg_get(msg);
4360 	ceph_con_send(&session->s_con, msg);
4361 
4362 out:
4363 	mutex_unlock(&session->s_mutex);
4364 	iput(inode);
4365 	return;
4366 
4367 bad:
4368 	pr_err("corrupt lease message\n");
4369 	ceph_msg_dump(msg);
4370 }
4371 
4372 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4373 			      struct dentry *dentry, char action,
4374 			      u32 seq)
4375 {
4376 	struct ceph_msg *msg;
4377 	struct ceph_mds_lease *lease;
4378 	struct inode *dir;
4379 	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4380 
4381 	dout("lease_send_msg dentry %p %s to mds%d\n",
4382 	     dentry, ceph_lease_op_name(action), session->s_mds);
4383 
4384 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4385 	if (!msg)
4386 		return;
4387 	lease = msg->front.iov_base;
4388 	lease->action = action;
4389 	lease->seq = cpu_to_le32(seq);
4390 
4391 	spin_lock(&dentry->d_lock);
4392 	dir = d_inode(dentry->d_parent);
4393 	lease->ino = cpu_to_le64(ceph_ino(dir));
4394 	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4395 
4396 	put_unaligned_le32(dentry->d_name.len, lease + 1);
4397 	memcpy((void *)(lease + 1) + 4,
4398 	       dentry->d_name.name, dentry->d_name.len);
4399 	spin_unlock(&dentry->d_lock);
4400 	/*
4401 	 * if this is a preemptive lease RELEASE, no need to
4402 	 * flush request stream, since the actual request will
4403 	 * soon follow.
4404 	 */
4405 	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4406 
4407 	ceph_con_send(&session->s_con, msg);
4408 }
4409 
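/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * front layout of a CEPH_MSG_CLIENT_LEASE message, as built by
 * ceph_mdsc_lease_send_msg() and parsed by handle_lease() above: a struct
 * ceph_mds_lease immediately followed by a little-endian u32 name length
 * and the (unterminated) name bytes.  The "example_" name is hypothetical.
 */
static size_t example_pack_lease(void *buf, const struct ceph_mds_lease *src,
				 const char *name, u32 namelen)
{
	struct ceph_mds_lease *lease = buf;

	*lease = *src;				/* fixed-size header */
	put_unaligned_le32(namelen, lease + 1);	/* name length */
	memcpy((void *)(lease + 1) + sizeof(u32), name, namelen);
	return sizeof(*lease) + sizeof(u32) + namelen;
}
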
4410 /*
4411  * lock and unlock each session, to wait for ongoing session activity to finish
4412  */
4413 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4414 {
4415 	int i;
4416 
4417 	mutex_lock(&mdsc->mutex);
4418 	for (i = 0; i < mdsc->max_sessions; i++) {
4419 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4420 		if (!s)
4421 			continue;
4422 		mutex_unlock(&mdsc->mutex);
4423 		mutex_lock(&s->s_mutex);
4424 		mutex_unlock(&s->s_mutex);
4425 		ceph_put_mds_session(s);
4426 		mutex_lock(&mdsc->mutex);
4427 	}
4428 	mutex_unlock(&mdsc->mutex);
4429 }
4430 
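/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * lock/unlock barrier used by lock_unlock_sessions() above.  Acquiring and
 * immediately releasing a mutex guarantees that whoever held it when we
 * started has left that critical section; no state is exchanged.  The
 * "example_" name is hypothetical.
 */
static void example_wait_for_mutex_holder(struct mutex *lock)
{
	mutex_lock(lock);	/* blocks until the current holder is done */
	mutex_unlock(lock);	/* the lock itself was never needed here */
}
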
4431 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4432 {
4433 	struct ceph_fs_client *fsc = mdsc->fsc;
4434 
4435 	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4436 		return;
4437 
4438 	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4439 		return;
4440 
4441 	if (!READ_ONCE(fsc->blocklisted))
4442 		return;
4443 
4444 	pr_info("auto reconnect after blocklisted\n");
4445 	ceph_force_reconnect(fsc->sb);
4446 }
4447 
4448 bool check_session_state(struct ceph_mds_session *s)
4449 {
4450 	switch (s->s_state) {
4451 	case CEPH_MDS_SESSION_OPEN:
4452 		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4453 			s->s_state = CEPH_MDS_SESSION_HUNG;
4454 			pr_info("mds%d hung\n", s->s_mds);
4455 		}
4456 		break;
4457 	case CEPH_MDS_SESSION_CLOSING:
4458 		/* Should never reach this when we're unmounting */
4459 		WARN_ON_ONCE(true);
4460 		fallthrough;
4461 	case CEPH_MDS_SESSION_NEW:
4462 	case CEPH_MDS_SESSION_RESTARTING:
4463 	case CEPH_MDS_SESSION_CLOSED:
4464 	case CEPH_MDS_SESSION_REJECTED:
4465 		return false;
4466 	}
4467 
4468 	return true;
4469 }
4470 
4471 /*
4472  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4473  * then we need to retransmit that request.
4474  */
4475 void inc_session_sequence(struct ceph_mds_session *s)
4476 {
4477 	lockdep_assert_held(&s->s_mutex);
4478 
4479 	s->s_seq++;
4480 
4481 	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4482 		int ret;
4483 
4484 		dout("resending session close request for mds%d\n", s->s_mds);
4485 		ret = request_close_session(s);
4486 		if (ret < 0)
4487 			pr_err("unable to close session to mds%d: %d\n",
4488 			       s->s_mds, ret);
4489 	}
4490 }
4491 
4492 /*
4493  * delayed work -- periodically trim expired leases, renew caps with mds
4494  */
4495 static void schedule_delayed(struct ceph_mds_client *mdsc)
4496 {
4497 	int delay = 5;
4498 	unsigned hz = round_jiffies_relative(HZ * delay);
4499 	schedule_delayed_work(&mdsc->delayed_work, hz);
4500 }
4501 
4502 static void delayed_work(struct work_struct *work)
4503 {
4504 	int i;
4505 	struct ceph_mds_client *mdsc =
4506 		container_of(work, struct ceph_mds_client, delayed_work.work);
4507 	int renew_interval;
4508 	int renew_caps;
4509 
4510 	dout("mdsc delayed_work\n");
4511 
4512 	if (mdsc->stopping)
4513 		return;
4514 
4515 	mutex_lock(&mdsc->mutex);
4516 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4517 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4518 				   mdsc->last_renew_caps);
4519 	if (renew_caps)
4520 		mdsc->last_renew_caps = jiffies;
4521 
4522 	for (i = 0; i < mdsc->max_sessions; i++) {
4523 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4524 		if (!s)
4525 			continue;
4526 
4527 		if (!check_session_state(s)) {
4528 			ceph_put_mds_session(s);
4529 			continue;
4530 		}
4531 		mutex_unlock(&mdsc->mutex);
4532 
4533 		mutex_lock(&s->s_mutex);
4534 		if (renew_caps)
4535 			send_renew_caps(mdsc, s);
4536 		else
4537 			ceph_con_keepalive(&s->s_con);
4538 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4539 		    s->s_state == CEPH_MDS_SESSION_HUNG)
4540 			ceph_send_cap_releases(mdsc, s);
4541 		mutex_unlock(&s->s_mutex);
4542 		ceph_put_mds_session(s);
4543 
4544 		mutex_lock(&mdsc->mutex);
4545 	}
4546 	mutex_unlock(&mdsc->mutex);
4547 
4548 	ceph_check_delayed_caps(mdsc);
4549 
4550 	ceph_queue_cap_reclaim_work(mdsc);
4551 
4552 	ceph_trim_snapid_map(mdsc);
4553 
4554 	maybe_recover_session(mdsc);
4555 
4556 	schedule_delayed(mdsc);
4557 }
4558 
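/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: how
 * often delayed_work() above actually renews caps.  The work itself runs
 * roughly every 5 seconds (see schedule_delayed()), but send_renew_caps()
 * is only issued once a quarter of the MDS session timeout has elapsed,
 * e.g. every 15 seconds for a 60 second session timeout.  The "example_"
 * name is hypothetical.
 */
static bool example_should_renew_caps(unsigned long last_renew,
				      u32 session_timeout_secs)
{
	unsigned long interval = HZ * (session_timeout_secs >> 2);

	return time_after_eq(jiffies, last_renew + interval);
}
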
4559 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4560 {
4562 	struct ceph_mds_client *mdsc;
4563 	int err;
4564 
4565 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4566 	if (!mdsc)
4567 		return -ENOMEM;
4568 	mdsc->fsc = fsc;
4569 	mutex_init(&mdsc->mutex);
4570 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4571 	if (!mdsc->mdsmap) {
4572 		err = -ENOMEM;
4573 		goto err_mdsc;
4574 	}
4575 
4576 	init_completion(&mdsc->safe_umount_waiters);
4577 	init_waitqueue_head(&mdsc->session_close_wq);
4578 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4579 	mdsc->sessions = NULL;
4580 	atomic_set(&mdsc->num_sessions, 0);
4581 	mdsc->max_sessions = 0;
4582 	mdsc->stopping = 0;
4583 	atomic64_set(&mdsc->quotarealms_count, 0);
4584 	mdsc->quotarealms_inodes = RB_ROOT;
4585 	mutex_init(&mdsc->quotarealms_inodes_mutex);
4586 	mdsc->last_snap_seq = 0;
4587 	init_rwsem(&mdsc->snap_rwsem);
4588 	mdsc->snap_realms = RB_ROOT;
4589 	INIT_LIST_HEAD(&mdsc->snap_empty);
4590 	mdsc->num_snap_realms = 0;
4591 	spin_lock_init(&mdsc->snap_empty_lock);
4592 	mdsc->last_tid = 0;
4593 	mdsc->oldest_tid = 0;
4594 	mdsc->request_tree = RB_ROOT;
4595 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4596 	mdsc->last_renew_caps = jiffies;
4597 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4598 	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4599 	spin_lock_init(&mdsc->cap_delay_lock);
4600 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4601 	spin_lock_init(&mdsc->snap_flush_lock);
4602 	mdsc->last_cap_flush_tid = 1;
4603 	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4604 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4605 	mdsc->num_cap_flushing = 0;
4606 	spin_lock_init(&mdsc->cap_dirty_lock);
4607 	init_waitqueue_head(&mdsc->cap_flushing_wq);
4608 	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4609 	atomic_set(&mdsc->cap_reclaim_pending, 0);
4610 	err = ceph_metric_init(&mdsc->metric);
4611 	if (err)
4612 		goto err_mdsmap;
4613 
4614 	spin_lock_init(&mdsc->dentry_list_lock);
4615 	INIT_LIST_HEAD(&mdsc->dentry_leases);
4616 	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4617 
4618 	ceph_caps_init(mdsc);
4619 	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4620 
4621 	spin_lock_init(&mdsc->snapid_map_lock);
4622 	mdsc->snapid_map_tree = RB_ROOT;
4623 	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4624 
4625 	init_rwsem(&mdsc->pool_perm_rwsem);
4626 	mdsc->pool_perm_tree = RB_ROOT;
4627 
4628 	strscpy(mdsc->nodename, utsname()->nodename,
4629 		sizeof(mdsc->nodename));
4630 
4631 	fsc->mdsc = mdsc;
4632 	return 0;
4633 
4634 err_mdsmap:
4635 	kfree(mdsc->mdsmap);
4636 err_mdsc:
4637 	kfree(mdsc);
4638 	return err;
4639 }
4640 
4641 /*
4642  * Wait for safe replies on open mds requests.  If we time out, drop
4643  * all requests from the tree to avoid dangling dentry refs.
4644  */
4645 static void wait_requests(struct ceph_mds_client *mdsc)
4646 {
4647 	struct ceph_options *opts = mdsc->fsc->client->options;
4648 	struct ceph_mds_request *req;
4649 
4650 	mutex_lock(&mdsc->mutex);
4651 	if (__get_oldest_req(mdsc)) {
4652 		mutex_unlock(&mdsc->mutex);
4653 
4654 		dout("wait_requests waiting for requests\n");
4655 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4656 				    ceph_timeout_jiffies(opts->mount_timeout));
4657 
4658 		/* tear down remaining requests */
4659 		mutex_lock(&mdsc->mutex);
4660 		while ((req = __get_oldest_req(mdsc))) {
4661 			dout("wait_requests timed out on tid %llu\n",
4662 			     req->r_tid);
4663 			list_del_init(&req->r_wait);
4664 			__unregister_request(mdsc, req);
4665 		}
4666 	}
4667 	mutex_unlock(&mdsc->mutex);
4668 	dout("wait_requests done\n");
4669 }
4670 
4671 /*
4672  * called before mount is ro, and before dentries are torn down.
4673  * (hmm, does this still race with new lookups?)
4674  */
4675 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4676 {
4677 	dout("pre_umount\n");
4678 	mdsc->stopping = 1;
4679 
4680 	lock_unlock_sessions(mdsc);
4681 	ceph_flush_dirty_caps(mdsc);
4682 	wait_requests(mdsc);
4683 
4684 	/*
4685 	 * wait for reply handlers to drop their request refs and
4686 	 * their inode/dcache refs
4687 	 */
4688 	ceph_msgr_flush();
4689 
4690 	ceph_cleanup_quotarealms_inodes(mdsc);
4691 }
4692 
4693 /*
4694  * wait for all write mds requests to flush.
4695  */
4696 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4697 {
4698 	struct ceph_mds_request *req = NULL, *nextreq;
4699 	struct rb_node *n;
4700 
4701 	mutex_lock(&mdsc->mutex);
4702 	dout("wait_unsafe_requests want %lld\n", want_tid);
4703 restart:
4704 	req = __get_oldest_req(mdsc);
4705 	while (req && req->r_tid <= want_tid) {
4706 		/* find next request */
4707 		n = rb_next(&req->r_node);
4708 		if (n)
4709 			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4710 		else
4711 			nextreq = NULL;
4712 		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4713 		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4714 			/* write op */
4715 			ceph_mdsc_get_request(req);
4716 			if (nextreq)
4717 				ceph_mdsc_get_request(nextreq);
4718 			mutex_unlock(&mdsc->mutex);
4719 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4720 			     req->r_tid, want_tid);
4721 			wait_for_completion(&req->r_safe_completion);
4722 			mutex_lock(&mdsc->mutex);
4723 			ceph_mdsc_put_request(req);
4724 			if (!nextreq)
4725 				break;  /* next didn't exist before, so we're done! */
4726 			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4727 				/* next request was removed from tree */
4728 				ceph_mdsc_put_request(nextreq);
4729 				goto restart;
4730 			}
4731 			ceph_mdsc_put_request(nextreq);  /* won't go away */
4732 		}
4733 		req = nextreq;
4734 	}
4735 	mutex_unlock(&mdsc->mutex);
4736 	dout("wait_unsafe_requests done\n");
4737 }
4738 
4739 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4740 {
4741 	u64 want_tid, want_flush;
4742 
4743 	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4744 		return;
4745 
4746 	dout("sync\n");
4747 	mutex_lock(&mdsc->mutex);
4748 	want_tid = mdsc->last_tid;
4749 	mutex_unlock(&mdsc->mutex);
4750 
4751 	ceph_flush_dirty_caps(mdsc);
4752 	spin_lock(&mdsc->cap_dirty_lock);
4753 	want_flush = mdsc->last_cap_flush_tid;
4754 	if (!list_empty(&mdsc->cap_flush_list)) {
4755 		struct ceph_cap_flush *cf =
4756 			list_last_entry(&mdsc->cap_flush_list,
4757 					struct ceph_cap_flush, g_list);
4758 		cf->wake = true;
4759 	}
4760 	spin_unlock(&mdsc->cap_dirty_lock);
4761 
4762 	dout("sync want tid %lld flush_seq %lld\n",
4763 	     want_tid, want_flush);
4764 
4765 	wait_unsafe_requests(mdsc, want_tid);
4766 	wait_caps_flush(mdsc, want_flush);
4767 }
4768 
4769 /*
4770  * true if all sessions are closed, or we force unmount
4771  */
4772 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4773 {
4774 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4775 		return true;
4776 	return atomic_read(&mdsc->num_sessions) <= skipped;
4777 }
4778 
4779 /*
4780  * called after sb is ro.
4781  */
4782 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4783 {
4784 	struct ceph_options *opts = mdsc->fsc->client->options;
4785 	struct ceph_mds_session *session;
4786 	int i;
4787 	int skipped = 0;
4788 
4789 	dout("close_sessions\n");
4790 
4791 	/* close sessions */
4792 	mutex_lock(&mdsc->mutex);
4793 	for (i = 0; i < mdsc->max_sessions; i++) {
4794 		session = __ceph_lookup_mds_session(mdsc, i);
4795 		if (!session)
4796 			continue;
4797 		mutex_unlock(&mdsc->mutex);
4798 		mutex_lock(&session->s_mutex);
4799 		if (__close_session(mdsc, session) <= 0)
4800 			skipped++;
4801 		mutex_unlock(&session->s_mutex);
4802 		ceph_put_mds_session(session);
4803 		mutex_lock(&mdsc->mutex);
4804 	}
4805 	mutex_unlock(&mdsc->mutex);
4806 
4807 	dout("waiting for sessions to close\n");
4808 	wait_event_timeout(mdsc->session_close_wq,
4809 			   done_closing_sessions(mdsc, skipped),
4810 			   ceph_timeout_jiffies(opts->mount_timeout));
4811 
4812 	/* tear down remaining sessions */
4813 	mutex_lock(&mdsc->mutex);
4814 	for (i = 0; i < mdsc->max_sessions; i++) {
4815 		if (mdsc->sessions[i]) {
4816 			session = ceph_get_mds_session(mdsc->sessions[i]);
4817 			__unregister_session(mdsc, session);
4818 			mutex_unlock(&mdsc->mutex);
4819 			mutex_lock(&session->s_mutex);
4820 			remove_session_caps(session);
4821 			mutex_unlock(&session->s_mutex);
4822 			ceph_put_mds_session(session);
4823 			mutex_lock(&mdsc->mutex);
4824 		}
4825 	}
4826 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4827 	mutex_unlock(&mdsc->mutex);
4828 
4829 	ceph_cleanup_snapid_map(mdsc);
4830 	ceph_cleanup_empty_realms(mdsc);
4831 
4832 	cancel_work_sync(&mdsc->cap_reclaim_work);
4833 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4834 
4835 	dout("stopped\n");
4836 }
4837 
4838 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4839 {
4840 	struct ceph_mds_session *session;
4841 	int mds;
4842 
4843 	dout("force umount\n");
4844 
4845 	mutex_lock(&mdsc->mutex);
4846 	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4847 		session = __ceph_lookup_mds_session(mdsc, mds);
4848 		if (!session)
4849 			continue;
4850 
4851 		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4852 			__unregister_session(mdsc, session);
4853 		__wake_requests(mdsc, &session->s_waiting);
4854 		mutex_unlock(&mdsc->mutex);
4855 
4856 		mutex_lock(&session->s_mutex);
4857 		__close_session(mdsc, session);
4858 		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4859 			cleanup_session_requests(mdsc, session);
4860 			remove_session_caps(session);
4861 		}
4862 		mutex_unlock(&session->s_mutex);
4863 		ceph_put_mds_session(session);
4864 
4865 		mutex_lock(&mdsc->mutex);
4866 		kick_requests(mdsc, mds);
4867 	}
4868 	__wake_requests(mdsc, &mdsc->waiting_for_map);
4869 	mutex_unlock(&mdsc->mutex);
4870 }
4871 
4872 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4873 {
4874 	dout("stop\n");
4875 	/*
4876 	 * Make sure the delayed work has stopped before releasing
4877 	 * the resources.
4878 	 *
4879 	 * cancel_delayed_work_sync() only guarantees that the work
4880 	 * finishes executing, but the delayed work can re-arm itself
4881 	 * again after that.
4882 	 */
4883 	flush_delayed_work(&mdsc->delayed_work);
4884 
4885 	if (mdsc->mdsmap)
4886 		ceph_mdsmap_destroy(mdsc->mdsmap);
4887 	kfree(mdsc->sessions);
4888 	ceph_caps_finalize(mdsc);
4889 	ceph_pool_perm_destroy(mdsc);
4890 }
4891 
4892 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4893 {
4894 	struct ceph_mds_client *mdsc = fsc->mdsc;
4895 	dout("mdsc_destroy %p\n", mdsc);
4896 
4897 	if (!mdsc)
4898 		return;
4899 
4900 	/* flush out any connection work with references to us */
4901 	ceph_msgr_flush();
4902 
4903 	ceph_mdsc_stop(mdsc);
4904 
4905 	ceph_metric_destroy(&mdsc->metric);
4906 
4907 	flush_delayed_work(&mdsc->metric.delayed_work);
4908 	fsc->mdsc = NULL;
4909 	kfree(mdsc);
4910 	dout("mdsc_destroy %p done\n", mdsc);
4911 }
4912 
4913 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4914 {
4915 	struct ceph_fs_client *fsc = mdsc->fsc;
4916 	const char *mds_namespace = fsc->mount_options->mds_namespace;
4917 	void *p = msg->front.iov_base;
4918 	void *end = p + msg->front.iov_len;
4919 	u32 epoch;
4920 	u32 num_fs;
4921 	u32 mount_fscid = (u32)-1;
4922 	int err = -EINVAL;
4923 
4924 	ceph_decode_need(&p, end, sizeof(u32), bad);
4925 	epoch = ceph_decode_32(&p);
4926 
4927 	dout("handle_fsmap epoch %u\n", epoch);
4928 
4929 	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
4930 	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
4931 
4932 	ceph_decode_32_safe(&p, end, num_fs, bad);
4933 	while (num_fs-- > 0) {
4934 		void *info_p, *info_end;
4935 		u32 info_len;
4936 		u32 fscid, namelen;
4937 
4938 		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4939 		p += 2;		// info_v, info_cv
4940 		info_len = ceph_decode_32(&p);
4941 		ceph_decode_need(&p, end, info_len, bad);
4942 		info_p = p;
4943 		info_end = p + info_len;
4944 		p = info_end;
4945 
4946 		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4947 		fscid = ceph_decode_32(&info_p);
4948 		namelen = ceph_decode_32(&info_p);
4949 		ceph_decode_need(&info_p, info_end, namelen, bad);
4950 
4951 		if (mds_namespace &&
4952 		    strlen(mds_namespace) == namelen &&
4953 		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4954 			mount_fscid = fscid;
4955 			break;
4956 		}
4957 	}
4958 
4959 	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4960 	if (mount_fscid != (u32)-1) {
4961 		fsc->client->monc.fs_cluster_id = mount_fscid;
4962 		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4963 				   0, true);
4964 		ceph_monc_renew_subs(&fsc->client->monc);
4965 	} else {
4966 		err = -ENOENT;
4967 		goto err_out;
4968 	}
4969 	return;
4970 
4971 bad:
4972 	pr_err("error decoding fsmap\n");
4973 err_out:
4974 	mutex_lock(&mdsc->mutex);
4975 	mdsc->mdsmap_err = err;
4976 	__wake_requests(mdsc, &mdsc->waiting_for_map);
4977 	mutex_unlock(&mdsc->mutex);
4978 }
4979 
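/*
 * Editor's note -- illustrative sketch only, not part of mds_client.c: the
 * bounded sub-blob decode pattern used by ceph_mdsc_handle_fsmap() above.
 * Every entry carries its own length, so the outer cursor can always be
 * advanced past the whole entry even when only part of it is understood.
 * The "example_" name is hypothetical.
 */
static int example_skip_entries(void **p, void *end, u32 count)
{
	while (count-- > 0) {
		void *blob_end;
		u32 blob_len;

		ceph_decode_32_safe(p, end, blob_len, bad);
		ceph_decode_need(p, end, blob_len, bad);
		blob_end = *p + blob_len;
		/* ... decode only the known fields within [*p, blob_end) ... */
		*p = blob_end;		/* always skip the entire blob */
	}
	return 0;
bad:
	return -EIO;
}
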
4980 /*
4981  * handle mds map update.
4982  */
4983 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4984 {
4985 	u32 epoch;
4986 	u32 maplen;
4987 	void *p = msg->front.iov_base;
4988 	void *end = p + msg->front.iov_len;
4989 	struct ceph_mdsmap *newmap, *oldmap;
4990 	struct ceph_fsid fsid;
4991 	int err = -EINVAL;
4992 
4993 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
4994 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
4995 	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
4996 		return;
4997 	epoch = ceph_decode_32(&p);
4998 	maplen = ceph_decode_32(&p);
4999 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5000 
5001 	/* do we need it? */
5002 	mutex_lock(&mdsc->mutex);
5003 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5004 		dout("handle_map epoch %u <= our %u\n",
5005 		     epoch, mdsc->mdsmap->m_epoch);
5006 		mutex_unlock(&mdsc->mutex);
5007 		return;
5008 	}
5009 
5010 	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5011 	if (IS_ERR(newmap)) {
5012 		err = PTR_ERR(newmap);
5013 		goto bad_unlock;
5014 	}
5015 
5016 	/* swap into place */
5017 	if (mdsc->mdsmap) {
5018 		oldmap = mdsc->mdsmap;
5019 		mdsc->mdsmap = newmap;
5020 		check_new_map(mdsc, newmap, oldmap);
5021 		ceph_mdsmap_destroy(oldmap);
5022 	} else {
5023 		mdsc->mdsmap = newmap;  /* first mds map */
5024 	}
5025 	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5026 					MAX_LFS_FILESIZE);
5027 
5028 	__wake_requests(mdsc, &mdsc->waiting_for_map);
5029 	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5030 			  mdsc->mdsmap->m_epoch);
5031 
5032 	mutex_unlock(&mdsc->mutex);
5033 	schedule_delayed(mdsc);
5034 	return;
5035 
5036 bad_unlock:
5037 	mutex_unlock(&mdsc->mutex);
5038 bad:
5039 	pr_err("error decoding mdsmap %d\n", err);
5040 	return;
5041 }
5042 
5043 static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5044 {
5045 	struct ceph_mds_session *s = con->private;
5046 
5047 	if (ceph_get_mds_session(s))
5048 		return con;
5049 	return NULL;
5050 }
5051 
5052 static void mds_put_con(struct ceph_connection *con)
5053 {
5054 	struct ceph_mds_session *s = con->private;
5055 
5056 	ceph_put_mds_session(s);
5057 }
5058 
5059 /*
5060  * if the client is unresponsive for long enough, the mds will kill
5061  * the session entirely.
5062  */
5063 static void mds_peer_reset(struct ceph_connection *con)
5064 {
5065 	struct ceph_mds_session *s = con->private;
5066 	struct ceph_mds_client *mdsc = s->s_mdsc;
5067 
5068 	pr_warn("mds%d closed our session\n", s->s_mds);
5069 	send_mds_reconnect(mdsc, s);
5070 }
5071 
5072 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5073 {
5074 	struct ceph_mds_session *s = con->private;
5075 	struct ceph_mds_client *mdsc = s->s_mdsc;
5076 	int type = le16_to_cpu(msg->hdr.type);
5077 
5078 	mutex_lock(&mdsc->mutex);
5079 	if (__verify_registered_session(mdsc, s) < 0) {
5080 		mutex_unlock(&mdsc->mutex);
5081 		goto out;
5082 	}
5083 	mutex_unlock(&mdsc->mutex);
5084 
5085 	switch (type) {
5086 	case CEPH_MSG_MDS_MAP:
5087 		ceph_mdsc_handle_mdsmap(mdsc, msg);
5088 		break;
5089 	case CEPH_MSG_FS_MAP_USER:
5090 		ceph_mdsc_handle_fsmap(mdsc, msg);
5091 		break;
5092 	case CEPH_MSG_CLIENT_SESSION:
5093 		handle_session(s, msg);
5094 		break;
5095 	case CEPH_MSG_CLIENT_REPLY:
5096 		handle_reply(s, msg);
5097 		break;
5098 	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5099 		handle_forward(mdsc, s, msg);
5100 		break;
5101 	case CEPH_MSG_CLIENT_CAPS:
5102 		ceph_handle_caps(s, msg);
5103 		break;
5104 	case CEPH_MSG_CLIENT_SNAP:
5105 		ceph_handle_snap(mdsc, s, msg);
5106 		break;
5107 	case CEPH_MSG_CLIENT_LEASE:
5108 		handle_lease(mdsc, s, msg);
5109 		break;
5110 	case CEPH_MSG_CLIENT_QUOTA:
5111 		ceph_handle_quota(mdsc, s, msg);
5112 		break;
5113 
5114 	default:
5115 		pr_err("received unknown message type %d %s\n", type,
5116 		       ceph_msg_type_name(type));
5117 	}
5118 out:
5119 	ceph_msg_put(msg);
5120 }
5121 
5122 /*
5123  * authentication
5124  */
5125 
5126 /*
5127  * Note: returned pointer is the address of a structure that's
5128  * managed separately.  Caller must *not* attempt to free it.
5129  */
5130 static struct ceph_auth_handshake *
5131 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5132 {
5133 	struct ceph_mds_session *s = con->private;
5134 	struct ceph_mds_client *mdsc = s->s_mdsc;
5135 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5136 	struct ceph_auth_handshake *auth = &s->s_auth;
5137 	int ret;
5138 
5139 	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5140 					 force_new, proto, NULL, NULL);
5141 	if (ret)
5142 		return ERR_PTR(ret);
5143 
5144 	return auth;
5145 }
5146 
5147 static int mds_add_authorizer_challenge(struct ceph_connection *con,
5148 				    void *challenge_buf, int challenge_buf_len)
5149 {
5150 	struct ceph_mds_session *s = con->private;
5151 	struct ceph_mds_client *mdsc = s->s_mdsc;
5152 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5153 
5154 	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5155 					    challenge_buf, challenge_buf_len);
5156 }
5157 
5158 static int mds_verify_authorizer_reply(struct ceph_connection *con)
5159 {
5160 	struct ceph_mds_session *s = con->private;
5161 	struct ceph_mds_client *mdsc = s->s_mdsc;
5162 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5163 	struct ceph_auth_handshake *auth = &s->s_auth;
5164 
5165 	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5166 		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5167 		NULL, NULL, NULL, NULL);
5168 }
5169 
5170 static int mds_invalidate_authorizer(struct ceph_connection *con)
5171 {
5172 	struct ceph_mds_session *s = con->private;
5173 	struct ceph_mds_client *mdsc = s->s_mdsc;
5174 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5175 
5176 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5177 
5178 	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5179 }
5180 
5181 static int mds_get_auth_request(struct ceph_connection *con,
5182 				void *buf, int *buf_len,
5183 				void **authorizer, int *authorizer_len)
5184 {
5185 	struct ceph_mds_session *s = con->private;
5186 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5187 	struct ceph_auth_handshake *auth = &s->s_auth;
5188 	int ret;
5189 
5190 	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5191 				       buf, buf_len);
5192 	if (ret)
5193 		return ret;
5194 
5195 	*authorizer = auth->authorizer_buf;
5196 	*authorizer_len = auth->authorizer_buf_len;
5197 	return 0;
5198 }
5199 
5200 static int mds_handle_auth_reply_more(struct ceph_connection *con,
5201 				      void *reply, int reply_len,
5202 				      void *buf, int *buf_len,
5203 				      void **authorizer, int *authorizer_len)
5204 {
5205 	struct ceph_mds_session *s = con->private;
5206 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5207 	struct ceph_auth_handshake *auth = &s->s_auth;
5208 	int ret;
5209 
5210 	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5211 					      buf, buf_len);
5212 	if (ret)
5213 		return ret;
5214 
5215 	*authorizer = auth->authorizer_buf;
5216 	*authorizer_len = auth->authorizer_buf_len;
5217 	return 0;
5218 }
5219 
5220 static int mds_handle_auth_done(struct ceph_connection *con,
5221 				u64 global_id, void *reply, int reply_len,
5222 				u8 *session_key, int *session_key_len,
5223 				u8 *con_secret, int *con_secret_len)
5224 {
5225 	struct ceph_mds_session *s = con->private;
5226 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5227 	struct ceph_auth_handshake *auth = &s->s_auth;
5228 
5229 	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5230 					       session_key, session_key_len,
5231 					       con_secret, con_secret_len);
5232 }
5233 
5234 static int mds_handle_auth_bad_method(struct ceph_connection *con,
5235 				      int used_proto, int result,
5236 				      const int *allowed_protos, int proto_cnt,
5237 				      const int *allowed_modes, int mode_cnt)
5238 {
5239 	struct ceph_mds_session *s = con->private;
5240 	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5241 	int ret;
5242 
5243 	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5244 					    used_proto, result,
5245 					    allowed_protos, proto_cnt,
5246 					    allowed_modes, mode_cnt)) {
5247 		ret = ceph_monc_validate_auth(monc);
5248 		if (ret)
5249 			return ret;
5250 	}
5251 
5252 	return -EACCES;
5253 }
5254 
5255 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5256 				struct ceph_msg_header *hdr, int *skip)
5257 {
5258 	struct ceph_msg *msg;
5259 	int type = (int) le16_to_cpu(hdr->type);
5260 	int front_len = (int) le32_to_cpu(hdr->front_len);
5261 
5262 	if (con->in_msg)
5263 		return con->in_msg;
5264 
5265 	*skip = 0;
5266 	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5267 	if (!msg) {
5268 		pr_err("unable to allocate msg type %d len %d\n",
5269 		       type, front_len);
5270 		return NULL;
5271 	}
5272 
5273 	return msg;
5274 }
5275 
5276 static int mds_sign_message(struct ceph_msg *msg)
5277 {
5278 	struct ceph_mds_session *s = msg->con->private;
5279 	struct ceph_auth_handshake *auth = &s->s_auth;
5280 
5281 	return ceph_auth_sign_message(auth, msg);
5282 }
5283 
5284 static int mds_check_message_signature(struct ceph_msg *msg)
5285 {
5286 	struct ceph_mds_session *s = msg->con->private;
5287 	struct ceph_auth_handshake *auth = &s->s_auth;
5288 
5289 	return ceph_auth_check_message_signature(auth, msg);
5290 }
5291 
5292 static const struct ceph_connection_operations mds_con_ops = {
5293 	.get = mds_get_con,
5294 	.put = mds_put_con,
5295 	.alloc_msg = mds_alloc_msg,
5296 	.dispatch = mds_dispatch,
5297 	.peer_reset = mds_peer_reset,
5298 	.get_authorizer = mds_get_authorizer,
5299 	.add_authorizer_challenge = mds_add_authorizer_challenge,
5300 	.verify_authorizer_reply = mds_verify_authorizer_reply,
5301 	.invalidate_authorizer = mds_invalidate_authorizer,
5302 	.sign_message = mds_sign_message,
5303 	.check_message_signature = mds_check_message_signature,
5304 	.get_auth_request = mds_get_auth_request,
5305 	.handle_auth_reply_more = mds_handle_auth_reply_more,
5306 	.handle_auth_done = mds_handle_auth_done,
5307 	.handle_auth_bad_method = mds_handle_auth_bad_method,
5308 };
5309 
5310 /* eof */
5311