xref: /openbmc/linux/fs/ceph/mds_client.c (revision dd1fc3c5)
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3 
4 #include <linux/fs.h>
5 #include <linux/wait.h>
6 #include <linux/slab.h>
7 #include <linux/gfp.h>
8 #include <linux/sched.h>
9 #include <linux/debugfs.h>
10 #include <linux/seq_file.h>
11 #include <linux/ratelimit.h>
12 #include <linux/bits.h>
13 #include <linux/ktime.h>
14 
15 #include "super.h"
16 #include "mds_client.h"
17 
18 #include <linux/ceph/ceph_features.h>
19 #include <linux/ceph/messenger.h>
20 #include <linux/ceph/decode.h>
21 #include <linux/ceph/pagelist.h>
22 #include <linux/ceph/auth.h>
23 #include <linux/ceph/debugfs.h>
24 
25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
26 
27 /*
28  * A cluster of MDS (metadata server) daemons is responsible for
29  * managing the file system namespace (the directory hierarchy and
30  * inodes) and for coordinating shared access to storage.  Metadata is
31  * partitioning hierarchically across a number of servers, and that
32  * partition varies over time as the cluster adjusts the distribution
33  * in order to balance load.
34  *
35  * The MDS client is primarily responsible to managing synchronous
36  * metadata requests for operations like open, unlink, and so forth.
37  * If there is a MDS failure, we find out about it when we (possibly
38  * request and) receive a new MDS map, and can resubmit affected
39  * requests.
40  *
41  * For the most part, though, we take advantage of a lossless
42  * communications channel to the MDS, and do not need to worry about
43  * timing out or resubmitting requests.
44  *
45  * We maintain a stateful "session" with each MDS we interact with.
46  * Within each session, we sent periodic heartbeat messages to ensure
47  * any capabilities or leases we have been issues remain valid.  If
48  * the session times out and goes stale, our leases and capabilities
49  * are no longer valid.
50  */
51 
52 struct ceph_reconnect_state {
53 	struct ceph_mds_session *session;
54 	int nr_caps, nr_realms;
55 	struct ceph_pagelist *pagelist;
56 	unsigned msg_version;
57 	bool allow_multi;
58 };
59 
60 static void __wake_requests(struct ceph_mds_client *mdsc,
61 			    struct list_head *head);
62 static void ceph_cap_release_work(struct work_struct *work);
63 static void ceph_cap_reclaim_work(struct work_struct *work);
64 
65 static const struct ceph_connection_operations mds_con_ops;
66 
67 
68 /*
69  * mds reply parsing
70  */
71 
72 static int parse_reply_info_quota(void **p, void *end,
73 				  struct ceph_mds_reply_info_in *info)
74 {
75 	u8 struct_v, struct_compat;
76 	u32 struct_len;
77 
78 	ceph_decode_8_safe(p, end, struct_v, bad);
79 	ceph_decode_8_safe(p, end, struct_compat, bad);
80 	/* struct_v is expected to be >= 1. we only
81 	 * understand encoding with struct_compat == 1. */
82 	if (!struct_v || struct_compat != 1)
83 		goto bad;
84 	ceph_decode_32_safe(p, end, struct_len, bad);
85 	ceph_decode_need(p, end, struct_len, bad);
86 	end = *p + struct_len;
87 	ceph_decode_64_safe(p, end, info->max_bytes, bad);
88 	ceph_decode_64_safe(p, end, info->max_files, bad);
89 	*p = end;
90 	return 0;
91 bad:
92 	return -EIO;
93 }
94 
95 /*
96  * parse individual inode info
97  */
98 static int parse_reply_info_in(void **p, void *end,
99 			       struct ceph_mds_reply_info_in *info,
100 			       u64 features)
101 {
102 	int err = 0;
103 	u8 struct_v = 0;
104 
105 	if (features == (u64)-1) {
106 		u32 struct_len;
107 		u8 struct_compat;
108 		ceph_decode_8_safe(p, end, struct_v, bad);
109 		ceph_decode_8_safe(p, end, struct_compat, bad);
110 		/* struct_v is expected to be >= 1. we only understand
111 		 * encoding with struct_compat == 1. */
112 		if (!struct_v || struct_compat != 1)
113 			goto bad;
114 		ceph_decode_32_safe(p, end, struct_len, bad);
115 		ceph_decode_need(p, end, struct_len, bad);
116 		end = *p + struct_len;
117 	}
118 
119 	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
120 	info->in = *p;
121 	*p += sizeof(struct ceph_mds_reply_inode) +
122 		sizeof(*info->in->fragtree.splits) *
123 		le32_to_cpu(info->in->fragtree.nsplits);
124 
125 	ceph_decode_32_safe(p, end, info->symlink_len, bad);
126 	ceph_decode_need(p, end, info->symlink_len, bad);
127 	info->symlink = *p;
128 	*p += info->symlink_len;
129 
130 	ceph_decode_copy_safe(p, end, &info->dir_layout,
131 			      sizeof(info->dir_layout), bad);
132 	ceph_decode_32_safe(p, end, info->xattr_len, bad);
133 	ceph_decode_need(p, end, info->xattr_len, bad);
134 	info->xattr_data = *p;
135 	*p += info->xattr_len;
136 
137 	if (features == (u64)-1) {
138 		/* inline data */
139 		ceph_decode_64_safe(p, end, info->inline_version, bad);
140 		ceph_decode_32_safe(p, end, info->inline_len, bad);
141 		ceph_decode_need(p, end, info->inline_len, bad);
142 		info->inline_data = *p;
143 		*p += info->inline_len;
144 		/* quota */
145 		err = parse_reply_info_quota(p, end, info);
146 		if (err < 0)
147 			goto out_bad;
148 		/* pool namespace */
149 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
150 		if (info->pool_ns_len > 0) {
151 			ceph_decode_need(p, end, info->pool_ns_len, bad);
152 			info->pool_ns_data = *p;
153 			*p += info->pool_ns_len;
154 		}
155 
156 		/* btime */
157 		ceph_decode_need(p, end, sizeof(info->btime), bad);
158 		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
159 
160 		/* change attribute */
161 		ceph_decode_64_safe(p, end, info->change_attr, bad);
162 
163 		/* dir pin */
164 		if (struct_v >= 2) {
165 			ceph_decode_32_safe(p, end, info->dir_pin, bad);
166 		} else {
167 			info->dir_pin = -ENODATA;
168 		}
169 
170 		/* snapshot birth time, remains zero for v<=2 */
171 		if (struct_v >= 3) {
172 			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
173 			ceph_decode_copy(p, &info->snap_btime,
174 					 sizeof(info->snap_btime));
175 		} else {
176 			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
177 		}
178 
179 		/* snapshot count, remains zero for v<=3 */
180 		if (struct_v >= 4) {
181 			ceph_decode_64_safe(p, end, info->rsnaps, bad);
182 		} else {
183 			info->rsnaps = 0;
184 		}
185 
186 		*p = end;
187 	} else {
188 		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
189 			ceph_decode_64_safe(p, end, info->inline_version, bad);
190 			ceph_decode_32_safe(p, end, info->inline_len, bad);
191 			ceph_decode_need(p, end, info->inline_len, bad);
192 			info->inline_data = *p;
193 			*p += info->inline_len;
194 		} else
195 			info->inline_version = CEPH_INLINE_NONE;
196 
197 		if (features & CEPH_FEATURE_MDS_QUOTA) {
198 			err = parse_reply_info_quota(p, end, info);
199 			if (err < 0)
200 				goto out_bad;
201 		} else {
202 			info->max_bytes = 0;
203 			info->max_files = 0;
204 		}
205 
206 		info->pool_ns_len = 0;
207 		info->pool_ns_data = NULL;
208 		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
209 			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
210 			if (info->pool_ns_len > 0) {
211 				ceph_decode_need(p, end, info->pool_ns_len, bad);
212 				info->pool_ns_data = *p;
213 				*p += info->pool_ns_len;
214 			}
215 		}
216 
217 		if (features & CEPH_FEATURE_FS_BTIME) {
218 			ceph_decode_need(p, end, sizeof(info->btime), bad);
219 			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
220 			ceph_decode_64_safe(p, end, info->change_attr, bad);
221 		}
222 
223 		info->dir_pin = -ENODATA;
224 		/* info->snap_btime and info->rsnaps remain zero */
225 	}
226 	return 0;
227 bad:
228 	err = -EIO;
229 out_bad:
230 	return err;
231 }
232 
233 static int parse_reply_info_dir(void **p, void *end,
234 				struct ceph_mds_reply_dirfrag **dirfrag,
235 				u64 features)
236 {
237 	if (features == (u64)-1) {
238 		u8 struct_v, struct_compat;
239 		u32 struct_len;
240 		ceph_decode_8_safe(p, end, struct_v, bad);
241 		ceph_decode_8_safe(p, end, struct_compat, bad);
242 		/* struct_v is expected to be >= 1. we only understand
243 		 * encoding whose struct_compat == 1. */
244 		if (!struct_v || struct_compat != 1)
245 			goto bad;
246 		ceph_decode_32_safe(p, end, struct_len, bad);
247 		ceph_decode_need(p, end, struct_len, bad);
248 		end = *p + struct_len;
249 	}
250 
251 	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
252 	*dirfrag = *p;
253 	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
254 	if (unlikely(*p > end))
255 		goto bad;
256 	if (features == (u64)-1)
257 		*p = end;
258 	return 0;
259 bad:
260 	return -EIO;
261 }
262 
263 static int parse_reply_info_lease(void **p, void *end,
264 				  struct ceph_mds_reply_lease **lease,
265 				  u64 features)
266 {
267 	if (features == (u64)-1) {
268 		u8 struct_v, struct_compat;
269 		u32 struct_len;
270 		ceph_decode_8_safe(p, end, struct_v, bad);
271 		ceph_decode_8_safe(p, end, struct_compat, bad);
272 		/* struct_v is expected to be >= 1. we only understand
273 		 * encoding whose struct_compat == 1. */
274 		if (!struct_v || struct_compat != 1)
275 			goto bad;
276 		ceph_decode_32_safe(p, end, struct_len, bad);
277 		ceph_decode_need(p, end, struct_len, bad);
278 		end = *p + struct_len;
279 	}
280 
281 	ceph_decode_need(p, end, sizeof(**lease), bad);
282 	*lease = *p;
283 	*p += sizeof(**lease);
284 	if (features == (u64)-1)
285 		*p = end;
286 	return 0;
287 bad:
288 	return -EIO;
289 }
290 
291 /*
292  * parse a normal reply, which may contain a (dir+)dentry and/or a
293  * target inode.
294  */
295 static int parse_reply_info_trace(void **p, void *end,
296 				  struct ceph_mds_reply_info_parsed *info,
297 				  u64 features)
298 {
299 	int err;
300 
301 	if (info->head->is_dentry) {
302 		err = parse_reply_info_in(p, end, &info->diri, features);
303 		if (err < 0)
304 			goto out_bad;
305 
306 		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
307 		if (err < 0)
308 			goto out_bad;
309 
310 		ceph_decode_32_safe(p, end, info->dname_len, bad);
311 		ceph_decode_need(p, end, info->dname_len, bad);
312 		info->dname = *p;
313 		*p += info->dname_len;
314 
315 		err = parse_reply_info_lease(p, end, &info->dlease, features);
316 		if (err < 0)
317 			goto out_bad;
318 	}
319 
320 	if (info->head->is_target) {
321 		err = parse_reply_info_in(p, end, &info->targeti, features);
322 		if (err < 0)
323 			goto out_bad;
324 	}
325 
326 	if (unlikely(*p != end))
327 		goto bad;
328 	return 0;
329 
330 bad:
331 	err = -EIO;
332 out_bad:
333 	pr_err("problem parsing mds trace %d\n", err);
334 	return err;
335 }
336 
337 /*
338  * parse readdir results
339  */
340 static int parse_reply_info_readdir(void **p, void *end,
341 				struct ceph_mds_reply_info_parsed *info,
342 				u64 features)
343 {
344 	u32 num, i = 0;
345 	int err;
346 
347 	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
348 	if (err < 0)
349 		goto out_bad;
350 
351 	ceph_decode_need(p, end, sizeof(num) + 2, bad);
352 	num = ceph_decode_32(p);
353 	{
354 		u16 flags = ceph_decode_16(p);
355 		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
356 		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
357 		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
358 		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
359 	}
360 	if (num == 0)
361 		goto done;
362 
363 	BUG_ON(!info->dir_entries);
364 	if ((unsigned long)(info->dir_entries + num) >
365 	    (unsigned long)info->dir_entries + info->dir_buf_size) {
366 		pr_err("dir contents are larger than expected\n");
367 		WARN_ON(1);
368 		goto bad;
369 	}
370 
371 	info->dir_nr = num;
372 	while (num) {
373 		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
374 		/* dentry */
375 		ceph_decode_32_safe(p, end, rde->name_len, bad);
376 		ceph_decode_need(p, end, rde->name_len, bad);
377 		rde->name = *p;
378 		*p += rde->name_len;
379 		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);
380 
381 		/* dentry lease */
382 		err = parse_reply_info_lease(p, end, &rde->lease, features);
383 		if (err)
384 			goto out_bad;
385 		/* inode */
386 		err = parse_reply_info_in(p, end, &rde->inode, features);
387 		if (err < 0)
388 			goto out_bad;
389 		/* ceph_readdir_prepopulate() will update it */
390 		rde->offset = 0;
391 		i++;
392 		num--;
393 	}
394 
395 done:
396 	/* Skip over any unrecognized fields */
397 	*p = end;
398 	return 0;
399 
400 bad:
401 	err = -EIO;
402 out_bad:
403 	pr_err("problem parsing dir contents %d\n", err);
404 	return err;
405 }
406 
407 /*
408  * parse fcntl F_GETLK results
409  */
410 static int parse_reply_info_filelock(void **p, void *end,
411 				     struct ceph_mds_reply_info_parsed *info,
412 				     u64 features)
413 {
414 	if (*p + sizeof(*info->filelock_reply) > end)
415 		goto bad;
416 
417 	info->filelock_reply = *p;
418 
419 	/* Skip over any unrecognized fields */
420 	*p = end;
421 	return 0;
422 bad:
423 	return -EIO;
424 }
425 
426 
427 #if BITS_PER_LONG == 64
428 
429 #define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
430 
431 static int ceph_parse_deleg_inos(void **p, void *end,
432 				 struct ceph_mds_session *s)
433 {
434 	u32 sets;
435 
436 	ceph_decode_32_safe(p, end, sets, bad);
437 	dout("got %u sets of delegated inodes\n", sets);
438 	while (sets--) {
439 		u64 start, len, ino;
440 
441 		ceph_decode_64_safe(p, end, start, bad);
442 		ceph_decode_64_safe(p, end, len, bad);
443 
444 		/* Don't accept a delegation of system inodes */
445 		if (start < CEPH_INO_SYSTEM_BASE) {
446 			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
447 					start, len);
448 			continue;
449 		}
450 		while (len--) {
451 			int err = xa_insert(&s->s_delegated_inos, ino = start++,
452 					    DELEGATED_INO_AVAILABLE,
453 					    GFP_KERNEL);
454 			if (!err) {
455 				dout("added delegated inode 0x%llx\n",
456 				     start - 1);
457 			} else if (err == -EBUSY) {
458 				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
459 					start - 1);
460 			} else {
461 				return err;
462 			}
463 		}
464 	}
465 	return 0;
466 bad:
467 	return -EIO;
468 }
469 
470 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
471 {
472 	unsigned long ino;
473 	void *val;
474 
475 	xa_for_each(&s->s_delegated_inos, ino, val) {
476 		val = xa_erase(&s->s_delegated_inos, ino);
477 		if (val == DELEGATED_INO_AVAILABLE)
478 			return ino;
479 	}
480 	return 0;
481 }
482 
483 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
484 {
485 	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
486 			 GFP_KERNEL);
487 }
488 #else /* BITS_PER_LONG == 64 */
489 /*
490  * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
491  * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
492  * and bottom words?
493  */
494 static int ceph_parse_deleg_inos(void **p, void *end,
495 				 struct ceph_mds_session *s)
496 {
497 	u32 sets;
498 
499 	ceph_decode_32_safe(p, end, sets, bad);
500 	if (sets)
501 		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
502 	return 0;
503 bad:
504 	return -EIO;
505 }
506 
507 u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
508 {
509 	return 0;
510 }
511 
512 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
513 {
514 	return 0;
515 }
516 #endif /* BITS_PER_LONG == 64 */
517 
518 /*
519  * parse create results
520  */
521 static int parse_reply_info_create(void **p, void *end,
522 				  struct ceph_mds_reply_info_parsed *info,
523 				  u64 features, struct ceph_mds_session *s)
524 {
525 	int ret;
526 
527 	if (features == (u64)-1 ||
528 	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
529 		if (*p == end) {
530 			/* Malformed reply? */
531 			info->has_create_ino = false;
532 		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
533 			info->has_create_ino = true;
534 			/* struct_v, struct_compat, and len */
535 			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
536 			ceph_decode_64_safe(p, end, info->ino, bad);
537 			ret = ceph_parse_deleg_inos(p, end, s);
538 			if (ret)
539 				return ret;
540 		} else {
541 			/* legacy */
542 			ceph_decode_64_safe(p, end, info->ino, bad);
543 			info->has_create_ino = true;
544 		}
545 	} else {
546 		if (*p != end)
547 			goto bad;
548 	}
549 
550 	/* Skip over any unrecognized fields */
551 	*p = end;
552 	return 0;
553 bad:
554 	return -EIO;
555 }
556 
557 /*
558  * parse extra results
559  */
560 static int parse_reply_info_extra(void **p, void *end,
561 				  struct ceph_mds_reply_info_parsed *info,
562 				  u64 features, struct ceph_mds_session *s)
563 {
564 	u32 op = le32_to_cpu(info->head->op);
565 
566 	if (op == CEPH_MDS_OP_GETFILELOCK)
567 		return parse_reply_info_filelock(p, end, info, features);
568 	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
569 		return parse_reply_info_readdir(p, end, info, features);
570 	else if (op == CEPH_MDS_OP_CREATE)
571 		return parse_reply_info_create(p, end, info, features, s);
572 	else
573 		return -EIO;
574 }
575 
576 /*
577  * parse entire mds reply
578  */
579 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
580 			    struct ceph_mds_reply_info_parsed *info,
581 			    u64 features)
582 {
583 	void *p, *end;
584 	u32 len;
585 	int err;
586 
587 	info->head = msg->front.iov_base;
588 	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
589 	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
590 
591 	/* trace */
592 	ceph_decode_32_safe(&p, end, len, bad);
593 	if (len > 0) {
594 		ceph_decode_need(&p, end, len, bad);
595 		err = parse_reply_info_trace(&p, p+len, info, features);
596 		if (err < 0)
597 			goto out_bad;
598 	}
599 
600 	/* extra */
601 	ceph_decode_32_safe(&p, end, len, bad);
602 	if (len > 0) {
603 		ceph_decode_need(&p, end, len, bad);
604 		err = parse_reply_info_extra(&p, p+len, info, features, s);
605 		if (err < 0)
606 			goto out_bad;
607 	}
608 
609 	/* snap blob */
610 	ceph_decode_32_safe(&p, end, len, bad);
611 	info->snapblob_len = len;
612 	info->snapblob = p;
613 	p += len;
614 
615 	if (p != end)
616 		goto bad;
617 	return 0;
618 
619 bad:
620 	err = -EIO;
621 out_bad:
622 	pr_err("mds parse_reply err %d\n", err);
623 	return err;
624 }
625 
626 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
627 {
628 	if (!info->dir_entries)
629 		return;
630 	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
631 }
632 
633 
634 /*
635  * sessions
636  */
637 const char *ceph_session_state_name(int s)
638 {
639 	switch (s) {
640 	case CEPH_MDS_SESSION_NEW: return "new";
641 	case CEPH_MDS_SESSION_OPENING: return "opening";
642 	case CEPH_MDS_SESSION_OPEN: return "open";
643 	case CEPH_MDS_SESSION_HUNG: return "hung";
644 	case CEPH_MDS_SESSION_CLOSING: return "closing";
645 	case CEPH_MDS_SESSION_CLOSED: return "closed";
646 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
647 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
648 	case CEPH_MDS_SESSION_REJECTED: return "rejected";
649 	default: return "???";
650 	}
651 }
652 
653 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
654 {
655 	if (refcount_inc_not_zero(&s->s_ref)) {
656 		dout("mdsc get_session %p %d -> %d\n", s,
657 		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
658 		return s;
659 	} else {
660 		dout("mdsc get_session %p 0 -- FAIL\n", s);
661 		return NULL;
662 	}
663 }
664 
665 void ceph_put_mds_session(struct ceph_mds_session *s)
666 {
667 	dout("mdsc put_session %p %d -> %d\n", s,
668 	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
669 	if (refcount_dec_and_test(&s->s_ref)) {
670 		if (s->s_auth.authorizer)
671 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
672 		WARN_ON(mutex_is_locked(&s->s_mutex));
673 		xa_destroy(&s->s_delegated_inos);
674 		kfree(s);
675 	}
676 }
677 
678 /*
679  * called under mdsc->mutex
680  */
681 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
682 						   int mds)
683 {
684 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
685 		return NULL;
686 	return ceph_get_mds_session(mdsc->sessions[mds]);
687 }
688 
689 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
690 {
691 	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
692 		return false;
693 	else
694 		return true;
695 }
696 
697 static int __verify_registered_session(struct ceph_mds_client *mdsc,
698 				       struct ceph_mds_session *s)
699 {
700 	if (s->s_mds >= mdsc->max_sessions ||
701 	    mdsc->sessions[s->s_mds] != s)
702 		return -ENOENT;
703 	return 0;
704 }
705 
706 /*
707  * create+register a new session for given mds.
708  * called under mdsc->mutex.
709  */
710 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
711 						 int mds)
712 {
713 	struct ceph_mds_session *s;
714 
715 	if (mds >= mdsc->mdsmap->possible_max_rank)
716 		return ERR_PTR(-EINVAL);
717 
718 	s = kzalloc(sizeof(*s), GFP_NOFS);
719 	if (!s)
720 		return ERR_PTR(-ENOMEM);
721 
722 	if (mds >= mdsc->max_sessions) {
723 		int newmax = 1 << get_count_order(mds + 1);
724 		struct ceph_mds_session **sa;
725 
726 		dout("%s: realloc to %d\n", __func__, newmax);
727 		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
728 		if (!sa)
729 			goto fail_realloc;
730 		if (mdsc->sessions) {
731 			memcpy(sa, mdsc->sessions,
732 			       mdsc->max_sessions * sizeof(void *));
733 			kfree(mdsc->sessions);
734 		}
735 		mdsc->sessions = sa;
736 		mdsc->max_sessions = newmax;
737 	}
738 
739 	dout("%s: mds%d\n", __func__, mds);
740 	s->s_mdsc = mdsc;
741 	s->s_mds = mds;
742 	s->s_state = CEPH_MDS_SESSION_NEW;
743 	s->s_ttl = 0;
744 	s->s_seq = 0;
745 	mutex_init(&s->s_mutex);
746 
747 	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
748 
749 	spin_lock_init(&s->s_gen_ttl_lock);
750 	s->s_cap_gen = 1;
751 	s->s_cap_ttl = jiffies - 1;
752 
753 	spin_lock_init(&s->s_cap_lock);
754 	s->s_renew_requested = 0;
755 	s->s_renew_seq = 0;
756 	INIT_LIST_HEAD(&s->s_caps);
757 	s->s_nr_caps = 0;
758 	refcount_set(&s->s_ref, 1);
759 	INIT_LIST_HEAD(&s->s_waiting);
760 	INIT_LIST_HEAD(&s->s_unsafe);
761 	xa_init(&s->s_delegated_inos);
762 	s->s_num_cap_releases = 0;
763 	s->s_cap_reconnect = 0;
764 	s->s_cap_iterator = NULL;
765 	INIT_LIST_HEAD(&s->s_cap_releases);
766 	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
767 
768 	INIT_LIST_HEAD(&s->s_cap_dirty);
769 	INIT_LIST_HEAD(&s->s_cap_flushing);
770 
771 	mdsc->sessions[mds] = s;
772 	atomic_inc(&mdsc->num_sessions);
773 	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
774 
775 	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
776 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
777 
778 	return s;
779 
780 fail_realloc:
781 	kfree(s);
782 	return ERR_PTR(-ENOMEM);
783 }
784 
785 /*
786  * called under mdsc->mutex
787  */
788 static void __unregister_session(struct ceph_mds_client *mdsc,
789 			       struct ceph_mds_session *s)
790 {
791 	dout("__unregister_session mds%d %p\n", s->s_mds, s);
792 	BUG_ON(mdsc->sessions[s->s_mds] != s);
793 	mdsc->sessions[s->s_mds] = NULL;
794 	ceph_con_close(&s->s_con);
795 	ceph_put_mds_session(s);
796 	atomic_dec(&mdsc->num_sessions);
797 }
798 
799 /*
800  * drop session refs in request.
801  *
802  * should be last request ref, or hold mdsc->mutex
803  */
804 static void put_request_session(struct ceph_mds_request *req)
805 {
806 	if (req->r_session) {
807 		ceph_put_mds_session(req->r_session);
808 		req->r_session = NULL;
809 	}
810 }
811 
812 void ceph_mdsc_release_request(struct kref *kref)
813 {
814 	struct ceph_mds_request *req = container_of(kref,
815 						    struct ceph_mds_request,
816 						    r_kref);
817 	ceph_mdsc_release_dir_caps_no_check(req);
818 	destroy_reply_info(&req->r_reply_info);
819 	if (req->r_request)
820 		ceph_msg_put(req->r_request);
821 	if (req->r_reply)
822 		ceph_msg_put(req->r_reply);
823 	if (req->r_inode) {
824 		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
825 		/* avoid calling iput_final() in mds dispatch threads */
826 		ceph_async_iput(req->r_inode);
827 	}
828 	if (req->r_parent) {
829 		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
830 		ceph_async_iput(req->r_parent);
831 	}
832 	ceph_async_iput(req->r_target_inode);
833 	if (req->r_dentry)
834 		dput(req->r_dentry);
835 	if (req->r_old_dentry)
836 		dput(req->r_old_dentry);
837 	if (req->r_old_dentry_dir) {
838 		/*
839 		 * track (and drop pins for) r_old_dentry_dir
840 		 * separately, since r_old_dentry's d_parent may have
841 		 * changed between the dir mutex being dropped and
842 		 * this request being freed.
843 		 */
844 		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
845 				  CEPH_CAP_PIN);
846 		ceph_async_iput(req->r_old_dentry_dir);
847 	}
848 	kfree(req->r_path1);
849 	kfree(req->r_path2);
850 	put_cred(req->r_cred);
851 	if (req->r_pagelist)
852 		ceph_pagelist_release(req->r_pagelist);
853 	put_request_session(req);
854 	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
855 	WARN_ON_ONCE(!list_empty(&req->r_wait));
856 	kmem_cache_free(ceph_mds_request_cachep, req);
857 }
858 
859 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
860 
861 /*
862  * lookup session, bump ref if found.
863  *
864  * called under mdsc->mutex.
865  */
866 static struct ceph_mds_request *
867 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
868 {
869 	struct ceph_mds_request *req;
870 
871 	req = lookup_request(&mdsc->request_tree, tid);
872 	if (req)
873 		ceph_mdsc_get_request(req);
874 
875 	return req;
876 }
877 
878 /*
879  * Register an in-flight request, and assign a tid.  Link to directory
880  * are modifying (if any).
881  *
882  * Called under mdsc->mutex.
883  */
884 static void __register_request(struct ceph_mds_client *mdsc,
885 			       struct ceph_mds_request *req,
886 			       struct inode *dir)
887 {
888 	int ret = 0;
889 
890 	req->r_tid = ++mdsc->last_tid;
891 	if (req->r_num_caps) {
892 		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
893 					req->r_num_caps);
894 		if (ret < 0) {
895 			pr_err("__register_request %p "
896 			       "failed to reserve caps: %d\n", req, ret);
897 			/* set req->r_err to fail early from __do_request */
898 			req->r_err = ret;
899 			return;
900 		}
901 	}
902 	dout("__register_request %p tid %lld\n", req, req->r_tid);
903 	ceph_mdsc_get_request(req);
904 	insert_request(&mdsc->request_tree, req);
905 
906 	req->r_cred = get_current_cred();
907 
908 	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
909 		mdsc->oldest_tid = req->r_tid;
910 
911 	if (dir) {
912 		struct ceph_inode_info *ci = ceph_inode(dir);
913 
914 		ihold(dir);
915 		req->r_unsafe_dir = dir;
916 		spin_lock(&ci->i_unsafe_lock);
917 		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
918 		spin_unlock(&ci->i_unsafe_lock);
919 	}
920 }
921 
922 static void __unregister_request(struct ceph_mds_client *mdsc,
923 				 struct ceph_mds_request *req)
924 {
925 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
926 
927 	/* Never leave an unregistered request on an unsafe list! */
928 	list_del_init(&req->r_unsafe_item);
929 
930 	if (req->r_tid == mdsc->oldest_tid) {
931 		struct rb_node *p = rb_next(&req->r_node);
932 		mdsc->oldest_tid = 0;
933 		while (p) {
934 			struct ceph_mds_request *next_req =
935 				rb_entry(p, struct ceph_mds_request, r_node);
936 			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
937 				mdsc->oldest_tid = next_req->r_tid;
938 				break;
939 			}
940 			p = rb_next(p);
941 		}
942 	}
943 
944 	erase_request(&mdsc->request_tree, req);
945 
946 	if (req->r_unsafe_dir) {
947 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
948 		spin_lock(&ci->i_unsafe_lock);
949 		list_del_init(&req->r_unsafe_dir_item);
950 		spin_unlock(&ci->i_unsafe_lock);
951 	}
952 	if (req->r_target_inode &&
953 	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
954 		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
955 		spin_lock(&ci->i_unsafe_lock);
956 		list_del_init(&req->r_unsafe_target_item);
957 		spin_unlock(&ci->i_unsafe_lock);
958 	}
959 
960 	if (req->r_unsafe_dir) {
961 		/* avoid calling iput_final() in mds dispatch threads */
962 		ceph_async_iput(req->r_unsafe_dir);
963 		req->r_unsafe_dir = NULL;
964 	}
965 
966 	complete_all(&req->r_safe_completion);
967 
968 	ceph_mdsc_put_request(req);
969 }
970 
971 /*
972  * Walk back up the dentry tree until we hit a dentry representing a
973  * non-snapshot inode. We do this using the rcu_read_lock (which must be held
974  * when calling this) to ensure that the objects won't disappear while we're
975  * working with them. Once we hit a candidate dentry, we attempt to take a
976  * reference to it, and return that as the result.
977  */
978 static struct inode *get_nonsnap_parent(struct dentry *dentry)
979 {
980 	struct inode *inode = NULL;
981 
982 	while (dentry && !IS_ROOT(dentry)) {
983 		inode = d_inode_rcu(dentry);
984 		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
985 			break;
986 		dentry = dentry->d_parent;
987 	}
988 	if (inode)
989 		inode = igrab(inode);
990 	return inode;
991 }
992 
993 /*
994  * Choose mds to send request to next.  If there is a hint set in the
995  * request (e.g., due to a prior forward hint from the mds), use that.
996  * Otherwise, consult frag tree and/or caps to identify the
997  * appropriate mds.  If all else fails, choose randomly.
998  *
999  * Called under mdsc->mutex.
1000  */
1001 static int __choose_mds(struct ceph_mds_client *mdsc,
1002 			struct ceph_mds_request *req,
1003 			bool *random)
1004 {
1005 	struct inode *inode;
1006 	struct ceph_inode_info *ci;
1007 	struct ceph_cap *cap;
1008 	int mode = req->r_direct_mode;
1009 	int mds = -1;
1010 	u32 hash = req->r_direct_hash;
1011 	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1012 
1013 	if (random)
1014 		*random = false;
1015 
1016 	/*
1017 	 * is there a specific mds we should try?  ignore hint if we have
1018 	 * no session and the mds is not up (active or recovering).
1019 	 */
1020 	if (req->r_resend_mds >= 0 &&
1021 	    (__have_session(mdsc, req->r_resend_mds) ||
1022 	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1023 		dout("%s using resend_mds mds%d\n", __func__,
1024 		     req->r_resend_mds);
1025 		return req->r_resend_mds;
1026 	}
1027 
1028 	if (mode == USE_RANDOM_MDS)
1029 		goto random;
1030 
1031 	inode = NULL;
1032 	if (req->r_inode) {
1033 		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1034 			inode = req->r_inode;
1035 			ihold(inode);
1036 		} else {
1037 			/* req->r_dentry is non-null for LSSNAP request */
1038 			rcu_read_lock();
1039 			inode = get_nonsnap_parent(req->r_dentry);
1040 			rcu_read_unlock();
1041 			dout("%s using snapdir's parent %p\n", __func__, inode);
1042 		}
1043 	} else if (req->r_dentry) {
1044 		/* ignore race with rename; old or new d_parent is okay */
1045 		struct dentry *parent;
1046 		struct inode *dir;
1047 
1048 		rcu_read_lock();
1049 		parent = READ_ONCE(req->r_dentry->d_parent);
1050 		dir = req->r_parent ? : d_inode_rcu(parent);
1051 
1052 		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1053 			/*  not this fs or parent went negative */
1054 			inode = d_inode(req->r_dentry);
1055 			if (inode)
1056 				ihold(inode);
1057 		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1058 			/* direct snapped/virtual snapdir requests
1059 			 * based on parent dir inode */
1060 			inode = get_nonsnap_parent(parent);
1061 			dout("%s using nonsnap parent %p\n", __func__, inode);
1062 		} else {
1063 			/* dentry target */
1064 			inode = d_inode(req->r_dentry);
1065 			if (!inode || mode == USE_AUTH_MDS) {
1066 				/* dir + name */
1067 				inode = igrab(dir);
1068 				hash = ceph_dentry_hash(dir, req->r_dentry);
1069 				is_hash = true;
1070 			} else {
1071 				ihold(inode);
1072 			}
1073 		}
1074 		rcu_read_unlock();
1075 	}
1076 
1077 	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1078 	     hash, mode);
1079 	if (!inode)
1080 		goto random;
1081 	ci = ceph_inode(inode);
1082 
1083 	if (is_hash && S_ISDIR(inode->i_mode)) {
1084 		struct ceph_inode_frag frag;
1085 		int found;
1086 
1087 		ceph_choose_frag(ci, hash, &frag, &found);
1088 		if (found) {
1089 			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1090 				u8 r;
1091 
1092 				/* choose a random replica */
1093 				get_random_bytes(&r, 1);
1094 				r %= frag.ndist;
1095 				mds = frag.dist[r];
1096 				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1097 				     __func__, inode, ceph_vinop(inode),
1098 				     frag.frag, mds, (int)r, frag.ndist);
1099 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1100 				    CEPH_MDS_STATE_ACTIVE &&
1101 				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1102 					goto out;
1103 			}
1104 
1105 			/* since this file/dir wasn't known to be
1106 			 * replicated, then we want to look for the
1107 			 * authoritative mds. */
1108 			if (frag.mds >= 0) {
1109 				/* choose auth mds */
1110 				mds = frag.mds;
1111 				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1112 				     __func__, inode, ceph_vinop(inode),
1113 				     frag.frag, mds);
1114 				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1115 				    CEPH_MDS_STATE_ACTIVE) {
1116 					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1117 								  mds))
1118 						goto out;
1119 				}
1120 			}
1121 			mode = USE_AUTH_MDS;
1122 		}
1123 	}
1124 
1125 	spin_lock(&ci->i_ceph_lock);
1126 	cap = NULL;
1127 	if (mode == USE_AUTH_MDS)
1128 		cap = ci->i_auth_cap;
1129 	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1130 		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1131 	if (!cap) {
1132 		spin_unlock(&ci->i_ceph_lock);
1133 		ceph_async_iput(inode);
1134 		goto random;
1135 	}
1136 	mds = cap->session->s_mds;
1137 	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1138 	     inode, ceph_vinop(inode), mds,
1139 	     cap == ci->i_auth_cap ? "auth " : "", cap);
1140 	spin_unlock(&ci->i_ceph_lock);
1141 out:
1142 	/* avoid calling iput_final() while holding mdsc->mutex or
1143 	 * in mds dispatch threads */
1144 	ceph_async_iput(inode);
1145 	return mds;
1146 
1147 random:
1148 	if (random)
1149 		*random = true;
1150 
1151 	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1152 	dout("%s chose random mds%d\n", __func__, mds);
1153 	return mds;
1154 }
1155 
1156 
1157 /*
1158  * session messages
1159  */
1160 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
1161 {
1162 	struct ceph_msg *msg;
1163 	struct ceph_mds_session_head *h;
1164 
1165 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1166 			   false);
1167 	if (!msg) {
1168 		pr_err("create_session_msg ENOMEM creating msg\n");
1169 		return NULL;
1170 	}
1171 	h = msg->front.iov_base;
1172 	h->op = cpu_to_le32(op);
1173 	h->seq = cpu_to_le64(seq);
1174 
1175 	return msg;
1176 }
1177 
1178 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1179 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1180 static int encode_supported_features(void **p, void *end)
1181 {
1182 	static const size_t count = ARRAY_SIZE(feature_bits);
1183 
1184 	if (count > 0) {
1185 		size_t i;
1186 		size_t size = FEATURE_BYTES(count);
1187 
1188 		if (WARN_ON_ONCE(*p + 4 + size > end))
1189 			return -ERANGE;
1190 
1191 		ceph_encode_32(p, size);
1192 		memset(*p, 0, size);
1193 		for (i = 0; i < count; i++)
1194 			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
1195 		*p += size;
1196 	} else {
1197 		if (WARN_ON_ONCE(*p + 4 > end))
1198 			return -ERANGE;
1199 
1200 		ceph_encode_32(p, 0);
1201 	}
1202 
1203 	return 0;
1204 }
1205 
1206 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1207 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1208 static int encode_metric_spec(void **p, void *end)
1209 {
1210 	static const size_t count = ARRAY_SIZE(metric_bits);
1211 
1212 	/* header */
1213 	if (WARN_ON_ONCE(*p + 2 > end))
1214 		return -ERANGE;
1215 
1216 	ceph_encode_8(p, 1); /* version */
1217 	ceph_encode_8(p, 1); /* compat */
1218 
1219 	if (count > 0) {
1220 		size_t i;
1221 		size_t size = METRIC_BYTES(count);
1222 
1223 		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1224 			return -ERANGE;
1225 
1226 		/* metric spec info length */
1227 		ceph_encode_32(p, 4 + size);
1228 
1229 		/* metric spec */
1230 		ceph_encode_32(p, size);
1231 		memset(*p, 0, size);
1232 		for (i = 0; i < count; i++)
1233 			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1234 		*p += size;
1235 	} else {
1236 		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1237 			return -ERANGE;
1238 
1239 		/* metric spec info length */
1240 		ceph_encode_32(p, 4);
1241 		/* metric spec */
1242 		ceph_encode_32(p, 0);
1243 	}
1244 
1245 	return 0;
1246 }
1247 
1248 /*
1249  * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1250  * to include additional client metadata fields.
1251  */
1252 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1253 {
1254 	struct ceph_msg *msg;
1255 	struct ceph_mds_session_head *h;
1256 	int i;
1257 	int extra_bytes = 0;
1258 	int metadata_key_count = 0;
1259 	struct ceph_options *opt = mdsc->fsc->client->options;
1260 	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1261 	size_t size, count;
1262 	void *p, *end;
1263 	int ret;
1264 
1265 	const char* metadata[][2] = {
1266 		{"hostname", mdsc->nodename},
1267 		{"kernel_version", init_utsname()->release},
1268 		{"entity_id", opt->name ? : ""},
1269 		{"root", fsopt->server_path ? : "/"},
1270 		{NULL, NULL}
1271 	};
1272 
1273 	/* Calculate serialized length of metadata */
1274 	extra_bytes = 4;  /* map length */
1275 	for (i = 0; metadata[i][0]; ++i) {
1276 		extra_bytes += 8 + strlen(metadata[i][0]) +
1277 			strlen(metadata[i][1]);
1278 		metadata_key_count++;
1279 	}
1280 
1281 	/* supported feature */
1282 	size = 0;
1283 	count = ARRAY_SIZE(feature_bits);
1284 	if (count > 0)
1285 		size = FEATURE_BYTES(count);
1286 	extra_bytes += 4 + size;
1287 
1288 	/* metric spec */
1289 	size = 0;
1290 	count = ARRAY_SIZE(metric_bits);
1291 	if (count > 0)
1292 		size = METRIC_BYTES(count);
1293 	extra_bytes += 2 + 4 + 4 + size;
1294 
1295 	/* Allocate the message */
1296 	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1297 			   GFP_NOFS, false);
1298 	if (!msg) {
1299 		pr_err("create_session_msg ENOMEM creating msg\n");
1300 		return ERR_PTR(-ENOMEM);
1301 	}
1302 	p = msg->front.iov_base;
1303 	end = p + msg->front.iov_len;
1304 
1305 	h = p;
1306 	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1307 	h->seq = cpu_to_le64(seq);
1308 
1309 	/*
1310 	 * Serialize client metadata into waiting buffer space, using
1311 	 * the format that userspace expects for map<string, string>
1312 	 *
1313 	 * ClientSession messages with metadata are v4
1314 	 */
1315 	msg->hdr.version = cpu_to_le16(4);
1316 	msg->hdr.compat_version = cpu_to_le16(1);
1317 
1318 	/* The write pointer, following the session_head structure */
1319 	p += sizeof(*h);
1320 
1321 	/* Number of entries in the map */
1322 	ceph_encode_32(&p, metadata_key_count);
1323 
1324 	/* Two length-prefixed strings for each entry in the map */
1325 	for (i = 0; metadata[i][0]; ++i) {
1326 		size_t const key_len = strlen(metadata[i][0]);
1327 		size_t const val_len = strlen(metadata[i][1]);
1328 
1329 		ceph_encode_32(&p, key_len);
1330 		memcpy(p, metadata[i][0], key_len);
1331 		p += key_len;
1332 		ceph_encode_32(&p, val_len);
1333 		memcpy(p, metadata[i][1], val_len);
1334 		p += val_len;
1335 	}
1336 
1337 	ret = encode_supported_features(&p, end);
1338 	if (ret) {
1339 		pr_err("encode_supported_features failed!\n");
1340 		ceph_msg_put(msg);
1341 		return ERR_PTR(ret);
1342 	}
1343 
1344 	ret = encode_metric_spec(&p, end);
1345 	if (ret) {
1346 		pr_err("encode_metric_spec failed!\n");
1347 		ceph_msg_put(msg);
1348 		return ERR_PTR(ret);
1349 	}
1350 
1351 	msg->front.iov_len = p - msg->front.iov_base;
1352 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1353 
1354 	return msg;
1355 }
1356 
1357 /*
1358  * send session open request.
1359  *
1360  * called under mdsc->mutex
1361  */
1362 static int __open_session(struct ceph_mds_client *mdsc,
1363 			  struct ceph_mds_session *session)
1364 {
1365 	struct ceph_msg *msg;
1366 	int mstate;
1367 	int mds = session->s_mds;
1368 
1369 	/* wait for mds to go active? */
1370 	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1371 	dout("open_session to mds%d (%s)\n", mds,
1372 	     ceph_mds_state_name(mstate));
1373 	session->s_state = CEPH_MDS_SESSION_OPENING;
1374 	session->s_renew_requested = jiffies;
1375 
1376 	/* send connect message */
1377 	msg = create_session_open_msg(mdsc, session->s_seq);
1378 	if (IS_ERR(msg))
1379 		return PTR_ERR(msg);
1380 	ceph_con_send(&session->s_con, msg);
1381 	return 0;
1382 }
1383 
1384 /*
1385  * open sessions for any export targets for the given mds
1386  *
1387  * called under mdsc->mutex
1388  */
1389 static struct ceph_mds_session *
1390 __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1391 {
1392 	struct ceph_mds_session *session;
1393 	int ret;
1394 
1395 	session = __ceph_lookup_mds_session(mdsc, target);
1396 	if (!session) {
1397 		session = register_session(mdsc, target);
1398 		if (IS_ERR(session))
1399 			return session;
1400 	}
1401 	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1402 	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1403 		ret = __open_session(mdsc, session);
1404 		if (ret)
1405 			return ERR_PTR(ret);
1406 	}
1407 
1408 	return session;
1409 }
1410 
1411 struct ceph_mds_session *
1412 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1413 {
1414 	struct ceph_mds_session *session;
1415 
1416 	dout("open_export_target_session to mds%d\n", target);
1417 
1418 	mutex_lock(&mdsc->mutex);
1419 	session = __open_export_target_session(mdsc, target);
1420 	mutex_unlock(&mdsc->mutex);
1421 
1422 	return session;
1423 }
1424 
1425 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1426 					  struct ceph_mds_session *session)
1427 {
1428 	struct ceph_mds_info *mi;
1429 	struct ceph_mds_session *ts;
1430 	int i, mds = session->s_mds;
1431 
1432 	if (mds >= mdsc->mdsmap->possible_max_rank)
1433 		return;
1434 
1435 	mi = &mdsc->mdsmap->m_info[mds];
1436 	dout("open_export_target_sessions for mds%d (%d targets)\n",
1437 	     session->s_mds, mi->num_export_targets);
1438 
1439 	for (i = 0; i < mi->num_export_targets; i++) {
1440 		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1441 		if (!IS_ERR(ts))
1442 			ceph_put_mds_session(ts);
1443 	}
1444 }
1445 
1446 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1447 					   struct ceph_mds_session *session)
1448 {
1449 	mutex_lock(&mdsc->mutex);
1450 	__open_export_target_sessions(mdsc, session);
1451 	mutex_unlock(&mdsc->mutex);
1452 }
1453 
1454 /*
1455  * session caps
1456  */
1457 
1458 static void detach_cap_releases(struct ceph_mds_session *session,
1459 				struct list_head *target)
1460 {
1461 	lockdep_assert_held(&session->s_cap_lock);
1462 
1463 	list_splice_init(&session->s_cap_releases, target);
1464 	session->s_num_cap_releases = 0;
1465 	dout("dispose_cap_releases mds%d\n", session->s_mds);
1466 }
1467 
1468 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1469 				 struct list_head *dispose)
1470 {
1471 	while (!list_empty(dispose)) {
1472 		struct ceph_cap *cap;
1473 		/* zero out the in-progress message */
1474 		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1475 		list_del(&cap->session_caps);
1476 		ceph_put_cap(mdsc, cap);
1477 	}
1478 }
1479 
1480 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1481 				     struct ceph_mds_session *session)
1482 {
1483 	struct ceph_mds_request *req;
1484 	struct rb_node *p;
1485 	struct ceph_inode_info *ci;
1486 
1487 	dout("cleanup_session_requests mds%d\n", session->s_mds);
1488 	mutex_lock(&mdsc->mutex);
1489 	while (!list_empty(&session->s_unsafe)) {
1490 		req = list_first_entry(&session->s_unsafe,
1491 				       struct ceph_mds_request, r_unsafe_item);
1492 		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1493 				    req->r_tid);
1494 		if (req->r_target_inode) {
1495 			/* dropping unsafe change of inode's attributes */
1496 			ci = ceph_inode(req->r_target_inode);
1497 			errseq_set(&ci->i_meta_err, -EIO);
1498 		}
1499 		if (req->r_unsafe_dir) {
1500 			/* dropping unsafe directory operation */
1501 			ci = ceph_inode(req->r_unsafe_dir);
1502 			errseq_set(&ci->i_meta_err, -EIO);
1503 		}
1504 		__unregister_request(mdsc, req);
1505 	}
1506 	/* zero r_attempts, so kick_requests() will re-send requests */
1507 	p = rb_first(&mdsc->request_tree);
1508 	while (p) {
1509 		req = rb_entry(p, struct ceph_mds_request, r_node);
1510 		p = rb_next(p);
1511 		if (req->r_session &&
1512 		    req->r_session->s_mds == session->s_mds)
1513 			req->r_attempts = 0;
1514 	}
1515 	mutex_unlock(&mdsc->mutex);
1516 }
1517 
1518 /*
1519  * Helper to safely iterate over all caps associated with a session, with
1520  * special care taken to handle a racing __ceph_remove_cap().
1521  *
1522  * Caller must hold session s_mutex.
1523  */
1524 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1525 			      int (*cb)(struct inode *, struct ceph_cap *,
1526 					void *), void *arg)
1527 {
1528 	struct list_head *p;
1529 	struct ceph_cap *cap;
1530 	struct inode *inode, *last_inode = NULL;
1531 	struct ceph_cap *old_cap = NULL;
1532 	int ret;
1533 
1534 	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1535 	spin_lock(&session->s_cap_lock);
1536 	p = session->s_caps.next;
1537 	while (p != &session->s_caps) {
1538 		cap = list_entry(p, struct ceph_cap, session_caps);
1539 		inode = igrab(&cap->ci->vfs_inode);
1540 		if (!inode) {
1541 			p = p->next;
1542 			continue;
1543 		}
1544 		session->s_cap_iterator = cap;
1545 		spin_unlock(&session->s_cap_lock);
1546 
1547 		if (last_inode) {
1548 			/* avoid calling iput_final() while holding
1549 			 * s_mutex or in mds dispatch threads */
1550 			ceph_async_iput(last_inode);
1551 			last_inode = NULL;
1552 		}
1553 		if (old_cap) {
1554 			ceph_put_cap(session->s_mdsc, old_cap);
1555 			old_cap = NULL;
1556 		}
1557 
1558 		ret = cb(inode, cap, arg);
1559 		last_inode = inode;
1560 
1561 		spin_lock(&session->s_cap_lock);
1562 		p = p->next;
1563 		if (!cap->ci) {
1564 			dout("iterate_session_caps  finishing cap %p removal\n",
1565 			     cap);
1566 			BUG_ON(cap->session != session);
1567 			cap->session = NULL;
1568 			list_del_init(&cap->session_caps);
1569 			session->s_nr_caps--;
1570 			atomic64_dec(&session->s_mdsc->metric.total_caps);
1571 			if (cap->queue_release)
1572 				__ceph_queue_cap_release(session, cap);
1573 			else
1574 				old_cap = cap;  /* put_cap it w/o locks held */
1575 		}
1576 		if (ret < 0)
1577 			goto out;
1578 	}
1579 	ret = 0;
1580 out:
1581 	session->s_cap_iterator = NULL;
1582 	spin_unlock(&session->s_cap_lock);
1583 
1584 	ceph_async_iput(last_inode);
1585 	if (old_cap)
1586 		ceph_put_cap(session->s_mdsc, old_cap);
1587 
1588 	return ret;
1589 }
1590 
1591 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1592 				  void *arg)
1593 {
1594 	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1595 	struct ceph_inode_info *ci = ceph_inode(inode);
1596 	LIST_HEAD(to_remove);
1597 	bool dirty_dropped = false;
1598 	bool invalidate = false;
1599 
1600 	dout("removing cap %p, ci is %p, inode is %p\n",
1601 	     cap, ci, &ci->vfs_inode);
1602 	spin_lock(&ci->i_ceph_lock);
1603 	__ceph_remove_cap(cap, false);
1604 	if (!ci->i_auth_cap) {
1605 		struct ceph_cap_flush *cf;
1606 		struct ceph_mds_client *mdsc = fsc->mdsc;
1607 
1608 		if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
1609 			if (inode->i_data.nrpages > 0)
1610 				invalidate = true;
1611 			if (ci->i_wrbuffer_ref > 0)
1612 				mapping_set_error(&inode->i_data, -EIO);
1613 		}
1614 
1615 		while (!list_empty(&ci->i_cap_flush_list)) {
1616 			cf = list_first_entry(&ci->i_cap_flush_list,
1617 					      struct ceph_cap_flush, i_list);
1618 			list_move(&cf->i_list, &to_remove);
1619 		}
1620 
1621 		spin_lock(&mdsc->cap_dirty_lock);
1622 
1623 		list_for_each_entry(cf, &to_remove, i_list)
1624 			list_del(&cf->g_list);
1625 
1626 		if (!list_empty(&ci->i_dirty_item)) {
1627 			pr_warn_ratelimited(
1628 				" dropping dirty %s state for %p %lld\n",
1629 				ceph_cap_string(ci->i_dirty_caps),
1630 				inode, ceph_ino(inode));
1631 			ci->i_dirty_caps = 0;
1632 			list_del_init(&ci->i_dirty_item);
1633 			dirty_dropped = true;
1634 		}
1635 		if (!list_empty(&ci->i_flushing_item)) {
1636 			pr_warn_ratelimited(
1637 				" dropping dirty+flushing %s state for %p %lld\n",
1638 				ceph_cap_string(ci->i_flushing_caps),
1639 				inode, ceph_ino(inode));
1640 			ci->i_flushing_caps = 0;
1641 			list_del_init(&ci->i_flushing_item);
1642 			mdsc->num_cap_flushing--;
1643 			dirty_dropped = true;
1644 		}
1645 		spin_unlock(&mdsc->cap_dirty_lock);
1646 
1647 		if (dirty_dropped) {
1648 			errseq_set(&ci->i_meta_err, -EIO);
1649 
1650 			if (ci->i_wrbuffer_ref_head == 0 &&
1651 			    ci->i_wr_ref == 0 &&
1652 			    ci->i_dirty_caps == 0 &&
1653 			    ci->i_flushing_caps == 0) {
1654 				ceph_put_snap_context(ci->i_head_snapc);
1655 				ci->i_head_snapc = NULL;
1656 			}
1657 		}
1658 
1659 		if (atomic_read(&ci->i_filelock_ref) > 0) {
1660 			/* make further file lock syscall return -EIO */
1661 			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1662 			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1663 					    inode, ceph_ino(inode));
1664 		}
1665 
1666 		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1667 			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1668 			ci->i_prealloc_cap_flush = NULL;
1669 		}
1670 	}
1671 	spin_unlock(&ci->i_ceph_lock);
1672 	while (!list_empty(&to_remove)) {
1673 		struct ceph_cap_flush *cf;
1674 		cf = list_first_entry(&to_remove,
1675 				      struct ceph_cap_flush, i_list);
1676 		list_del(&cf->i_list);
1677 		ceph_free_cap_flush(cf);
1678 	}
1679 
1680 	wake_up_all(&ci->i_cap_wq);
1681 	if (invalidate)
1682 		ceph_queue_invalidate(inode);
1683 	if (dirty_dropped)
1684 		iput(inode);
1685 	return 0;
1686 }
1687 
1688 /*
1689  * caller must hold session s_mutex
1690  */
1691 static void remove_session_caps(struct ceph_mds_session *session)
1692 {
1693 	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1694 	struct super_block *sb = fsc->sb;
1695 	LIST_HEAD(dispose);
1696 
1697 	dout("remove_session_caps on %p\n", session);
1698 	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1699 
1700 	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1701 
1702 	spin_lock(&session->s_cap_lock);
1703 	if (session->s_nr_caps > 0) {
1704 		struct inode *inode;
1705 		struct ceph_cap *cap, *prev = NULL;
1706 		struct ceph_vino vino;
1707 		/*
1708 		 * iterate_session_caps() skips inodes that are being
1709 		 * deleted, we need to wait until deletions are complete.
1710 		 * __wait_on_freeing_inode() is designed for the job,
1711 		 * but it is not exported, so use lookup inode function
1712 		 * to access it.
1713 		 */
1714 		while (!list_empty(&session->s_caps)) {
1715 			cap = list_entry(session->s_caps.next,
1716 					 struct ceph_cap, session_caps);
1717 			if (cap == prev)
1718 				break;
1719 			prev = cap;
1720 			vino = cap->ci->i_vino;
1721 			spin_unlock(&session->s_cap_lock);
1722 
1723 			inode = ceph_find_inode(sb, vino);
1724 			 /* avoid calling iput_final() while holding s_mutex */
1725 			ceph_async_iput(inode);
1726 
1727 			spin_lock(&session->s_cap_lock);
1728 		}
1729 	}
1730 
1731 	// drop cap expires and unlock s_cap_lock
1732 	detach_cap_releases(session, &dispose);
1733 
1734 	BUG_ON(session->s_nr_caps > 0);
1735 	BUG_ON(!list_empty(&session->s_cap_flushing));
1736 	spin_unlock(&session->s_cap_lock);
1737 	dispose_cap_releases(session->s_mdsc, &dispose);
1738 }
1739 
1740 enum {
1741 	RECONNECT,
1742 	RENEWCAPS,
1743 	FORCE_RO,
1744 };
1745 
1746 /*
1747  * wake up any threads waiting on this session's caps.  if the cap is
1748  * old (didn't get renewed on the client reconnect), remove it now.
1749  *
1750  * caller must hold s_mutex.
1751  */
1752 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1753 			      void *arg)
1754 {
1755 	struct ceph_inode_info *ci = ceph_inode(inode);
1756 	unsigned long ev = (unsigned long)arg;
1757 
1758 	if (ev == RECONNECT) {
1759 		spin_lock(&ci->i_ceph_lock);
1760 		ci->i_wanted_max_size = 0;
1761 		ci->i_requested_max_size = 0;
1762 		spin_unlock(&ci->i_ceph_lock);
1763 	} else if (ev == RENEWCAPS) {
1764 		if (cap->cap_gen < cap->session->s_cap_gen) {
1765 			/* mds did not re-issue stale cap */
1766 			spin_lock(&ci->i_ceph_lock);
1767 			cap->issued = cap->implemented = CEPH_CAP_PIN;
1768 			spin_unlock(&ci->i_ceph_lock);
1769 		}
1770 	} else if (ev == FORCE_RO) {
1771 	}
1772 	wake_up_all(&ci->i_cap_wq);
1773 	return 0;
1774 }
1775 
1776 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1777 {
1778 	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1779 	ceph_iterate_session_caps(session, wake_up_session_cb,
1780 				  (void *)(unsigned long)ev);
1781 }
1782 
1783 /*
1784  * Send periodic message to MDS renewing all currently held caps.  The
1785  * ack will reset the expiration for all caps from this session.
1786  *
1787  * caller holds s_mutex
1788  */
1789 static int send_renew_caps(struct ceph_mds_client *mdsc,
1790 			   struct ceph_mds_session *session)
1791 {
1792 	struct ceph_msg *msg;
1793 	int state;
1794 
1795 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1796 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1797 		pr_info("mds%d caps stale\n", session->s_mds);
1798 	session->s_renew_requested = jiffies;
1799 
1800 	/* do not try to renew caps until a recovering mds has reconnected
1801 	 * with its clients. */
1802 	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1803 	if (state < CEPH_MDS_STATE_RECONNECT) {
1804 		dout("send_renew_caps ignoring mds%d (%s)\n",
1805 		     session->s_mds, ceph_mds_state_name(state));
1806 		return 0;
1807 	}
1808 
1809 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1810 		ceph_mds_state_name(state));
1811 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1812 				 ++session->s_renew_seq);
1813 	if (!msg)
1814 		return -ENOMEM;
1815 	ceph_con_send(&session->s_con, msg);
1816 	return 0;
1817 }
1818 
1819 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1820 			     struct ceph_mds_session *session, u64 seq)
1821 {
1822 	struct ceph_msg *msg;
1823 
1824 	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1825 	     session->s_mds, ceph_session_state_name(session->s_state), seq);
1826 	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1827 	if (!msg)
1828 		return -ENOMEM;
1829 	ceph_con_send(&session->s_con, msg);
1830 	return 0;
1831 }
1832 
1833 
1834 /*
1835  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1836  *
1837  * Called under session->s_mutex
1838  */
1839 static void renewed_caps(struct ceph_mds_client *mdsc,
1840 			 struct ceph_mds_session *session, int is_renew)
1841 {
1842 	int was_stale;
1843 	int wake = 0;
1844 
1845 	spin_lock(&session->s_cap_lock);
1846 	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1847 
1848 	session->s_cap_ttl = session->s_renew_requested +
1849 		mdsc->mdsmap->m_session_timeout*HZ;
1850 
1851 	if (was_stale) {
1852 		if (time_before(jiffies, session->s_cap_ttl)) {
1853 			pr_info("mds%d caps renewed\n", session->s_mds);
1854 			wake = 1;
1855 		} else {
1856 			pr_info("mds%d caps still stale\n", session->s_mds);
1857 		}
1858 	}
1859 	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1860 	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1861 	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1862 	spin_unlock(&session->s_cap_lock);
1863 
1864 	if (wake)
1865 		wake_up_session_caps(session, RENEWCAPS);
1866 }
1867 
1868 /*
1869  * send a session close request
1870  */
1871 static int request_close_session(struct ceph_mds_session *session)
1872 {
1873 	struct ceph_msg *msg;
1874 
1875 	dout("request_close_session mds%d state %s seq %lld\n",
1876 	     session->s_mds, ceph_session_state_name(session->s_state),
1877 	     session->s_seq);
1878 	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1879 	if (!msg)
1880 		return -ENOMEM;
1881 	ceph_con_send(&session->s_con, msg);
1882 	return 1;
1883 }
1884 
1885 /*
1886  * Called with s_mutex held.
1887  */
1888 static int __close_session(struct ceph_mds_client *mdsc,
1889 			 struct ceph_mds_session *session)
1890 {
1891 	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1892 		return 0;
1893 	session->s_state = CEPH_MDS_SESSION_CLOSING;
1894 	return request_close_session(session);
1895 }
1896 
1897 static bool drop_negative_children(struct dentry *dentry)
1898 {
1899 	struct dentry *child;
1900 	bool all_negative = true;
1901 
1902 	if (!d_is_dir(dentry))
1903 		goto out;
1904 
1905 	spin_lock(&dentry->d_lock);
1906 	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1907 		if (d_really_is_positive(child)) {
1908 			all_negative = false;
1909 			break;
1910 		}
1911 	}
1912 	spin_unlock(&dentry->d_lock);
1913 
1914 	if (all_negative)
1915 		shrink_dcache_parent(dentry);
1916 out:
1917 	return all_negative;
1918 }
1919 
1920 /*
1921  * Trim old(er) caps.
1922  *
1923  * Because we can't cache an inode without one or more caps, we do
1924  * this indirectly: if a cap is unused, we prune its aliases, at which
1925  * point the inode will hopefully get dropped to.
1926  *
1927  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1928  * memory pressure from the MDS, though, so it needn't be perfect.
1929  */
1930 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1931 {
1932 	int *remaining = arg;
1933 	struct ceph_inode_info *ci = ceph_inode(inode);
1934 	int used, wanted, oissued, mine;
1935 
1936 	if (*remaining <= 0)
1937 		return -1;
1938 
1939 	spin_lock(&ci->i_ceph_lock);
1940 	mine = cap->issued | cap->implemented;
1941 	used = __ceph_caps_used(ci);
1942 	wanted = __ceph_caps_file_wanted(ci);
1943 	oissued = __ceph_caps_issued_other(ci, cap);
1944 
1945 	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
1946 	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1947 	     ceph_cap_string(used), ceph_cap_string(wanted));
1948 	if (cap == ci->i_auth_cap) {
1949 		if (ci->i_dirty_caps || ci->i_flushing_caps ||
1950 		    !list_empty(&ci->i_cap_snaps))
1951 			goto out;
1952 		if ((used | wanted) & CEPH_CAP_ANY_WR)
1953 			goto out;
1954 		/* Note: it's possible that i_filelock_ref becomes non-zero
1955 		 * after dropping auth caps. It doesn't hurt because reply
1956 		 * of lock mds request will re-add auth caps. */
1957 		if (atomic_read(&ci->i_filelock_ref) > 0)
1958 			goto out;
1959 	}
1960 	/* The inode has cached pages, but it's no longer used.
1961 	 * we can safely drop it */
1962 	if (S_ISREG(inode->i_mode) &&
1963 	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
1964 	    !(oissued & CEPH_CAP_FILE_CACHE)) {
1965 	  used = 0;
1966 	  oissued = 0;
1967 	}
1968 	if ((used | wanted) & ~oissued & mine)
1969 		goto out;   /* we need these caps */
1970 
1971 	if (oissued) {
1972 		/* we aren't the only cap.. just remove us */
1973 		__ceph_remove_cap(cap, true);
1974 		(*remaining)--;
1975 	} else {
1976 		struct dentry *dentry;
1977 		/* try dropping referring dentries */
1978 		spin_unlock(&ci->i_ceph_lock);
1979 		dentry = d_find_any_alias(inode);
1980 		if (dentry && drop_negative_children(dentry)) {
1981 			int count;
1982 			dput(dentry);
1983 			d_prune_aliases(inode);
1984 			count = atomic_read(&inode->i_count);
1985 			if (count == 1)
1986 				(*remaining)--;
1987 			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
1988 			     inode, cap, count);
1989 		} else {
1990 			dput(dentry);
1991 		}
1992 		return 0;
1993 	}
1994 
1995 out:
1996 	spin_unlock(&ci->i_ceph_lock);
1997 	return 0;
1998 }
1999 
2000 /*
2001  * Trim session cap count down to some max number.
2002  */
2003 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2004 		   struct ceph_mds_session *session,
2005 		   int max_caps)
2006 {
2007 	int trim_caps = session->s_nr_caps - max_caps;
2008 
2009 	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2010 	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2011 	if (trim_caps > 0) {
2012 		int remaining = trim_caps;
2013 
2014 		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2015 		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2016 		     session->s_mds, session->s_nr_caps, max_caps,
2017 			trim_caps - remaining);
2018 	}
2019 
2020 	ceph_flush_cap_releases(mdsc, session);
2021 	return 0;
2022 }
2023 
2024 static int check_caps_flush(struct ceph_mds_client *mdsc,
2025 			    u64 want_flush_tid)
2026 {
2027 	int ret = 1;
2028 
2029 	spin_lock(&mdsc->cap_dirty_lock);
2030 	if (!list_empty(&mdsc->cap_flush_list)) {
2031 		struct ceph_cap_flush *cf =
2032 			list_first_entry(&mdsc->cap_flush_list,
2033 					 struct ceph_cap_flush, g_list);
2034 		if (cf->tid <= want_flush_tid) {
2035 			dout("check_caps_flush still flushing tid "
2036 			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2037 			ret = 0;
2038 		}
2039 	}
2040 	spin_unlock(&mdsc->cap_dirty_lock);
2041 	return ret;
2042 }
2043 
2044 /*
2045  * flush all dirty inode data to disk.
2046  *
2047  * returns true if we've flushed through want_flush_tid
2048  */
2049 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2050 			    u64 want_flush_tid)
2051 {
2052 	dout("check_caps_flush want %llu\n", want_flush_tid);
2053 
2054 	wait_event(mdsc->cap_flushing_wq,
2055 		   check_caps_flush(mdsc, want_flush_tid));
2056 
2057 	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2058 }
2059 
2060 /*
2061  * called under s_mutex
2062  */
2063 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2064 				   struct ceph_mds_session *session)
2065 {
2066 	struct ceph_msg *msg = NULL;
2067 	struct ceph_mds_cap_release *head;
2068 	struct ceph_mds_cap_item *item;
2069 	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2070 	struct ceph_cap *cap;
2071 	LIST_HEAD(tmp_list);
2072 	int num_cap_releases;
2073 	__le32	barrier, *cap_barrier;
2074 
2075 	down_read(&osdc->lock);
2076 	barrier = cpu_to_le32(osdc->epoch_barrier);
2077 	up_read(&osdc->lock);
2078 
2079 	spin_lock(&session->s_cap_lock);
2080 again:
2081 	list_splice_init(&session->s_cap_releases, &tmp_list);
2082 	num_cap_releases = session->s_num_cap_releases;
2083 	session->s_num_cap_releases = 0;
2084 	spin_unlock(&session->s_cap_lock);
2085 
2086 	while (!list_empty(&tmp_list)) {
2087 		if (!msg) {
2088 			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2089 					PAGE_SIZE, GFP_NOFS, false);
2090 			if (!msg)
2091 				goto out_err;
2092 			head = msg->front.iov_base;
2093 			head->num = cpu_to_le32(0);
2094 			msg->front.iov_len = sizeof(*head);
2095 
2096 			msg->hdr.version = cpu_to_le16(2);
2097 			msg->hdr.compat_version = cpu_to_le16(1);
2098 		}
2099 
2100 		cap = list_first_entry(&tmp_list, struct ceph_cap,
2101 					session_caps);
2102 		list_del(&cap->session_caps);
2103 		num_cap_releases--;
2104 
2105 		head = msg->front.iov_base;
2106 		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2107 				   &head->num);
2108 		item = msg->front.iov_base + msg->front.iov_len;
2109 		item->ino = cpu_to_le64(cap->cap_ino);
2110 		item->cap_id = cpu_to_le64(cap->cap_id);
2111 		item->migrate_seq = cpu_to_le32(cap->mseq);
2112 		item->seq = cpu_to_le32(cap->issue_seq);
2113 		msg->front.iov_len += sizeof(*item);
2114 
2115 		ceph_put_cap(mdsc, cap);
2116 
2117 		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2118 			// Append cap_barrier field
2119 			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2120 			*cap_barrier = barrier;
2121 			msg->front.iov_len += sizeof(*cap_barrier);
2122 
2123 			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2124 			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2125 			ceph_con_send(&session->s_con, msg);
2126 			msg = NULL;
2127 		}
2128 	}
2129 
2130 	BUG_ON(num_cap_releases != 0);
2131 
2132 	spin_lock(&session->s_cap_lock);
2133 	if (!list_empty(&session->s_cap_releases))
2134 		goto again;
2135 	spin_unlock(&session->s_cap_lock);
2136 
2137 	if (msg) {
2138 		// Append cap_barrier field
2139 		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2140 		*cap_barrier = barrier;
2141 		msg->front.iov_len += sizeof(*cap_barrier);
2142 
2143 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2144 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2145 		ceph_con_send(&session->s_con, msg);
2146 	}
2147 	return;
2148 out_err:
2149 	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2150 		session->s_mds);
2151 	spin_lock(&session->s_cap_lock);
2152 	list_splice(&tmp_list, &session->s_cap_releases);
2153 	session->s_num_cap_releases += num_cap_releases;
2154 	spin_unlock(&session->s_cap_lock);
2155 }
2156 
2157 static void ceph_cap_release_work(struct work_struct *work)
2158 {
2159 	struct ceph_mds_session *session =
2160 		container_of(work, struct ceph_mds_session, s_cap_release_work);
2161 
2162 	mutex_lock(&session->s_mutex);
2163 	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2164 	    session->s_state == CEPH_MDS_SESSION_HUNG)
2165 		ceph_send_cap_releases(session->s_mdsc, session);
2166 	mutex_unlock(&session->s_mutex);
2167 	ceph_put_mds_session(session);
2168 }
2169 
2170 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2171 		             struct ceph_mds_session *session)
2172 {
2173 	if (mdsc->stopping)
2174 		return;
2175 
2176 	ceph_get_mds_session(session);
2177 	if (queue_work(mdsc->fsc->cap_wq,
2178 		       &session->s_cap_release_work)) {
2179 		dout("cap release work queued\n");
2180 	} else {
2181 		ceph_put_mds_session(session);
2182 		dout("failed to queue cap release work\n");
2183 	}
2184 }
2185 
2186 /*
2187  * caller holds session->s_cap_lock
2188  */
2189 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2190 			      struct ceph_cap *cap)
2191 {
2192 	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2193 	session->s_num_cap_releases++;
2194 
2195 	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2196 		ceph_flush_cap_releases(session->s_mdsc, session);
2197 }
2198 
2199 static void ceph_cap_reclaim_work(struct work_struct *work)
2200 {
2201 	struct ceph_mds_client *mdsc =
2202 		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2203 	int ret = ceph_trim_dentries(mdsc);
2204 	if (ret == -EAGAIN)
2205 		ceph_queue_cap_reclaim_work(mdsc);
2206 }
2207 
2208 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2209 {
2210 	if (mdsc->stopping)
2211 		return;
2212 
2213         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2214                 dout("caps reclaim work queued\n");
2215         } else {
2216                 dout("failed to queue caps release work\n");
2217         }
2218 }
2219 
2220 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2221 {
2222 	int val;
2223 	if (!nr)
2224 		return;
2225 	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2226 	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2227 		atomic_set(&mdsc->cap_reclaim_pending, 0);
2228 		ceph_queue_cap_reclaim_work(mdsc);
2229 	}
2230 }
2231 
2232 /*
2233  * requests
2234  */
2235 
2236 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2237 				    struct inode *dir)
2238 {
2239 	struct ceph_inode_info *ci = ceph_inode(dir);
2240 	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2241 	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2242 	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2243 	unsigned int num_entries;
2244 	int order;
2245 
2246 	spin_lock(&ci->i_ceph_lock);
2247 	num_entries = ci->i_files + ci->i_subdirs;
2248 	spin_unlock(&ci->i_ceph_lock);
2249 	num_entries = max(num_entries, 1U);
2250 	num_entries = min(num_entries, opt->max_readdir);
2251 
2252 	order = get_order(size * num_entries);
2253 	while (order >= 0) {
2254 		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2255 							     __GFP_NOWARN,
2256 							     order);
2257 		if (rinfo->dir_entries)
2258 			break;
2259 		order--;
2260 	}
2261 	if (!rinfo->dir_entries)
2262 		return -ENOMEM;
2263 
2264 	num_entries = (PAGE_SIZE << order) / size;
2265 	num_entries = min(num_entries, opt->max_readdir);
2266 
2267 	rinfo->dir_buf_size = PAGE_SIZE << order;
2268 	req->r_num_caps = num_entries + 1;
2269 	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2270 	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2271 	return 0;
2272 }
2273 
2274 /*
2275  * Create an mds request.
2276  */
2277 struct ceph_mds_request *
2278 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2279 {
2280 	struct ceph_mds_request *req;
2281 
2282 	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2283 	if (!req)
2284 		return ERR_PTR(-ENOMEM);
2285 
2286 	mutex_init(&req->r_fill_mutex);
2287 	req->r_mdsc = mdsc;
2288 	req->r_started = jiffies;
2289 	req->r_start_latency = ktime_get();
2290 	req->r_resend_mds = -1;
2291 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2292 	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2293 	req->r_fmode = -1;
2294 	kref_init(&req->r_kref);
2295 	RB_CLEAR_NODE(&req->r_node);
2296 	INIT_LIST_HEAD(&req->r_wait);
2297 	init_completion(&req->r_completion);
2298 	init_completion(&req->r_safe_completion);
2299 	INIT_LIST_HEAD(&req->r_unsafe_item);
2300 
2301 	ktime_get_coarse_real_ts64(&req->r_stamp);
2302 
2303 	req->r_op = op;
2304 	req->r_direct_mode = mode;
2305 	return req;
2306 }
2307 
2308 /*
2309  * return oldest (lowest) request, tid in request tree, 0 if none.
2310  *
2311  * called under mdsc->mutex.
2312  */
2313 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2314 {
2315 	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2316 		return NULL;
2317 	return rb_entry(rb_first(&mdsc->request_tree),
2318 			struct ceph_mds_request, r_node);
2319 }
2320 
2321 static inline  u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2322 {
2323 	return mdsc->oldest_tid;
2324 }
2325 
2326 /*
2327  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2328  * on build_path_from_dentry in fs/cifs/dir.c.
2329  *
2330  * If @stop_on_nosnap, generate path relative to the first non-snapped
2331  * inode.
2332  *
2333  * Encode hidden .snap dirs as a double /, i.e.
2334  *   foo/.snap/bar -> foo//bar
2335  */
2336 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2337 			   int stop_on_nosnap)
2338 {
2339 	struct dentry *temp;
2340 	char *path;
2341 	int pos;
2342 	unsigned seq;
2343 	u64 base;
2344 
2345 	if (!dentry)
2346 		return ERR_PTR(-EINVAL);
2347 
2348 	path = __getname();
2349 	if (!path)
2350 		return ERR_PTR(-ENOMEM);
2351 retry:
2352 	pos = PATH_MAX - 1;
2353 	path[pos] = '\0';
2354 
2355 	seq = read_seqbegin(&rename_lock);
2356 	rcu_read_lock();
2357 	temp = dentry;
2358 	for (;;) {
2359 		struct inode *inode;
2360 
2361 		spin_lock(&temp->d_lock);
2362 		inode = d_inode(temp);
2363 		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2364 			dout("build_path path+%d: %p SNAPDIR\n",
2365 			     pos, temp);
2366 		} else if (stop_on_nosnap && inode && dentry != temp &&
2367 			   ceph_snap(inode) == CEPH_NOSNAP) {
2368 			spin_unlock(&temp->d_lock);
2369 			pos++; /* get rid of any prepended '/' */
2370 			break;
2371 		} else {
2372 			pos -= temp->d_name.len;
2373 			if (pos < 0) {
2374 				spin_unlock(&temp->d_lock);
2375 				break;
2376 			}
2377 			memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2378 		}
2379 		spin_unlock(&temp->d_lock);
2380 		temp = READ_ONCE(temp->d_parent);
2381 
2382 		/* Are we at the root? */
2383 		if (IS_ROOT(temp))
2384 			break;
2385 
2386 		/* Are we out of buffer? */
2387 		if (--pos < 0)
2388 			break;
2389 
2390 		path[pos] = '/';
2391 	}
2392 	base = ceph_ino(d_inode(temp));
2393 	rcu_read_unlock();
2394 
2395 	if (read_seqretry(&rename_lock, seq))
2396 		goto retry;
2397 
2398 	if (pos < 0) {
2399 		/*
2400 		 * A rename didn't occur, but somehow we didn't end up where
2401 		 * we thought we would. Throw a warning and try again.
2402 		 */
2403 		pr_warn("build_path did not end path lookup where "
2404 			"expected, pos is %d\n", pos);
2405 		goto retry;
2406 	}
2407 
2408 	*pbase = base;
2409 	*plen = PATH_MAX - 1 - pos;
2410 	dout("build_path on %p %d built %llx '%.*s'\n",
2411 	     dentry, d_count(dentry), base, *plen, path + pos);
2412 	return path + pos;
2413 }
2414 
2415 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2416 			     const char **ppath, int *ppathlen, u64 *pino,
2417 			     bool *pfreepath, bool parent_locked)
2418 {
2419 	char *path;
2420 
2421 	rcu_read_lock();
2422 	if (!dir)
2423 		dir = d_inode_rcu(dentry->d_parent);
2424 	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2425 		*pino = ceph_ino(dir);
2426 		rcu_read_unlock();
2427 		*ppath = dentry->d_name.name;
2428 		*ppathlen = dentry->d_name.len;
2429 		return 0;
2430 	}
2431 	rcu_read_unlock();
2432 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2433 	if (IS_ERR(path))
2434 		return PTR_ERR(path);
2435 	*ppath = path;
2436 	*pfreepath = true;
2437 	return 0;
2438 }
2439 
2440 static int build_inode_path(struct inode *inode,
2441 			    const char **ppath, int *ppathlen, u64 *pino,
2442 			    bool *pfreepath)
2443 {
2444 	struct dentry *dentry;
2445 	char *path;
2446 
2447 	if (ceph_snap(inode) == CEPH_NOSNAP) {
2448 		*pino = ceph_ino(inode);
2449 		*ppathlen = 0;
2450 		return 0;
2451 	}
2452 	dentry = d_find_alias(inode);
2453 	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2454 	dput(dentry);
2455 	if (IS_ERR(path))
2456 		return PTR_ERR(path);
2457 	*ppath = path;
2458 	*pfreepath = true;
2459 	return 0;
2460 }
2461 
2462 /*
2463  * request arguments may be specified via an inode *, a dentry *, or
2464  * an explicit ino+path.
2465  */
2466 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2467 				  struct inode *rdiri, const char *rpath,
2468 				  u64 rino, const char **ppath, int *pathlen,
2469 				  u64 *ino, bool *freepath, bool parent_locked)
2470 {
2471 	int r = 0;
2472 
2473 	if (rinode) {
2474 		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2475 		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2476 		     ceph_snap(rinode));
2477 	} else if (rdentry) {
2478 		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2479 					freepath, parent_locked);
2480 		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2481 		     *ppath);
2482 	} else if (rpath || rino) {
2483 		*ino = rino;
2484 		*ppath = rpath;
2485 		*pathlen = rpath ? strlen(rpath) : 0;
2486 		dout(" path %.*s\n", *pathlen, rpath);
2487 	}
2488 
2489 	return r;
2490 }
2491 
2492 static void encode_timestamp_and_gids(void **p,
2493 				      const struct ceph_mds_request *req)
2494 {
2495 	struct ceph_timespec ts;
2496 	int i;
2497 
2498 	ceph_encode_timespec64(&ts, &req->r_stamp);
2499 	ceph_encode_copy(p, &ts, sizeof(ts));
2500 
2501 	/* gid_list */
2502 	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2503 	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2504 		ceph_encode_64(p, from_kgid(&init_user_ns,
2505 					    req->r_cred->group_info->gid[i]));
2506 }
2507 
2508 /*
2509  * called under mdsc->mutex
2510  */
2511 static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2512 					       struct ceph_mds_request *req,
2513 					       bool drop_cap_releases)
2514 {
2515 	int mds = session->s_mds;
2516 	struct ceph_mds_client *mdsc = session->s_mdsc;
2517 	struct ceph_msg *msg;
2518 	struct ceph_mds_request_head_old *head;
2519 	const char *path1 = NULL;
2520 	const char *path2 = NULL;
2521 	u64 ino1 = 0, ino2 = 0;
2522 	int pathlen1 = 0, pathlen2 = 0;
2523 	bool freepath1 = false, freepath2 = false;
2524 	int len;
2525 	u16 releases;
2526 	void *p, *end;
2527 	int ret;
2528 	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2529 
2530 	ret = set_request_path_attr(req->r_inode, req->r_dentry,
2531 			      req->r_parent, req->r_path1, req->r_ino1.ino,
2532 			      &path1, &pathlen1, &ino1, &freepath1,
2533 			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2534 					&req->r_req_flags));
2535 	if (ret < 0) {
2536 		msg = ERR_PTR(ret);
2537 		goto out;
2538 	}
2539 
2540 	/* If r_old_dentry is set, then assume that its parent is locked */
2541 	ret = set_request_path_attr(NULL, req->r_old_dentry,
2542 			      req->r_old_dentry_dir,
2543 			      req->r_path2, req->r_ino2.ino,
2544 			      &path2, &pathlen2, &ino2, &freepath2, true);
2545 	if (ret < 0) {
2546 		msg = ERR_PTR(ret);
2547 		goto out_free1;
2548 	}
2549 
2550 	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
2551 	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2552 		sizeof(struct ceph_timespec);
2553 	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2554 
2555 	/* calculate (max) length for cap releases */
2556 	len += sizeof(struct ceph_mds_request_release) *
2557 		(!!req->r_inode_drop + !!req->r_dentry_drop +
2558 		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2559 
2560 	if (req->r_dentry_drop)
2561 		len += pathlen1;
2562 	if (req->r_old_dentry_drop)
2563 		len += pathlen2;
2564 
2565 	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2566 	if (!msg) {
2567 		msg = ERR_PTR(-ENOMEM);
2568 		goto out_free2;
2569 	}
2570 
2571 	msg->hdr.tid = cpu_to_le64(req->r_tid);
2572 
2573 	/*
2574 	 * The old ceph_mds_request_head didn't contain a version field, and
2575 	 * one was added when we moved the message version from 3->4.
2576 	 */
2577 	if (legacy) {
2578 		msg->hdr.version = cpu_to_le16(3);
2579 		head = msg->front.iov_base;
2580 		p = msg->front.iov_base + sizeof(*head);
2581 	} else {
2582 		struct ceph_mds_request_head *new_head = msg->front.iov_base;
2583 
2584 		msg->hdr.version = cpu_to_le16(4);
2585 		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2586 		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2587 		p = msg->front.iov_base + sizeof(*new_head);
2588 	}
2589 
2590 	end = msg->front.iov_base + msg->front.iov_len;
2591 
2592 	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2593 	head->op = cpu_to_le32(req->r_op);
2594 	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2595 						 req->r_cred->fsuid));
2596 	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
2597 						 req->r_cred->fsgid));
2598 	head->ino = cpu_to_le64(req->r_deleg_ino);
2599 	head->args = req->r_args;
2600 
2601 	ceph_encode_filepath(&p, end, ino1, path1);
2602 	ceph_encode_filepath(&p, end, ino2, path2);
2603 
2604 	/* make note of release offset, in case we need to replay */
2605 	req->r_request_release_offset = p - msg->front.iov_base;
2606 
2607 	/* cap releases */
2608 	releases = 0;
2609 	if (req->r_inode_drop)
2610 		releases += ceph_encode_inode_release(&p,
2611 		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2612 		      mds, req->r_inode_drop, req->r_inode_unless,
2613 		      req->r_op == CEPH_MDS_OP_READDIR);
2614 	if (req->r_dentry_drop)
2615 		releases += ceph_encode_dentry_release(&p, req->r_dentry,
2616 				req->r_parent, mds, req->r_dentry_drop,
2617 				req->r_dentry_unless);
2618 	if (req->r_old_dentry_drop)
2619 		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2620 				req->r_old_dentry_dir, mds,
2621 				req->r_old_dentry_drop,
2622 				req->r_old_dentry_unless);
2623 	if (req->r_old_inode_drop)
2624 		releases += ceph_encode_inode_release(&p,
2625 		      d_inode(req->r_old_dentry),
2626 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2627 
2628 	if (drop_cap_releases) {
2629 		releases = 0;
2630 		p = msg->front.iov_base + req->r_request_release_offset;
2631 	}
2632 
2633 	head->num_releases = cpu_to_le16(releases);
2634 
2635 	encode_timestamp_and_gids(&p, req);
2636 
2637 	if (WARN_ON_ONCE(p > end)) {
2638 		ceph_msg_put(msg);
2639 		msg = ERR_PTR(-ERANGE);
2640 		goto out_free2;
2641 	}
2642 
2643 	msg->front.iov_len = p - msg->front.iov_base;
2644 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2645 
2646 	if (req->r_pagelist) {
2647 		struct ceph_pagelist *pagelist = req->r_pagelist;
2648 		ceph_msg_data_add_pagelist(msg, pagelist);
2649 		msg->hdr.data_len = cpu_to_le32(pagelist->length);
2650 	} else {
2651 		msg->hdr.data_len = 0;
2652 	}
2653 
2654 	msg->hdr.data_off = cpu_to_le16(0);
2655 
2656 out_free2:
2657 	if (freepath2)
2658 		ceph_mdsc_free_path((char *)path2, pathlen2);
2659 out_free1:
2660 	if (freepath1)
2661 		ceph_mdsc_free_path((char *)path1, pathlen1);
2662 out:
2663 	return msg;
2664 }
2665 
2666 /*
2667  * called under mdsc->mutex if error, under no mutex if
2668  * success.
2669  */
2670 static void complete_request(struct ceph_mds_client *mdsc,
2671 			     struct ceph_mds_request *req)
2672 {
2673 	req->r_end_latency = ktime_get();
2674 
2675 	if (req->r_callback)
2676 		req->r_callback(mdsc, req);
2677 	complete_all(&req->r_completion);
2678 }
2679 
2680 static struct ceph_mds_request_head_old *
2681 find_old_request_head(void *p, u64 features)
2682 {
2683 	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2684 	struct ceph_mds_request_head *new_head;
2685 
2686 	if (legacy)
2687 		return (struct ceph_mds_request_head_old *)p;
2688 	new_head = (struct ceph_mds_request_head *)p;
2689 	return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
2690 }
2691 
2692 /*
2693  * called under mdsc->mutex
2694  */
2695 static int __prepare_send_request(struct ceph_mds_session *session,
2696 				  struct ceph_mds_request *req,
2697 				  bool drop_cap_releases)
2698 {
2699 	int mds = session->s_mds;
2700 	struct ceph_mds_client *mdsc = session->s_mdsc;
2701 	struct ceph_mds_request_head_old *rhead;
2702 	struct ceph_msg *msg;
2703 	int flags = 0;
2704 
2705 	req->r_attempts++;
2706 	if (req->r_inode) {
2707 		struct ceph_cap *cap =
2708 			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2709 
2710 		if (cap)
2711 			req->r_sent_on_mseq = cap->mseq;
2712 		else
2713 			req->r_sent_on_mseq = -1;
2714 	}
2715 	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2716 	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2717 
2718 	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2719 		void *p;
2720 
2721 		/*
2722 		 * Replay.  Do not regenerate message (and rebuild
2723 		 * paths, etc.); just use the original message.
2724 		 * Rebuilding paths will break for renames because
2725 		 * d_move mangles the src name.
2726 		 */
2727 		msg = req->r_request;
2728 		rhead = find_old_request_head(msg->front.iov_base,
2729 					      session->s_con.peer_features);
2730 
2731 		flags = le32_to_cpu(rhead->flags);
2732 		flags |= CEPH_MDS_FLAG_REPLAY;
2733 		rhead->flags = cpu_to_le32(flags);
2734 
2735 		if (req->r_target_inode)
2736 			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2737 
2738 		rhead->num_retry = req->r_attempts - 1;
2739 
2740 		/* remove cap/dentry releases from message */
2741 		rhead->num_releases = 0;
2742 
2743 		p = msg->front.iov_base + req->r_request_release_offset;
2744 		encode_timestamp_and_gids(&p, req);
2745 
2746 		msg->front.iov_len = p - msg->front.iov_base;
2747 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2748 		return 0;
2749 	}
2750 
2751 	if (req->r_request) {
2752 		ceph_msg_put(req->r_request);
2753 		req->r_request = NULL;
2754 	}
2755 	msg = create_request_message(session, req, drop_cap_releases);
2756 	if (IS_ERR(msg)) {
2757 		req->r_err = PTR_ERR(msg);
2758 		return PTR_ERR(msg);
2759 	}
2760 	req->r_request = msg;
2761 
2762 	rhead = find_old_request_head(msg->front.iov_base,
2763 				      session->s_con.peer_features);
2764 	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2765 	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2766 		flags |= CEPH_MDS_FLAG_REPLAY;
2767 	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2768 		flags |= CEPH_MDS_FLAG_ASYNC;
2769 	if (req->r_parent)
2770 		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2771 	rhead->flags = cpu_to_le32(flags);
2772 	rhead->num_fwd = req->r_num_fwd;
2773 	rhead->num_retry = req->r_attempts - 1;
2774 
2775 	dout(" r_parent = %p\n", req->r_parent);
2776 	return 0;
2777 }
2778 
2779 /*
2780  * called under mdsc->mutex
2781  */
2782 static int __send_request(struct ceph_mds_session *session,
2783 			  struct ceph_mds_request *req,
2784 			  bool drop_cap_releases)
2785 {
2786 	int err;
2787 
2788 	err = __prepare_send_request(session, req, drop_cap_releases);
2789 	if (!err) {
2790 		ceph_msg_get(req->r_request);
2791 		ceph_con_send(&session->s_con, req->r_request);
2792 	}
2793 
2794 	return err;
2795 }
2796 
2797 /*
2798  * send request, or put it on the appropriate wait list.
2799  */
2800 static void __do_request(struct ceph_mds_client *mdsc,
2801 			struct ceph_mds_request *req)
2802 {
2803 	struct ceph_mds_session *session = NULL;
2804 	int mds = -1;
2805 	int err = 0;
2806 	bool random;
2807 
2808 	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2809 		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2810 			__unregister_request(mdsc, req);
2811 		return;
2812 	}
2813 
2814 	if (req->r_timeout &&
2815 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2816 		dout("do_request timed out\n");
2817 		err = -ETIMEDOUT;
2818 		goto finish;
2819 	}
2820 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2821 		dout("do_request forced umount\n");
2822 		err = -EIO;
2823 		goto finish;
2824 	}
2825 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2826 		if (mdsc->mdsmap_err) {
2827 			err = mdsc->mdsmap_err;
2828 			dout("do_request mdsmap err %d\n", err);
2829 			goto finish;
2830 		}
2831 		if (mdsc->mdsmap->m_epoch == 0) {
2832 			dout("do_request no mdsmap, waiting for map\n");
2833 			list_add(&req->r_wait, &mdsc->waiting_for_map);
2834 			return;
2835 		}
2836 		if (!(mdsc->fsc->mount_options->flags &
2837 		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
2838 		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2839 			err = -EHOSTUNREACH;
2840 			goto finish;
2841 		}
2842 	}
2843 
2844 	put_request_session(req);
2845 
2846 	mds = __choose_mds(mdsc, req, &random);
2847 	if (mds < 0 ||
2848 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2849 		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2850 			err = -EJUKEBOX;
2851 			goto finish;
2852 		}
2853 		dout("do_request no mds or not active, waiting for map\n");
2854 		list_add(&req->r_wait, &mdsc->waiting_for_map);
2855 		return;
2856 	}
2857 
2858 	/* get, open session */
2859 	session = __ceph_lookup_mds_session(mdsc, mds);
2860 	if (!session) {
2861 		session = register_session(mdsc, mds);
2862 		if (IS_ERR(session)) {
2863 			err = PTR_ERR(session);
2864 			goto finish;
2865 		}
2866 	}
2867 	req->r_session = ceph_get_mds_session(session);
2868 
2869 	dout("do_request mds%d session %p state %s\n", mds, session,
2870 	     ceph_session_state_name(session->s_state));
2871 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2872 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
2873 		/*
2874 		 * We cannot queue async requests since the caps and delegated
2875 		 * inodes are bound to the session. Just return -EJUKEBOX and
2876 		 * let the caller retry a sync request in that case.
2877 		 */
2878 		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2879 			err = -EJUKEBOX;
2880 			goto out_session;
2881 		}
2882 
2883 		/*
2884 		 * If the session has been REJECTED, then return a hard error,
2885 		 * unless it's a CLEANRECOVER mount, in which case we'll queue
2886 		 * it to the mdsc queue.
2887 		 */
2888 		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2889 			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
2890 				list_add(&req->r_wait, &mdsc->waiting_for_map);
2891 			else
2892 				err = -EACCES;
2893 			goto out_session;
2894 		}
2895 
2896 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
2897 		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
2898 			err = __open_session(mdsc, session);
2899 			if (err)
2900 				goto out_session;
2901 			/* retry the same mds later */
2902 			if (random)
2903 				req->r_resend_mds = mds;
2904 		}
2905 		list_add(&req->r_wait, &session->s_waiting);
2906 		goto out_session;
2907 	}
2908 
2909 	/* send request */
2910 	req->r_resend_mds = -1;   /* forget any previous mds hint */
2911 
2912 	if (req->r_request_started == 0)   /* note request start time */
2913 		req->r_request_started = jiffies;
2914 
2915 	err = __send_request(session, req, false);
2916 
2917 out_session:
2918 	ceph_put_mds_session(session);
2919 finish:
2920 	if (err) {
2921 		dout("__do_request early error %d\n", err);
2922 		req->r_err = err;
2923 		complete_request(mdsc, req);
2924 		__unregister_request(mdsc, req);
2925 	}
2926 	return;
2927 }
2928 
2929 /*
2930  * called under mdsc->mutex
2931  */
2932 static void __wake_requests(struct ceph_mds_client *mdsc,
2933 			    struct list_head *head)
2934 {
2935 	struct ceph_mds_request *req;
2936 	LIST_HEAD(tmp_list);
2937 
2938 	list_splice_init(head, &tmp_list);
2939 
2940 	while (!list_empty(&tmp_list)) {
2941 		req = list_entry(tmp_list.next,
2942 				 struct ceph_mds_request, r_wait);
2943 		list_del_init(&req->r_wait);
2944 		dout(" wake request %p tid %llu\n", req, req->r_tid);
2945 		__do_request(mdsc, req);
2946 	}
2947 }
2948 
2949 /*
2950  * Wake up threads with requests pending for @mds, so that they can
2951  * resubmit their requests to a possibly different mds.
2952  */
2953 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2954 {
2955 	struct ceph_mds_request *req;
2956 	struct rb_node *p = rb_first(&mdsc->request_tree);
2957 
2958 	dout("kick_requests mds%d\n", mds);
2959 	while (p) {
2960 		req = rb_entry(p, struct ceph_mds_request, r_node);
2961 		p = rb_next(p);
2962 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2963 			continue;
2964 		if (req->r_attempts > 0)
2965 			continue; /* only new requests */
2966 		if (req->r_session &&
2967 		    req->r_session->s_mds == mds) {
2968 			dout(" kicking tid %llu\n", req->r_tid);
2969 			list_del_init(&req->r_wait);
2970 			__do_request(mdsc, req);
2971 		}
2972 	}
2973 }
2974 
2975 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2976 			      struct ceph_mds_request *req)
2977 {
2978 	int err = 0;
2979 
2980 	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
2981 	if (req->r_inode)
2982 		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2983 	if (req->r_parent) {
2984 		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2985 		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2986 			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2987 		spin_lock(&ci->i_ceph_lock);
2988 		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2989 		__ceph_touch_fmode(ci, mdsc, fmode);
2990 		spin_unlock(&ci->i_ceph_lock);
2991 		ihold(req->r_parent);
2992 	}
2993 	if (req->r_old_dentry_dir)
2994 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2995 				  CEPH_CAP_PIN);
2996 
2997 	if (req->r_inode) {
2998 		err = ceph_wait_on_async_create(req->r_inode);
2999 		if (err) {
3000 			dout("%s: wait for async create returned: %d\n",
3001 			     __func__, err);
3002 			return err;
3003 		}
3004 	}
3005 
3006 	if (!err && req->r_old_inode) {
3007 		err = ceph_wait_on_async_create(req->r_old_inode);
3008 		if (err) {
3009 			dout("%s: wait for async create returned: %d\n",
3010 			     __func__, err);
3011 			return err;
3012 		}
3013 	}
3014 
3015 	dout("submit_request on %p for inode %p\n", req, dir);
3016 	mutex_lock(&mdsc->mutex);
3017 	__register_request(mdsc, req, dir);
3018 	__do_request(mdsc, req);
3019 	err = req->r_err;
3020 	mutex_unlock(&mdsc->mutex);
3021 	return err;
3022 }
3023 
3024 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3025 				  struct ceph_mds_request *req)
3026 {
3027 	int err;
3028 
3029 	/* wait */
3030 	dout("do_request waiting\n");
3031 	if (!req->r_timeout && req->r_wait_for_completion) {
3032 		err = req->r_wait_for_completion(mdsc, req);
3033 	} else {
3034 		long timeleft = wait_for_completion_killable_timeout(
3035 					&req->r_completion,
3036 					ceph_timeout_jiffies(req->r_timeout));
3037 		if (timeleft > 0)
3038 			err = 0;
3039 		else if (!timeleft)
3040 			err = -ETIMEDOUT;  /* timed out */
3041 		else
3042 			err = timeleft;  /* killed */
3043 	}
3044 	dout("do_request waited, got %d\n", err);
3045 	mutex_lock(&mdsc->mutex);
3046 
3047 	/* only abort if we didn't race with a real reply */
3048 	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3049 		err = le32_to_cpu(req->r_reply_info.head->result);
3050 	} else if (err < 0) {
3051 		dout("aborted request %lld with %d\n", req->r_tid, err);
3052 
3053 		/*
3054 		 * ensure we aren't running concurrently with
3055 		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3056 		 * rely on locks (dir mutex) held by our caller.
3057 		 */
3058 		mutex_lock(&req->r_fill_mutex);
3059 		req->r_err = err;
3060 		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3061 		mutex_unlock(&req->r_fill_mutex);
3062 
3063 		if (req->r_parent &&
3064 		    (req->r_op & CEPH_MDS_OP_WRITE))
3065 			ceph_invalidate_dir_request(req);
3066 	} else {
3067 		err = req->r_err;
3068 	}
3069 
3070 	mutex_unlock(&mdsc->mutex);
3071 	return err;
3072 }
3073 
3074 /*
3075  * Synchrously perform an mds request.  Take care of all of the
3076  * session setup, forwarding, retry details.
3077  */
3078 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3079 			 struct inode *dir,
3080 			 struct ceph_mds_request *req)
3081 {
3082 	int err;
3083 
3084 	dout("do_request on %p\n", req);
3085 
3086 	/* issue */
3087 	err = ceph_mdsc_submit_request(mdsc, dir, req);
3088 	if (!err)
3089 		err = ceph_mdsc_wait_request(mdsc, req);
3090 	dout("do_request %p done, result %d\n", req, err);
3091 	return err;
3092 }
3093 
3094 /*
3095  * Invalidate dir's completeness, dentry lease state on an aborted MDS
3096  * namespace request.
3097  */
3098 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3099 {
3100 	struct inode *dir = req->r_parent;
3101 	struct inode *old_dir = req->r_old_dentry_dir;
3102 
3103 	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3104 
3105 	ceph_dir_clear_complete(dir);
3106 	if (old_dir)
3107 		ceph_dir_clear_complete(old_dir);
3108 	if (req->r_dentry)
3109 		ceph_invalidate_dentry_lease(req->r_dentry);
3110 	if (req->r_old_dentry)
3111 		ceph_invalidate_dentry_lease(req->r_old_dentry);
3112 }
3113 
3114 /*
3115  * Handle mds reply.
3116  *
3117  * We take the session mutex and parse and process the reply immediately.
3118  * This preserves the logical ordering of replies, capabilities, etc., sent
3119  * by the MDS as they are applied to our local cache.
3120  */
3121 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3122 {
3123 	struct ceph_mds_client *mdsc = session->s_mdsc;
3124 	struct ceph_mds_request *req;
3125 	struct ceph_mds_reply_head *head = msg->front.iov_base;
3126 	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3127 	struct ceph_snap_realm *realm;
3128 	u64 tid;
3129 	int err, result;
3130 	int mds = session->s_mds;
3131 
3132 	if (msg->front.iov_len < sizeof(*head)) {
3133 		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3134 		ceph_msg_dump(msg);
3135 		return;
3136 	}
3137 
3138 	/* get request, session */
3139 	tid = le64_to_cpu(msg->hdr.tid);
3140 	mutex_lock(&mdsc->mutex);
3141 	req = lookup_get_request(mdsc, tid);
3142 	if (!req) {
3143 		dout("handle_reply on unknown tid %llu\n", tid);
3144 		mutex_unlock(&mdsc->mutex);
3145 		return;
3146 	}
3147 	dout("handle_reply %p\n", req);
3148 
3149 	/* correct session? */
3150 	if (req->r_session != session) {
3151 		pr_err("mdsc_handle_reply got %llu on session mds%d"
3152 		       " not mds%d\n", tid, session->s_mds,
3153 		       req->r_session ? req->r_session->s_mds : -1);
3154 		mutex_unlock(&mdsc->mutex);
3155 		goto out;
3156 	}
3157 
3158 	/* dup? */
3159 	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3160 	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3161 		pr_warn("got a dup %s reply on %llu from mds%d\n",
3162 			   head->safe ? "safe" : "unsafe", tid, mds);
3163 		mutex_unlock(&mdsc->mutex);
3164 		goto out;
3165 	}
3166 	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3167 		pr_warn("got unsafe after safe on %llu from mds%d\n",
3168 			   tid, mds);
3169 		mutex_unlock(&mdsc->mutex);
3170 		goto out;
3171 	}
3172 
3173 	result = le32_to_cpu(head->result);
3174 
3175 	/*
3176 	 * Handle an ESTALE
3177 	 * if we're not talking to the authority, send to them
3178 	 * if the authority has changed while we weren't looking,
3179 	 * send to new authority
3180 	 * Otherwise we just have to return an ESTALE
3181 	 */
3182 	if (result == -ESTALE) {
3183 		dout("got ESTALE on request %llu\n", req->r_tid);
3184 		req->r_resend_mds = -1;
3185 		if (req->r_direct_mode != USE_AUTH_MDS) {
3186 			dout("not using auth, setting for that now\n");
3187 			req->r_direct_mode = USE_AUTH_MDS;
3188 			__do_request(mdsc, req);
3189 			mutex_unlock(&mdsc->mutex);
3190 			goto out;
3191 		} else  {
3192 			int mds = __choose_mds(mdsc, req, NULL);
3193 			if (mds >= 0 && mds != req->r_session->s_mds) {
3194 				dout("but auth changed, so resending\n");
3195 				__do_request(mdsc, req);
3196 				mutex_unlock(&mdsc->mutex);
3197 				goto out;
3198 			}
3199 		}
3200 		dout("have to return ESTALE on request %llu\n", req->r_tid);
3201 	}
3202 
3203 
3204 	if (head->safe) {
3205 		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3206 		__unregister_request(mdsc, req);
3207 
3208 		/* last request during umount? */
3209 		if (mdsc->stopping && !__get_oldest_req(mdsc))
3210 			complete_all(&mdsc->safe_umount_waiters);
3211 
3212 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3213 			/*
3214 			 * We already handled the unsafe response, now do the
3215 			 * cleanup.  No need to examine the response; the MDS
3216 			 * doesn't include any result info in the safe
3217 			 * response.  And even if it did, there is nothing
3218 			 * useful we could do with a revised return value.
3219 			 */
3220 			dout("got safe reply %llu, mds%d\n", tid, mds);
3221 
3222 			mutex_unlock(&mdsc->mutex);
3223 			goto out;
3224 		}
3225 	} else {
3226 		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3227 		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3228 	}
3229 
3230 	dout("handle_reply tid %lld result %d\n", tid, result);
3231 	rinfo = &req->r_reply_info;
3232 	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3233 		err = parse_reply_info(session, msg, rinfo, (u64)-1);
3234 	else
3235 		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3236 	mutex_unlock(&mdsc->mutex);
3237 
3238 	/* Must find target inode outside of mutexes to avoid deadlocks */
3239 	if ((err >= 0) && rinfo->head->is_target) {
3240 		struct inode *in;
3241 		struct ceph_vino tvino = {
3242 			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3243 			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3244 		};
3245 
3246 		in = ceph_get_inode(mdsc->fsc->sb, tvino);
3247 		if (IS_ERR(in)) {
3248 			err = PTR_ERR(in);
3249 			mutex_lock(&session->s_mutex);
3250 			goto out_err;
3251 		}
3252 		req->r_target_inode = in;
3253 	}
3254 
3255 	mutex_lock(&session->s_mutex);
3256 	if (err < 0) {
3257 		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3258 		ceph_msg_dump(msg);
3259 		goto out_err;
3260 	}
3261 
3262 	/* snap trace */
3263 	realm = NULL;
3264 	if (rinfo->snapblob_len) {
3265 		down_write(&mdsc->snap_rwsem);
3266 		ceph_update_snap_trace(mdsc, rinfo->snapblob,
3267 				rinfo->snapblob + rinfo->snapblob_len,
3268 				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3269 				&realm);
3270 		downgrade_write(&mdsc->snap_rwsem);
3271 	} else {
3272 		down_read(&mdsc->snap_rwsem);
3273 	}
3274 
3275 	/* insert trace into our cache */
3276 	mutex_lock(&req->r_fill_mutex);
3277 	current->journal_info = req;
3278 	err = ceph_fill_trace(mdsc->fsc->sb, req);
3279 	if (err == 0) {
3280 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3281 				    req->r_op == CEPH_MDS_OP_LSSNAP))
3282 			ceph_readdir_prepopulate(req, req->r_session);
3283 	}
3284 	current->journal_info = NULL;
3285 	mutex_unlock(&req->r_fill_mutex);
3286 
3287 	up_read(&mdsc->snap_rwsem);
3288 	if (realm)
3289 		ceph_put_snap_realm(mdsc, realm);
3290 
3291 	if (err == 0) {
3292 		if (req->r_target_inode &&
3293 		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3294 			struct ceph_inode_info *ci =
3295 				ceph_inode(req->r_target_inode);
3296 			spin_lock(&ci->i_unsafe_lock);
3297 			list_add_tail(&req->r_unsafe_target_item,
3298 				      &ci->i_unsafe_iops);
3299 			spin_unlock(&ci->i_unsafe_lock);
3300 		}
3301 
3302 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3303 	}
3304 out_err:
3305 	mutex_lock(&mdsc->mutex);
3306 	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3307 		if (err) {
3308 			req->r_err = err;
3309 		} else {
3310 			req->r_reply =  ceph_msg_get(msg);
3311 			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3312 		}
3313 	} else {
3314 		dout("reply arrived after request %lld was aborted\n", tid);
3315 	}
3316 	mutex_unlock(&mdsc->mutex);
3317 
3318 	mutex_unlock(&session->s_mutex);
3319 
3320 	/* kick calling process */
3321 	complete_request(mdsc, req);
3322 
3323 	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3324 				     req->r_end_latency, err);
3325 out:
3326 	ceph_mdsc_put_request(req);
3327 	return;
3328 }
3329 
3330 
3331 
3332 /*
3333  * handle mds notification that our request has been forwarded.
3334  */
3335 static void handle_forward(struct ceph_mds_client *mdsc,
3336 			   struct ceph_mds_session *session,
3337 			   struct ceph_msg *msg)
3338 {
3339 	struct ceph_mds_request *req;
3340 	u64 tid = le64_to_cpu(msg->hdr.tid);
3341 	u32 next_mds;
3342 	u32 fwd_seq;
3343 	int err = -EINVAL;
3344 	void *p = msg->front.iov_base;
3345 	void *end = p + msg->front.iov_len;
3346 
3347 	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3348 	next_mds = ceph_decode_32(&p);
3349 	fwd_seq = ceph_decode_32(&p);
3350 
3351 	mutex_lock(&mdsc->mutex);
3352 	req = lookup_get_request(mdsc, tid);
3353 	if (!req) {
3354 		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3355 		goto out;  /* dup reply? */
3356 	}
3357 
3358 	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3359 		dout("forward tid %llu aborted, unregistering\n", tid);
3360 		__unregister_request(mdsc, req);
3361 	} else if (fwd_seq <= req->r_num_fwd) {
3362 		dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3363 		     tid, next_mds, req->r_num_fwd, fwd_seq);
3364 	} else {
3365 		/* resend. forward race not possible; mds would drop */
3366 		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3367 		BUG_ON(req->r_err);
3368 		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3369 		req->r_attempts = 0;
3370 		req->r_num_fwd = fwd_seq;
3371 		req->r_resend_mds = next_mds;
3372 		put_request_session(req);
3373 		__do_request(mdsc, req);
3374 	}
3375 	ceph_mdsc_put_request(req);
3376 out:
3377 	mutex_unlock(&mdsc->mutex);
3378 	return;
3379 
3380 bad:
3381 	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3382 }
3383 
3384 static int __decode_session_metadata(void **p, void *end,
3385 				     bool *blocklisted)
3386 {
3387 	/* map<string,string> */
3388 	u32 n;
3389 	bool err_str;
3390 	ceph_decode_32_safe(p, end, n, bad);
3391 	while (n-- > 0) {
3392 		u32 len;
3393 		ceph_decode_32_safe(p, end, len, bad);
3394 		ceph_decode_need(p, end, len, bad);
3395 		err_str = !strncmp(*p, "error_string", len);
3396 		*p += len;
3397 		ceph_decode_32_safe(p, end, len, bad);
3398 		ceph_decode_need(p, end, len, bad);
3399 		/*
3400 		 * Match "blocklisted (blacklisted)" from newer MDSes,
3401 		 * or "blacklisted" from older MDSes.
3402 		 */
3403 		if (err_str && strnstr(*p, "blacklisted", len))
3404 			*blocklisted = true;
3405 		*p += len;
3406 	}
3407 	return 0;
3408 bad:
3409 	return -1;
3410 }
3411 
3412 /*
3413  * handle a mds session control message
3414  */
3415 static void handle_session(struct ceph_mds_session *session,
3416 			   struct ceph_msg *msg)
3417 {
3418 	struct ceph_mds_client *mdsc = session->s_mdsc;
3419 	int mds = session->s_mds;
3420 	int msg_version = le16_to_cpu(msg->hdr.version);
3421 	void *p = msg->front.iov_base;
3422 	void *end = p + msg->front.iov_len;
3423 	struct ceph_mds_session_head *h;
3424 	u32 op;
3425 	u64 seq, features = 0;
3426 	int wake = 0;
3427 	bool blocklisted = false;
3428 
3429 	/* decode */
3430 	ceph_decode_need(&p, end, sizeof(*h), bad);
3431 	h = p;
3432 	p += sizeof(*h);
3433 
3434 	op = le32_to_cpu(h->op);
3435 	seq = le64_to_cpu(h->seq);
3436 
3437 	if (msg_version >= 3) {
3438 		u32 len;
3439 		/* version >= 2, metadata */
3440 		if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3441 			goto bad;
3442 		/* version >= 3, feature bits */
3443 		ceph_decode_32_safe(&p, end, len, bad);
3444 		if (len) {
3445 			ceph_decode_64_safe(&p, end, features, bad);
3446 			p += len - sizeof(features);
3447 		}
3448 	}
3449 
3450 	mutex_lock(&mdsc->mutex);
3451 	if (op == CEPH_SESSION_CLOSE) {
3452 		ceph_get_mds_session(session);
3453 		__unregister_session(mdsc, session);
3454 	}
3455 	/* FIXME: this ttl calculation is generous */
3456 	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3457 	mutex_unlock(&mdsc->mutex);
3458 
3459 	mutex_lock(&session->s_mutex);
3460 
3461 	dout("handle_session mds%d %s %p state %s seq %llu\n",
3462 	     mds, ceph_session_op_name(op), session,
3463 	     ceph_session_state_name(session->s_state), seq);
3464 
3465 	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3466 		session->s_state = CEPH_MDS_SESSION_OPEN;
3467 		pr_info("mds%d came back\n", session->s_mds);
3468 	}
3469 
3470 	switch (op) {
3471 	case CEPH_SESSION_OPEN:
3472 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3473 			pr_info("mds%d reconnect success\n", session->s_mds);
3474 		session->s_state = CEPH_MDS_SESSION_OPEN;
3475 		session->s_features = features;
3476 		renewed_caps(mdsc, session, 0);
3477 		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3478 			metric_schedule_delayed(&mdsc->metric);
3479 		wake = 1;
3480 		if (mdsc->stopping)
3481 			__close_session(mdsc, session);
3482 		break;
3483 
3484 	case CEPH_SESSION_RENEWCAPS:
3485 		if (session->s_renew_seq == seq)
3486 			renewed_caps(mdsc, session, 1);
3487 		break;
3488 
3489 	case CEPH_SESSION_CLOSE:
3490 		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3491 			pr_info("mds%d reconnect denied\n", session->s_mds);
3492 		session->s_state = CEPH_MDS_SESSION_CLOSED;
3493 		cleanup_session_requests(mdsc, session);
3494 		remove_session_caps(session);
3495 		wake = 2; /* for good measure */
3496 		wake_up_all(&mdsc->session_close_wq);
3497 		break;
3498 
3499 	case CEPH_SESSION_STALE:
3500 		pr_info("mds%d caps went stale, renewing\n",
3501 			session->s_mds);
3502 		spin_lock(&session->s_gen_ttl_lock);
3503 		session->s_cap_gen++;
3504 		session->s_cap_ttl = jiffies - 1;
3505 		spin_unlock(&session->s_gen_ttl_lock);
3506 		send_renew_caps(mdsc, session);
3507 		break;
3508 
3509 	case CEPH_SESSION_RECALL_STATE:
3510 		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3511 		break;
3512 
3513 	case CEPH_SESSION_FLUSHMSG:
3514 		send_flushmsg_ack(mdsc, session, seq);
3515 		break;
3516 
3517 	case CEPH_SESSION_FORCE_RO:
3518 		dout("force_session_readonly %p\n", session);
3519 		spin_lock(&session->s_cap_lock);
3520 		session->s_readonly = true;
3521 		spin_unlock(&session->s_cap_lock);
3522 		wake_up_session_caps(session, FORCE_RO);
3523 		break;
3524 
3525 	case CEPH_SESSION_REJECT:
3526 		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3527 		pr_info("mds%d rejected session\n", session->s_mds);
3528 		session->s_state = CEPH_MDS_SESSION_REJECTED;
3529 		cleanup_session_requests(mdsc, session);
3530 		remove_session_caps(session);
3531 		if (blocklisted)
3532 			mdsc->fsc->blocklisted = true;
3533 		wake = 2; /* for good measure */
3534 		break;
3535 
3536 	default:
3537 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3538 		WARN_ON(1);
3539 	}
3540 
3541 	mutex_unlock(&session->s_mutex);
3542 	if (wake) {
3543 		mutex_lock(&mdsc->mutex);
3544 		__wake_requests(mdsc, &session->s_waiting);
3545 		if (wake == 2)
3546 			kick_requests(mdsc, mds);
3547 		mutex_unlock(&mdsc->mutex);
3548 	}
3549 	if (op == CEPH_SESSION_CLOSE)
3550 		ceph_put_mds_session(session);
3551 	return;
3552 
3553 bad:
3554 	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3555 	       (int)msg->front.iov_len);
3556 	ceph_msg_dump(msg);
3557 	return;
3558 }
3559 
3560 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3561 {
3562 	int dcaps;
3563 
3564 	dcaps = xchg(&req->r_dir_caps, 0);
3565 	if (dcaps) {
3566 		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3567 		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3568 	}
3569 }
3570 
3571 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3572 {
3573 	int dcaps;
3574 
3575 	dcaps = xchg(&req->r_dir_caps, 0);
3576 	if (dcaps) {
3577 		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3578 		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3579 						dcaps);
3580 	}
3581 }
3582 
3583 /*
3584  * called under session->mutex.
3585  */
3586 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3587 				   struct ceph_mds_session *session)
3588 {
3589 	struct ceph_mds_request *req, *nreq;
3590 	struct rb_node *p;
3591 
3592 	dout("replay_unsafe_requests mds%d\n", session->s_mds);
3593 
3594 	mutex_lock(&mdsc->mutex);
3595 	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3596 		__send_request(session, req, true);
3597 
3598 	/*
3599 	 * also re-send old requests when MDS enters reconnect stage. So that MDS
3600 	 * can process completed request in clientreplay stage.
3601 	 */
3602 	p = rb_first(&mdsc->request_tree);
3603 	while (p) {
3604 		req = rb_entry(p, struct ceph_mds_request, r_node);
3605 		p = rb_next(p);
3606 		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3607 			continue;
3608 		if (req->r_attempts == 0)
3609 			continue; /* only old requests */
3610 		if (!req->r_session)
3611 			continue;
3612 		if (req->r_session->s_mds != session->s_mds)
3613 			continue;
3614 
3615 		ceph_mdsc_release_dir_caps_no_check(req);
3616 
3617 		__send_request(session, req, true);
3618 	}
3619 	mutex_unlock(&mdsc->mutex);
3620 }
3621 
3622 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3623 {
3624 	struct ceph_msg *reply;
3625 	struct ceph_pagelist *_pagelist;
3626 	struct page *page;
3627 	__le32 *addr;
3628 	int err = -ENOMEM;
3629 
3630 	if (!recon_state->allow_multi)
3631 		return -ENOSPC;
3632 
3633 	/* can't handle message that contains both caps and realm */
3634 	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3635 
3636 	/* pre-allocate new pagelist */
3637 	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
3638 	if (!_pagelist)
3639 		return -ENOMEM;
3640 
3641 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3642 	if (!reply)
3643 		goto fail_msg;
3644 
3645 	/* placeholder for nr_caps */
3646 	err = ceph_pagelist_encode_32(_pagelist, 0);
3647 	if (err < 0)
3648 		goto fail;
3649 
3650 	if (recon_state->nr_caps) {
3651 		/* currently encoding caps */
3652 		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3653 		if (err)
3654 			goto fail;
3655 	} else {
3656 		/* placeholder for nr_realms (currently encoding relams) */
3657 		err = ceph_pagelist_encode_32(_pagelist, 0);
3658 		if (err < 0)
3659 			goto fail;
3660 	}
3661 
3662 	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3663 	if (err)
3664 		goto fail;
3665 
3666 	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3667 	addr = kmap_atomic(page);
3668 	if (recon_state->nr_caps) {
3669 		/* currently encoding caps */
3670 		*addr = cpu_to_le32(recon_state->nr_caps);
3671 	} else {
3672 		/* currently encoding relams */
3673 		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3674 	}
3675 	kunmap_atomic(addr);
3676 
3677 	reply->hdr.version = cpu_to_le16(5);
3678 	reply->hdr.compat_version = cpu_to_le16(4);
3679 
3680 	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3681 	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3682 
3683 	ceph_con_send(&recon_state->session->s_con, reply);
3684 	ceph_pagelist_release(recon_state->pagelist);
3685 
3686 	recon_state->pagelist = _pagelist;
3687 	recon_state->nr_caps = 0;
3688 	recon_state->nr_realms = 0;
3689 	recon_state->msg_version = 5;
3690 	return 0;
3691 fail:
3692 	ceph_msg_put(reply);
3693 fail_msg:
3694 	ceph_pagelist_release(_pagelist);
3695 	return err;
3696 }
3697 
3698 static struct dentry* d_find_primary(struct inode *inode)
3699 {
3700 	struct dentry *alias, *dn = NULL;
3701 
3702 	if (hlist_empty(&inode->i_dentry))
3703 		return NULL;
3704 
3705 	spin_lock(&inode->i_lock);
3706 	if (hlist_empty(&inode->i_dentry))
3707 		goto out_unlock;
3708 
3709 	if (S_ISDIR(inode->i_mode)) {
3710 		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3711 		if (!IS_ROOT(alias))
3712 			dn = dget(alias);
3713 		goto out_unlock;
3714 	}
3715 
3716 	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3717 		spin_lock(&alias->d_lock);
3718 		if (!d_unhashed(alias) &&
3719 		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3720 			dn = dget_dlock(alias);
3721 		}
3722 		spin_unlock(&alias->d_lock);
3723 		if (dn)
3724 			break;
3725 	}
3726 out_unlock:
3727 	spin_unlock(&inode->i_lock);
3728 	return dn;
3729 }
3730 
3731 /*
3732  * Encode information about a cap for a reconnect with the MDS.
3733  */
3734 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3735 			  void *arg)
3736 {
3737 	union {
3738 		struct ceph_mds_cap_reconnect v2;
3739 		struct ceph_mds_cap_reconnect_v1 v1;
3740 	} rec;
3741 	struct ceph_inode_info *ci = cap->ci;
3742 	struct ceph_reconnect_state *recon_state = arg;
3743 	struct ceph_pagelist *pagelist = recon_state->pagelist;
3744 	struct dentry *dentry;
3745 	char *path;
3746 	int pathlen, err;
3747 	u64 pathbase;
3748 	u64 snap_follows;
3749 
3750 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3751 	     inode, ceph_vinop(inode), cap, cap->cap_id,
3752 	     ceph_cap_string(cap->issued));
3753 
3754 	dentry = d_find_primary(inode);
3755 	if (dentry) {
3756 		/* set pathbase to parent dir when msg_version >= 2 */
3757 		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3758 					    recon_state->msg_version >= 2);
3759 		dput(dentry);
3760 		if (IS_ERR(path)) {
3761 			err = PTR_ERR(path);
3762 			goto out_err;
3763 		}
3764 	} else {
3765 		path = NULL;
3766 		pathlen = 0;
3767 		pathbase = 0;
3768 	}
3769 
3770 	spin_lock(&ci->i_ceph_lock);
3771 	cap->seq = 0;        /* reset cap seq */
3772 	cap->issue_seq = 0;  /* and issue_seq */
3773 	cap->mseq = 0;       /* and migrate_seq */
3774 	cap->cap_gen = cap->session->s_cap_gen;
3775 
3776 	/* These are lost when the session goes away */
3777 	if (S_ISDIR(inode->i_mode)) {
3778 		if (cap->issued & CEPH_CAP_DIR_CREATE) {
3779 			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3780 			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3781 		}
3782 		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3783 	}
3784 
3785 	if (recon_state->msg_version >= 2) {
3786 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3787 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3788 		rec.v2.issued = cpu_to_le32(cap->issued);
3789 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3790 		rec.v2.pathbase = cpu_to_le64(pathbase);
3791 		rec.v2.flock_len = (__force __le32)
3792 			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3793 	} else {
3794 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3795 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3796 		rec.v1.issued = cpu_to_le32(cap->issued);
3797 		rec.v1.size = cpu_to_le64(i_size_read(inode));
3798 		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3799 		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3800 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3801 		rec.v1.pathbase = cpu_to_le64(pathbase);
3802 	}
3803 
3804 	if (list_empty(&ci->i_cap_snaps)) {
3805 		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3806 	} else {
3807 		struct ceph_cap_snap *capsnap =
3808 			list_first_entry(&ci->i_cap_snaps,
3809 					 struct ceph_cap_snap, ci_item);
3810 		snap_follows = capsnap->follows;
3811 	}
3812 	spin_unlock(&ci->i_ceph_lock);
3813 
3814 	if (recon_state->msg_version >= 2) {
3815 		int num_fcntl_locks, num_flock_locks;
3816 		struct ceph_filelock *flocks = NULL;
3817 		size_t struct_len, total_len = sizeof(u64);
3818 		u8 struct_v = 0;
3819 
3820 encode_again:
3821 		if (rec.v2.flock_len) {
3822 			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3823 		} else {
3824 			num_fcntl_locks = 0;
3825 			num_flock_locks = 0;
3826 		}
3827 		if (num_fcntl_locks + num_flock_locks > 0) {
3828 			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3829 					       sizeof(struct ceph_filelock),
3830 					       GFP_NOFS);
3831 			if (!flocks) {
3832 				err = -ENOMEM;
3833 				goto out_err;
3834 			}
3835 			err = ceph_encode_locks_to_buffer(inode, flocks,
3836 							  num_fcntl_locks,
3837 							  num_flock_locks);
3838 			if (err) {
3839 				kfree(flocks);
3840 				flocks = NULL;
3841 				if (err == -ENOSPC)
3842 					goto encode_again;
3843 				goto out_err;
3844 			}
3845 		} else {
3846 			kfree(flocks);
3847 			flocks = NULL;
3848 		}
3849 
3850 		if (recon_state->msg_version >= 3) {
3851 			/* version, compat_version and struct_len */
3852 			total_len += 2 * sizeof(u8) + sizeof(u32);
3853 			struct_v = 2;
3854 		}
3855 		/*
3856 		 * number of encoded locks is stable, so copy to pagelist
3857 		 */
3858 		struct_len = 2 * sizeof(u32) +
3859 			    (num_fcntl_locks + num_flock_locks) *
3860 			    sizeof(struct ceph_filelock);
3861 		rec.v2.flock_len = cpu_to_le32(struct_len);
3862 
3863 		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3864 
3865 		if (struct_v >= 2)
3866 			struct_len += sizeof(u64); /* snap_follows */
3867 
3868 		total_len += struct_len;
3869 
3870 		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3871 			err = send_reconnect_partial(recon_state);
3872 			if (err)
3873 				goto out_freeflocks;
3874 			pagelist = recon_state->pagelist;
3875 		}
3876 
3877 		err = ceph_pagelist_reserve(pagelist, total_len);
3878 		if (err)
3879 			goto out_freeflocks;
3880 
3881 		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3882 		if (recon_state->msg_version >= 3) {
3883 			ceph_pagelist_encode_8(pagelist, struct_v);
3884 			ceph_pagelist_encode_8(pagelist, 1);
3885 			ceph_pagelist_encode_32(pagelist, struct_len);
3886 		}
3887 		ceph_pagelist_encode_string(pagelist, path, pathlen);
3888 		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3889 		ceph_locks_to_pagelist(flocks, pagelist,
3890 				       num_fcntl_locks, num_flock_locks);
3891 		if (struct_v >= 2)
3892 			ceph_pagelist_encode_64(pagelist, snap_follows);
3893 out_freeflocks:
3894 		kfree(flocks);
3895 	} else {
3896 		err = ceph_pagelist_reserve(pagelist,
3897 					    sizeof(u64) + sizeof(u32) +
3898 					    pathlen + sizeof(rec.v1));
3899 		if (err)
3900 			goto out_err;
3901 
3902 		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3903 		ceph_pagelist_encode_string(pagelist, path, pathlen);
3904 		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3905 	}
3906 
3907 out_err:
3908 	ceph_mdsc_free_path(path, pathlen);
3909 	if (!err)
3910 		recon_state->nr_caps++;
3911 	return err;
3912 }
3913 
3914 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3915 			      struct ceph_reconnect_state *recon_state)
3916 {
3917 	struct rb_node *p;
3918 	struct ceph_pagelist *pagelist = recon_state->pagelist;
3919 	int err = 0;
3920 
3921 	if (recon_state->msg_version >= 4) {
3922 		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3923 		if (err < 0)
3924 			goto fail;
3925 	}
3926 
3927 	/*
3928 	 * snaprealms.  we provide mds with the ino, seq (version), and
3929 	 * parent for all of our realms.  If the mds has any newer info,
3930 	 * it will tell us.
3931 	 */
3932 	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3933 		struct ceph_snap_realm *realm =
3934 		       rb_entry(p, struct ceph_snap_realm, node);
3935 		struct ceph_mds_snaprealm_reconnect sr_rec;
3936 
3937 		if (recon_state->msg_version >= 4) {
3938 			size_t need = sizeof(u8) * 2 + sizeof(u32) +
3939 				      sizeof(sr_rec);
3940 
3941 			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3942 				err = send_reconnect_partial(recon_state);
3943 				if (err)
3944 					goto fail;
3945 				pagelist = recon_state->pagelist;
3946 			}
3947 
3948 			err = ceph_pagelist_reserve(pagelist, need);
3949 			if (err)
3950 				goto fail;
3951 
3952 			ceph_pagelist_encode_8(pagelist, 1);
3953 			ceph_pagelist_encode_8(pagelist, 1);
3954 			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3955 		}
3956 
3957 		dout(" adding snap realm %llx seq %lld parent %llx\n",
3958 		     realm->ino, realm->seq, realm->parent_ino);
3959 		sr_rec.ino = cpu_to_le64(realm->ino);
3960 		sr_rec.seq = cpu_to_le64(realm->seq);
3961 		sr_rec.parent = cpu_to_le64(realm->parent_ino);
3962 
3963 		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3964 		if (err)
3965 			goto fail;
3966 
3967 		recon_state->nr_realms++;
3968 	}
3969 fail:
3970 	return err;
3971 }
3972 
3973 
3974 /*
3975  * If an MDS fails and recovers, clients need to reconnect in order to
3976  * reestablish shared state.  This includes all caps issued through
3977  * this session _and_ the snap_realm hierarchy.  Because it's not
3978  * clear which snap realms the mds cares about, we send everything we
3979  * know about.. that ensures we'll then get any new info the
3980  * recovering MDS might have.
3981  *
3982  * This is a relatively heavyweight operation, but it's rare.
3983  */
3984 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3985 			       struct ceph_mds_session *session)
3986 {
3987 	struct ceph_msg *reply;
3988 	int mds = session->s_mds;
3989 	int err = -ENOMEM;
3990 	struct ceph_reconnect_state recon_state = {
3991 		.session = session,
3992 	};
3993 	LIST_HEAD(dispose);
3994 
3995 	pr_info("mds%d reconnect start\n", mds);
3996 
3997 	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3998 	if (!recon_state.pagelist)
3999 		goto fail_nopagelist;
4000 
4001 	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4002 	if (!reply)
4003 		goto fail_nomsg;
4004 
4005 	xa_destroy(&session->s_delegated_inos);
4006 
4007 	mutex_lock(&session->s_mutex);
4008 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4009 	session->s_seq = 0;
4010 
4011 	dout("session %p state %s\n", session,
4012 	     ceph_session_state_name(session->s_state));
4013 
4014 	spin_lock(&session->s_gen_ttl_lock);
4015 	session->s_cap_gen++;
4016 	spin_unlock(&session->s_gen_ttl_lock);
4017 
4018 	spin_lock(&session->s_cap_lock);
4019 	/* don't know if session is readonly */
4020 	session->s_readonly = 0;
4021 	/*
4022 	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4023 	 * If a cap get released before being added to the cap reconnect,
4024 	 * __ceph_remove_cap() should skip queuing cap release.
4025 	 */
4026 	session->s_cap_reconnect = 1;
4027 	/* drop old cap expires; we're about to reestablish that state */
4028 	detach_cap_releases(session, &dispose);
4029 	spin_unlock(&session->s_cap_lock);
4030 	dispose_cap_releases(mdsc, &dispose);
4031 
4032 	/* trim unused caps to reduce MDS's cache rejoin time */
4033 	if (mdsc->fsc->sb->s_root)
4034 		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4035 
4036 	ceph_con_close(&session->s_con);
4037 	ceph_con_open(&session->s_con,
4038 		      CEPH_ENTITY_TYPE_MDS, mds,
4039 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4040 
4041 	/* replay unsafe requests */
4042 	replay_unsafe_requests(mdsc, session);
4043 
4044 	ceph_early_kick_flushing_caps(mdsc, session);
4045 
4046 	down_read(&mdsc->snap_rwsem);
4047 
4048 	/* placeholder for nr_caps */
4049 	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4050 	if (err)
4051 		goto fail;
4052 
4053 	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4054 		recon_state.msg_version = 3;
4055 		recon_state.allow_multi = true;
4056 	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4057 		recon_state.msg_version = 3;
4058 	} else {
4059 		recon_state.msg_version = 2;
4060 	}
4061 	/* trsaverse this session's caps */
4062 	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4063 
4064 	spin_lock(&session->s_cap_lock);
4065 	session->s_cap_reconnect = 0;
4066 	spin_unlock(&session->s_cap_lock);
4067 
4068 	if (err < 0)
4069 		goto fail;
4070 
4071 	/* check if all realms can be encoded into current message */
4072 	if (mdsc->num_snap_realms) {
4073 		size_t total_len =
4074 			recon_state.pagelist->length +
4075 			mdsc->num_snap_realms *
4076 			sizeof(struct ceph_mds_snaprealm_reconnect);
4077 		if (recon_state.msg_version >= 4) {
4078 			/* number of realms */
4079 			total_len += sizeof(u32);
4080 			/* version, compat_version and struct_len */
4081 			total_len += mdsc->num_snap_realms *
4082 				     (2 * sizeof(u8) + sizeof(u32));
4083 		}
4084 		if (total_len > RECONNECT_MAX_SIZE) {
4085 			if (!recon_state.allow_multi) {
4086 				err = -ENOSPC;
4087 				goto fail;
4088 			}
4089 			if (recon_state.nr_caps) {
4090 				err = send_reconnect_partial(&recon_state);
4091 				if (err)
4092 					goto fail;
4093 			}
4094 			recon_state.msg_version = 5;
4095 		}
4096 	}
4097 
4098 	err = encode_snap_realms(mdsc, &recon_state);
4099 	if (err < 0)
4100 		goto fail;
4101 
4102 	if (recon_state.msg_version >= 5) {
4103 		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4104 		if (err < 0)
4105 			goto fail;
4106 	}
4107 
4108 	if (recon_state.nr_caps || recon_state.nr_realms) {
4109 		struct page *page =
4110 			list_first_entry(&recon_state.pagelist->head,
4111 					struct page, lru);
4112 		__le32 *addr = kmap_atomic(page);
4113 		if (recon_state.nr_caps) {
4114 			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4115 			*addr = cpu_to_le32(recon_state.nr_caps);
4116 		} else if (recon_state.msg_version >= 4) {
4117 			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4118 		}
4119 		kunmap_atomic(addr);
4120 	}
4121 
4122 	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4123 	if (recon_state.msg_version >= 4)
4124 		reply->hdr.compat_version = cpu_to_le16(4);
4125 
4126 	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4127 	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4128 
4129 	ceph_con_send(&session->s_con, reply);
4130 
4131 	mutex_unlock(&session->s_mutex);
4132 
4133 	mutex_lock(&mdsc->mutex);
4134 	__wake_requests(mdsc, &session->s_waiting);
4135 	mutex_unlock(&mdsc->mutex);
4136 
4137 	up_read(&mdsc->snap_rwsem);
4138 	ceph_pagelist_release(recon_state.pagelist);
4139 	return;
4140 
4141 fail:
4142 	ceph_msg_put(reply);
4143 	up_read(&mdsc->snap_rwsem);
4144 	mutex_unlock(&session->s_mutex);
4145 fail_nomsg:
4146 	ceph_pagelist_release(recon_state.pagelist);
4147 fail_nopagelist:
4148 	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4149 	return;
4150 }
4151 
4152 
4153 /*
4154  * compare old and new mdsmaps, kicking requests
4155  * and closing out old connections as necessary
4156  *
4157  * called under mdsc->mutex.
4158  */
4159 static void check_new_map(struct ceph_mds_client *mdsc,
4160 			  struct ceph_mdsmap *newmap,
4161 			  struct ceph_mdsmap *oldmap)
4162 {
4163 	int i;
4164 	int oldstate, newstate;
4165 	struct ceph_mds_session *s;
4166 
4167 	dout("check_new_map new %u old %u\n",
4168 	     newmap->m_epoch, oldmap->m_epoch);
4169 
4170 	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4171 		if (!mdsc->sessions[i])
4172 			continue;
4173 		s = mdsc->sessions[i];
4174 		oldstate = ceph_mdsmap_get_state(oldmap, i);
4175 		newstate = ceph_mdsmap_get_state(newmap, i);
4176 
4177 		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4178 		     i, ceph_mds_state_name(oldstate),
4179 		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4180 		     ceph_mds_state_name(newstate),
4181 		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4182 		     ceph_session_state_name(s->s_state));
4183 
4184 		if (i >= newmap->possible_max_rank) {
4185 			/* force close session for stopped mds */
4186 			ceph_get_mds_session(s);
4187 			__unregister_session(mdsc, s);
4188 			__wake_requests(mdsc, &s->s_waiting);
4189 			mutex_unlock(&mdsc->mutex);
4190 
4191 			mutex_lock(&s->s_mutex);
4192 			cleanup_session_requests(mdsc, s);
4193 			remove_session_caps(s);
4194 			mutex_unlock(&s->s_mutex);
4195 
4196 			ceph_put_mds_session(s);
4197 
4198 			mutex_lock(&mdsc->mutex);
4199 			kick_requests(mdsc, i);
4200 			continue;
4201 		}
4202 
4203 		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4204 			   ceph_mdsmap_get_addr(newmap, i),
4205 			   sizeof(struct ceph_entity_addr))) {
4206 			/* just close it */
4207 			mutex_unlock(&mdsc->mutex);
4208 			mutex_lock(&s->s_mutex);
4209 			mutex_lock(&mdsc->mutex);
4210 			ceph_con_close(&s->s_con);
4211 			mutex_unlock(&s->s_mutex);
4212 			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4213 		} else if (oldstate == newstate) {
4214 			continue;  /* nothing new with this mds */
4215 		}
4216 
4217 		/*
4218 		 * send reconnect?
4219 		 */
4220 		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4221 		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4222 			mutex_unlock(&mdsc->mutex);
4223 			send_mds_reconnect(mdsc, s);
4224 			mutex_lock(&mdsc->mutex);
4225 		}
4226 
4227 		/*
4228 		 * kick request on any mds that has gone active.
4229 		 */
4230 		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4231 		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4232 			if (oldstate != CEPH_MDS_STATE_CREATING &&
4233 			    oldstate != CEPH_MDS_STATE_STARTING)
4234 				pr_info("mds%d recovery completed\n", s->s_mds);
4235 			kick_requests(mdsc, i);
4236 			mutex_unlock(&mdsc->mutex);
4237 			mutex_lock(&s->s_mutex);
4238 			mutex_lock(&mdsc->mutex);
4239 			ceph_kick_flushing_caps(mdsc, s);
4240 			mutex_unlock(&s->s_mutex);
4241 			wake_up_session_caps(s, RECONNECT);
4242 		}
4243 	}
4244 
4245 	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4246 		s = mdsc->sessions[i];
4247 		if (!s)
4248 			continue;
4249 		if (!ceph_mdsmap_is_laggy(newmap, i))
4250 			continue;
4251 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4252 		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4253 		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4254 			dout(" connecting to export targets of laggy mds%d\n",
4255 			     i);
4256 			__open_export_target_sessions(mdsc, s);
4257 		}
4258 	}
4259 }
4260 
4261 
4262 
4263 /*
4264  * leases
4265  */
4266 
4267 /*
4268  * caller must hold session s_mutex, dentry->d_lock
4269  */
4270 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4271 {
4272 	struct ceph_dentry_info *di = ceph_dentry(dentry);
4273 
4274 	ceph_put_mds_session(di->lease_session);
4275 	di->lease_session = NULL;
4276 }
4277 
4278 static void handle_lease(struct ceph_mds_client *mdsc,
4279 			 struct ceph_mds_session *session,
4280 			 struct ceph_msg *msg)
4281 {
4282 	struct super_block *sb = mdsc->fsc->sb;
4283 	struct inode *inode;
4284 	struct dentry *parent, *dentry;
4285 	struct ceph_dentry_info *di;
4286 	int mds = session->s_mds;
4287 	struct ceph_mds_lease *h = msg->front.iov_base;
4288 	u32 seq;
4289 	struct ceph_vino vino;
4290 	struct qstr dname;
4291 	int release = 0;
4292 
4293 	dout("handle_lease from mds%d\n", mds);
4294 
4295 	/* decode */
4296 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4297 		goto bad;
4298 	vino.ino = le64_to_cpu(h->ino);
4299 	vino.snap = CEPH_NOSNAP;
4300 	seq = le32_to_cpu(h->seq);
4301 	dname.len = get_unaligned_le32(h + 1);
4302 	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4303 		goto bad;
4304 	dname.name = (void *)(h + 1) + sizeof(u32);
4305 
4306 	/* lookup inode */
4307 	inode = ceph_find_inode(sb, vino);
4308 	dout("handle_lease %s, ino %llx %p %.*s\n",
4309 	     ceph_lease_op_name(h->action), vino.ino, inode,
4310 	     dname.len, dname.name);
4311 
4312 	mutex_lock(&session->s_mutex);
4313 	inc_session_sequence(session);
4314 
4315 	if (!inode) {
4316 		dout("handle_lease no inode %llx\n", vino.ino);
4317 		goto release;
4318 	}
4319 
4320 	/* dentry */
4321 	parent = d_find_alias(inode);
4322 	if (!parent) {
4323 		dout("no parent dentry on inode %p\n", inode);
4324 		WARN_ON(1);
4325 		goto release;  /* hrm... */
4326 	}
4327 	dname.hash = full_name_hash(parent, dname.name, dname.len);
4328 	dentry = d_lookup(parent, &dname);
4329 	dput(parent);
4330 	if (!dentry)
4331 		goto release;
4332 
4333 	spin_lock(&dentry->d_lock);
4334 	di = ceph_dentry(dentry);
4335 	switch (h->action) {
4336 	case CEPH_MDS_LEASE_REVOKE:
4337 		if (di->lease_session == session) {
4338 			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4339 				h->seq = cpu_to_le32(di->lease_seq);
4340 			__ceph_mdsc_drop_dentry_lease(dentry);
4341 		}
4342 		release = 1;
4343 		break;
4344 
4345 	case CEPH_MDS_LEASE_RENEW:
4346 		if (di->lease_session == session &&
4347 		    di->lease_gen == session->s_cap_gen &&
4348 		    di->lease_renew_from &&
4349 		    di->lease_renew_after == 0) {
4350 			unsigned long duration =
4351 				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4352 
4353 			di->lease_seq = seq;
4354 			di->time = di->lease_renew_from + duration;
4355 			di->lease_renew_after = di->lease_renew_from +
4356 				(duration >> 1);
4357 			di->lease_renew_from = 0;
4358 		}
4359 		break;
4360 	}
4361 	spin_unlock(&dentry->d_lock);
4362 	dput(dentry);
4363 
4364 	if (!release)
4365 		goto out;
4366 
4367 release:
4368 	/* let's just reuse the same message */
4369 	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4370 	ceph_msg_get(msg);
4371 	ceph_con_send(&session->s_con, msg);
4372 
4373 out:
4374 	mutex_unlock(&session->s_mutex);
4375 	/* avoid calling iput_final() in mds dispatch threads */
4376 	ceph_async_iput(inode);
4377 	return;
4378 
4379 bad:
4380 	pr_err("corrupt lease message\n");
4381 	ceph_msg_dump(msg);
4382 }
4383 
4384 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4385 			      struct dentry *dentry, char action,
4386 			      u32 seq)
4387 {
4388 	struct ceph_msg *msg;
4389 	struct ceph_mds_lease *lease;
4390 	struct inode *dir;
4391 	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4392 
4393 	dout("lease_send_msg identry %p %s to mds%d\n",
4394 	     dentry, ceph_lease_op_name(action), session->s_mds);
4395 
4396 	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4397 	if (!msg)
4398 		return;
4399 	lease = msg->front.iov_base;
4400 	lease->action = action;
4401 	lease->seq = cpu_to_le32(seq);
4402 
4403 	spin_lock(&dentry->d_lock);
4404 	dir = d_inode(dentry->d_parent);
4405 	lease->ino = cpu_to_le64(ceph_ino(dir));
4406 	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4407 
4408 	put_unaligned_le32(dentry->d_name.len, lease + 1);
4409 	memcpy((void *)(lease + 1) + 4,
4410 	       dentry->d_name.name, dentry->d_name.len);
4411 	spin_unlock(&dentry->d_lock);
4412 	/*
4413 	 * if this is a preemptive lease RELEASE, no need to
4414 	 * flush request stream, since the actual request will
4415 	 * soon follow.
4416 	 */
4417 	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4418 
4419 	ceph_con_send(&session->s_con, msg);
4420 }
4421 
4422 /*
4423  * lock unlock sessions, to wait ongoing session activities
4424  */
4425 static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
4426 {
4427 	int i;
4428 
4429 	mutex_lock(&mdsc->mutex);
4430 	for (i = 0; i < mdsc->max_sessions; i++) {
4431 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4432 		if (!s)
4433 			continue;
4434 		mutex_unlock(&mdsc->mutex);
4435 		mutex_lock(&s->s_mutex);
4436 		mutex_unlock(&s->s_mutex);
4437 		ceph_put_mds_session(s);
4438 		mutex_lock(&mdsc->mutex);
4439 	}
4440 	mutex_unlock(&mdsc->mutex);
4441 }
4442 
4443 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4444 {
4445 	struct ceph_fs_client *fsc = mdsc->fsc;
4446 
4447 	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4448 		return;
4449 
4450 	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4451 		return;
4452 
4453 	if (!READ_ONCE(fsc->blocklisted))
4454 		return;
4455 
4456 	pr_info("auto reconnect after blocklisted\n");
4457 	ceph_force_reconnect(fsc->sb);
4458 }
4459 
4460 bool check_session_state(struct ceph_mds_session *s)
4461 {
4462 	switch (s->s_state) {
4463 	case CEPH_MDS_SESSION_OPEN:
4464 		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4465 			s->s_state = CEPH_MDS_SESSION_HUNG;
4466 			pr_info("mds%d hung\n", s->s_mds);
4467 		}
4468 		break;
4469 	case CEPH_MDS_SESSION_CLOSING:
4470 		/* Should never reach this when we're unmounting */
4471 		WARN_ON_ONCE(true);
4472 		fallthrough;
4473 	case CEPH_MDS_SESSION_NEW:
4474 	case CEPH_MDS_SESSION_RESTARTING:
4475 	case CEPH_MDS_SESSION_CLOSED:
4476 	case CEPH_MDS_SESSION_REJECTED:
4477 		return false;
4478 	}
4479 
4480 	return true;
4481 }
4482 
4483 /*
4484  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4485  * then we need to retransmit that request.
4486  */
4487 void inc_session_sequence(struct ceph_mds_session *s)
4488 {
4489 	lockdep_assert_held(&s->s_mutex);
4490 
4491 	s->s_seq++;
4492 
4493 	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4494 		int ret;
4495 
4496 		dout("resending session close request for mds%d\n", s->s_mds);
4497 		ret = request_close_session(s);
4498 		if (ret < 0)
4499 			pr_err("unable to close session to mds%d: %d\n",
4500 			       s->s_mds, ret);
4501 	}
4502 }
4503 
4504 /*
4505  * delayed work -- periodically trim expired leases, renew caps with mds
4506  */
4507 static void schedule_delayed(struct ceph_mds_client *mdsc)
4508 {
4509 	int delay = 5;
4510 	unsigned hz = round_jiffies_relative(HZ * delay);
4511 	schedule_delayed_work(&mdsc->delayed_work, hz);
4512 }
4513 
4514 static void delayed_work(struct work_struct *work)
4515 {
4516 	int i;
4517 	struct ceph_mds_client *mdsc =
4518 		container_of(work, struct ceph_mds_client, delayed_work.work);
4519 	int renew_interval;
4520 	int renew_caps;
4521 
4522 	dout("mdsc delayed_work\n");
4523 
4524 	if (mdsc->stopping)
4525 		return;
4526 
4527 	mutex_lock(&mdsc->mutex);
4528 	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4529 	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4530 				   mdsc->last_renew_caps);
4531 	if (renew_caps)
4532 		mdsc->last_renew_caps = jiffies;
4533 
4534 	for (i = 0; i < mdsc->max_sessions; i++) {
4535 		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4536 		if (!s)
4537 			continue;
4538 
4539 		if (!check_session_state(s)) {
4540 			ceph_put_mds_session(s);
4541 			continue;
4542 		}
4543 		mutex_unlock(&mdsc->mutex);
4544 
4545 		mutex_lock(&s->s_mutex);
4546 		if (renew_caps)
4547 			send_renew_caps(mdsc, s);
4548 		else
4549 			ceph_con_keepalive(&s->s_con);
4550 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4551 		    s->s_state == CEPH_MDS_SESSION_HUNG)
4552 			ceph_send_cap_releases(mdsc, s);
4553 		mutex_unlock(&s->s_mutex);
4554 		ceph_put_mds_session(s);
4555 
4556 		mutex_lock(&mdsc->mutex);
4557 	}
4558 	mutex_unlock(&mdsc->mutex);
4559 
4560 	ceph_check_delayed_caps(mdsc);
4561 
4562 	ceph_queue_cap_reclaim_work(mdsc);
4563 
4564 	ceph_trim_snapid_map(mdsc);
4565 
4566 	maybe_recover_session(mdsc);
4567 
4568 	schedule_delayed(mdsc);
4569 }
4570 
4571 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4572 
4573 {
4574 	struct ceph_mds_client *mdsc;
4575 	int err;
4576 
4577 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4578 	if (!mdsc)
4579 		return -ENOMEM;
4580 	mdsc->fsc = fsc;
4581 	mutex_init(&mdsc->mutex);
4582 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4583 	if (!mdsc->mdsmap) {
4584 		err = -ENOMEM;
4585 		goto err_mdsc;
4586 	}
4587 
4588 	init_completion(&mdsc->safe_umount_waiters);
4589 	init_waitqueue_head(&mdsc->session_close_wq);
4590 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
4591 	mdsc->sessions = NULL;
4592 	atomic_set(&mdsc->num_sessions, 0);
4593 	mdsc->max_sessions = 0;
4594 	mdsc->stopping = 0;
4595 	atomic64_set(&mdsc->quotarealms_count, 0);
4596 	mdsc->quotarealms_inodes = RB_ROOT;
4597 	mutex_init(&mdsc->quotarealms_inodes_mutex);
4598 	mdsc->last_snap_seq = 0;
4599 	init_rwsem(&mdsc->snap_rwsem);
4600 	mdsc->snap_realms = RB_ROOT;
4601 	INIT_LIST_HEAD(&mdsc->snap_empty);
4602 	mdsc->num_snap_realms = 0;
4603 	spin_lock_init(&mdsc->snap_empty_lock);
4604 	mdsc->last_tid = 0;
4605 	mdsc->oldest_tid = 0;
4606 	mdsc->request_tree = RB_ROOT;
4607 	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4608 	mdsc->last_renew_caps = jiffies;
4609 	INIT_LIST_HEAD(&mdsc->cap_delay_list);
4610 	INIT_LIST_HEAD(&mdsc->cap_wait_list);
4611 	spin_lock_init(&mdsc->cap_delay_lock);
4612 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
4613 	spin_lock_init(&mdsc->snap_flush_lock);
4614 	mdsc->last_cap_flush_tid = 1;
4615 	INIT_LIST_HEAD(&mdsc->cap_flush_list);
4616 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4617 	mdsc->num_cap_flushing = 0;
4618 	spin_lock_init(&mdsc->cap_dirty_lock);
4619 	init_waitqueue_head(&mdsc->cap_flushing_wq);
4620 	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4621 	atomic_set(&mdsc->cap_reclaim_pending, 0);
4622 	err = ceph_metric_init(&mdsc->metric);
4623 	if (err)
4624 		goto err_mdsmap;
4625 
4626 	spin_lock_init(&mdsc->dentry_list_lock);
4627 	INIT_LIST_HEAD(&mdsc->dentry_leases);
4628 	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4629 
4630 	ceph_caps_init(mdsc);
4631 	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4632 
4633 	spin_lock_init(&mdsc->snapid_map_lock);
4634 	mdsc->snapid_map_tree = RB_ROOT;
4635 	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4636 
4637 	init_rwsem(&mdsc->pool_perm_rwsem);
4638 	mdsc->pool_perm_tree = RB_ROOT;
4639 
4640 	strscpy(mdsc->nodename, utsname()->nodename,
4641 		sizeof(mdsc->nodename));
4642 
4643 	fsc->mdsc = mdsc;
4644 	return 0;
4645 
4646 err_mdsmap:
4647 	kfree(mdsc->mdsmap);
4648 err_mdsc:
4649 	kfree(mdsc);
4650 	return err;
4651 }
4652 
4653 /*
4654  * Wait for safe replies on open mds requests.  If we time out, drop
4655  * all requests from the tree to avoid dangling dentry refs.
4656  */
4657 static void wait_requests(struct ceph_mds_client *mdsc)
4658 {
4659 	struct ceph_options *opts = mdsc->fsc->client->options;
4660 	struct ceph_mds_request *req;
4661 
4662 	mutex_lock(&mdsc->mutex);
4663 	if (__get_oldest_req(mdsc)) {
4664 		mutex_unlock(&mdsc->mutex);
4665 
4666 		dout("wait_requests waiting for requests\n");
4667 		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4668 				    ceph_timeout_jiffies(opts->mount_timeout));
4669 
4670 		/* tear down remaining requests */
4671 		mutex_lock(&mdsc->mutex);
4672 		while ((req = __get_oldest_req(mdsc))) {
4673 			dout("wait_requests timed out on tid %llu\n",
4674 			     req->r_tid);
4675 			list_del_init(&req->r_wait);
4676 			__unregister_request(mdsc, req);
4677 		}
4678 	}
4679 	mutex_unlock(&mdsc->mutex);
4680 	dout("wait_requests done\n");
4681 }
4682 
4683 /*
4684  * called before mount is ro, and before dentries are torn down.
4685  * (hmm, does this still race with new lookups?)
4686  */
4687 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4688 {
4689 	dout("pre_umount\n");
4690 	mdsc->stopping = 1;
4691 
4692 	lock_unlock_sessions(mdsc);
4693 	ceph_flush_dirty_caps(mdsc);
4694 	wait_requests(mdsc);
4695 
4696 	/*
4697 	 * wait for reply handlers to drop their request refs and
4698 	 * their inode/dcache refs
4699 	 */
4700 	ceph_msgr_flush();
4701 
4702 	ceph_cleanup_quotarealms_inodes(mdsc);
4703 }
4704 
4705 /*
4706  * wait for all write mds requests to flush.
4707  */
4708 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4709 {
4710 	struct ceph_mds_request *req = NULL, *nextreq;
4711 	struct rb_node *n;
4712 
4713 	mutex_lock(&mdsc->mutex);
4714 	dout("wait_unsafe_requests want %lld\n", want_tid);
4715 restart:
4716 	req = __get_oldest_req(mdsc);
4717 	while (req && req->r_tid <= want_tid) {
4718 		/* find next request */
4719 		n = rb_next(&req->r_node);
4720 		if (n)
4721 			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4722 		else
4723 			nextreq = NULL;
4724 		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4725 		    (req->r_op & CEPH_MDS_OP_WRITE)) {
4726 			/* write op */
4727 			ceph_mdsc_get_request(req);
4728 			if (nextreq)
4729 				ceph_mdsc_get_request(nextreq);
4730 			mutex_unlock(&mdsc->mutex);
4731 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4732 			     req->r_tid, want_tid);
4733 			wait_for_completion(&req->r_safe_completion);
4734 			mutex_lock(&mdsc->mutex);
4735 			ceph_mdsc_put_request(req);
4736 			if (!nextreq)
4737 				break;  /* next dne before, so we're done! */
4738 			if (RB_EMPTY_NODE(&nextreq->r_node)) {
4739 				/* next request was removed from tree */
4740 				ceph_mdsc_put_request(nextreq);
4741 				goto restart;
4742 			}
4743 			ceph_mdsc_put_request(nextreq);  /* won't go away */
4744 		}
4745 		req = nextreq;
4746 	}
4747 	mutex_unlock(&mdsc->mutex);
4748 	dout("wait_unsafe_requests done\n");
4749 }
4750 
4751 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4752 {
4753 	u64 want_tid, want_flush;
4754 
4755 	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
4756 		return;
4757 
4758 	dout("sync\n");
4759 	mutex_lock(&mdsc->mutex);
4760 	want_tid = mdsc->last_tid;
4761 	mutex_unlock(&mdsc->mutex);
4762 
4763 	ceph_flush_dirty_caps(mdsc);
4764 	spin_lock(&mdsc->cap_dirty_lock);
4765 	want_flush = mdsc->last_cap_flush_tid;
4766 	if (!list_empty(&mdsc->cap_flush_list)) {
4767 		struct ceph_cap_flush *cf =
4768 			list_last_entry(&mdsc->cap_flush_list,
4769 					struct ceph_cap_flush, g_list);
4770 		cf->wake = true;
4771 	}
4772 	spin_unlock(&mdsc->cap_dirty_lock);
4773 
4774 	dout("sync want tid %lld flush_seq %lld\n",
4775 	     want_tid, want_flush);
4776 
4777 	wait_unsafe_requests(mdsc, want_tid);
4778 	wait_caps_flush(mdsc, want_flush);
4779 }
4780 
4781 /*
4782  * true if all sessions are closed, or we force unmount
4783  */
4784 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4785 {
4786 	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4787 		return true;
4788 	return atomic_read(&mdsc->num_sessions) <= skipped;
4789 }
4790 
4791 /*
4792  * called after sb is ro.
4793  */
4794 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4795 {
4796 	struct ceph_options *opts = mdsc->fsc->client->options;
4797 	struct ceph_mds_session *session;
4798 	int i;
4799 	int skipped = 0;
4800 
4801 	dout("close_sessions\n");
4802 
4803 	/* close sessions */
4804 	mutex_lock(&mdsc->mutex);
4805 	for (i = 0; i < mdsc->max_sessions; i++) {
4806 		session = __ceph_lookup_mds_session(mdsc, i);
4807 		if (!session)
4808 			continue;
4809 		mutex_unlock(&mdsc->mutex);
4810 		mutex_lock(&session->s_mutex);
4811 		if (__close_session(mdsc, session) <= 0)
4812 			skipped++;
4813 		mutex_unlock(&session->s_mutex);
4814 		ceph_put_mds_session(session);
4815 		mutex_lock(&mdsc->mutex);
4816 	}
4817 	mutex_unlock(&mdsc->mutex);
4818 
4819 	dout("waiting for sessions to close\n");
4820 	wait_event_timeout(mdsc->session_close_wq,
4821 			   done_closing_sessions(mdsc, skipped),
4822 			   ceph_timeout_jiffies(opts->mount_timeout));
4823 
4824 	/* tear down remaining sessions */
4825 	mutex_lock(&mdsc->mutex);
4826 	for (i = 0; i < mdsc->max_sessions; i++) {
4827 		if (mdsc->sessions[i]) {
4828 			session = ceph_get_mds_session(mdsc->sessions[i]);
4829 			__unregister_session(mdsc, session);
4830 			mutex_unlock(&mdsc->mutex);
4831 			mutex_lock(&session->s_mutex);
4832 			remove_session_caps(session);
4833 			mutex_unlock(&session->s_mutex);
4834 			ceph_put_mds_session(session);
4835 			mutex_lock(&mdsc->mutex);
4836 		}
4837 	}
4838 	WARN_ON(!list_empty(&mdsc->cap_delay_list));
4839 	mutex_unlock(&mdsc->mutex);
4840 
4841 	ceph_cleanup_snapid_map(mdsc);
4842 	ceph_cleanup_empty_realms(mdsc);
4843 
4844 	cancel_work_sync(&mdsc->cap_reclaim_work);
4845 	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4846 
4847 	dout("stopped\n");
4848 }
4849 
4850 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4851 {
4852 	struct ceph_mds_session *session;
4853 	int mds;
4854 
4855 	dout("force umount\n");
4856 
4857 	mutex_lock(&mdsc->mutex);
4858 	for (mds = 0; mds < mdsc->max_sessions; mds++) {
4859 		session = __ceph_lookup_mds_session(mdsc, mds);
4860 		if (!session)
4861 			continue;
4862 
4863 		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4864 			__unregister_session(mdsc, session);
4865 		__wake_requests(mdsc, &session->s_waiting);
4866 		mutex_unlock(&mdsc->mutex);
4867 
4868 		mutex_lock(&session->s_mutex);
4869 		__close_session(mdsc, session);
4870 		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4871 			cleanup_session_requests(mdsc, session);
4872 			remove_session_caps(session);
4873 		}
4874 		mutex_unlock(&session->s_mutex);
4875 		ceph_put_mds_session(session);
4876 
4877 		mutex_lock(&mdsc->mutex);
4878 		kick_requests(mdsc, mds);
4879 	}
4880 	__wake_requests(mdsc, &mdsc->waiting_for_map);
4881 	mutex_unlock(&mdsc->mutex);
4882 }
4883 
4884 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4885 {
4886 	dout("stop\n");
4887 	/*
4888 	 * Make sure the delayed work stopped before releasing
4889 	 * the resources.
4890 	 *
4891 	 * Because the cancel_delayed_work_sync() will only
4892 	 * guarantee that the work finishes executing. But the
4893 	 * delayed work will re-arm itself again after that.
4894 	 */
4895 	flush_delayed_work(&mdsc->delayed_work);
4896 
4897 	if (mdsc->mdsmap)
4898 		ceph_mdsmap_destroy(mdsc->mdsmap);
4899 	kfree(mdsc->sessions);
4900 	ceph_caps_finalize(mdsc);
4901 	ceph_pool_perm_destroy(mdsc);
4902 }
4903 
4904 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4905 {
4906 	struct ceph_mds_client *mdsc = fsc->mdsc;
4907 	dout("mdsc_destroy %p\n", mdsc);
4908 
4909 	if (!mdsc)
4910 		return;
4911 
4912 	/* flush out any connection work with references to us */
4913 	ceph_msgr_flush();
4914 
4915 	ceph_mdsc_stop(mdsc);
4916 
4917 	ceph_metric_destroy(&mdsc->metric);
4918 
4919 	flush_delayed_work(&mdsc->metric.delayed_work);
4920 	fsc->mdsc = NULL;
4921 	kfree(mdsc);
4922 	dout("mdsc_destroy %p done\n", mdsc);
4923 }
4924 
4925 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4926 {
4927 	struct ceph_fs_client *fsc = mdsc->fsc;
4928 	const char *mds_namespace = fsc->mount_options->mds_namespace;
4929 	void *p = msg->front.iov_base;
4930 	void *end = p + msg->front.iov_len;
4931 	u32 epoch;
4932 	u32 num_fs;
4933 	u32 mount_fscid = (u32)-1;
4934 	int err = -EINVAL;
4935 
4936 	ceph_decode_need(&p, end, sizeof(u32), bad);
4937 	epoch = ceph_decode_32(&p);
4938 
4939 	dout("handle_fsmap epoch %u\n", epoch);
4940 
4941 	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
4942 	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
4943 
4944 	ceph_decode_32_safe(&p, end, num_fs, bad);
4945 	while (num_fs-- > 0) {
4946 		void *info_p, *info_end;
4947 		u32 info_len;
4948 		u32 fscid, namelen;
4949 
4950 		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4951 		p += 2;		// info_v, info_cv
4952 		info_len = ceph_decode_32(&p);
4953 		ceph_decode_need(&p, end, info_len, bad);
4954 		info_p = p;
4955 		info_end = p + info_len;
4956 		p = info_end;
4957 
4958 		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4959 		fscid = ceph_decode_32(&info_p);
4960 		namelen = ceph_decode_32(&info_p);
4961 		ceph_decode_need(&info_p, info_end, namelen, bad);
4962 
4963 		if (mds_namespace &&
4964 		    strlen(mds_namespace) == namelen &&
4965 		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
4966 			mount_fscid = fscid;
4967 			break;
4968 		}
4969 	}
4970 
4971 	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4972 	if (mount_fscid != (u32)-1) {
4973 		fsc->client->monc.fs_cluster_id = mount_fscid;
4974 		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4975 				   0, true);
4976 		ceph_monc_renew_subs(&fsc->client->monc);
4977 	} else {
4978 		err = -ENOENT;
4979 		goto err_out;
4980 	}
4981 	return;
4982 
4983 bad:
4984 	pr_err("error decoding fsmap\n");
4985 err_out:
4986 	mutex_lock(&mdsc->mutex);
4987 	mdsc->mdsmap_err = err;
4988 	__wake_requests(mdsc, &mdsc->waiting_for_map);
4989 	mutex_unlock(&mdsc->mutex);
4990 }
4991 
4992 /*
4993  * handle mds map update.
4994  */
4995 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4996 {
4997 	u32 epoch;
4998 	u32 maplen;
4999 	void *p = msg->front.iov_base;
5000 	void *end = p + msg->front.iov_len;
5001 	struct ceph_mdsmap *newmap, *oldmap;
5002 	struct ceph_fsid fsid;
5003 	int err = -EINVAL;
5004 
5005 	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5006 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5007 	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5008 		return;
5009 	epoch = ceph_decode_32(&p);
5010 	maplen = ceph_decode_32(&p);
5011 	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5012 
5013 	/* do we need it? */
5014 	mutex_lock(&mdsc->mutex);
5015 	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5016 		dout("handle_map epoch %u <= our %u\n",
5017 		     epoch, mdsc->mdsmap->m_epoch);
5018 		mutex_unlock(&mdsc->mutex);
5019 		return;
5020 	}
5021 
5022 	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
5023 	if (IS_ERR(newmap)) {
5024 		err = PTR_ERR(newmap);
5025 		goto bad_unlock;
5026 	}
5027 
5028 	/* swap into place */
5029 	if (mdsc->mdsmap) {
5030 		oldmap = mdsc->mdsmap;
5031 		mdsc->mdsmap = newmap;
5032 		check_new_map(mdsc, newmap, oldmap);
5033 		ceph_mdsmap_destroy(oldmap);
5034 	} else {
5035 		mdsc->mdsmap = newmap;  /* first mds map */
5036 	}
5037 	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5038 					MAX_LFS_FILESIZE);
5039 
5040 	__wake_requests(mdsc, &mdsc->waiting_for_map);
5041 	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5042 			  mdsc->mdsmap->m_epoch);
5043 
5044 	mutex_unlock(&mdsc->mutex);
5045 	schedule_delayed(mdsc);
5046 	return;
5047 
5048 bad_unlock:
5049 	mutex_unlock(&mdsc->mutex);
5050 bad:
5051 	pr_err("error decoding mdsmap %d\n", err);
5052 	return;
5053 }
5054 
5055 static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5056 {
5057 	struct ceph_mds_session *s = con->private;
5058 
5059 	if (ceph_get_mds_session(s))
5060 		return con;
5061 	return NULL;
5062 }
5063 
5064 static void mds_put_con(struct ceph_connection *con)
5065 {
5066 	struct ceph_mds_session *s = con->private;
5067 
5068 	ceph_put_mds_session(s);
5069 }
5070 
5071 /*
5072  * if the client is unresponsive for long enough, the mds will kill
5073  * the session entirely.
5074  */
5075 static void mds_peer_reset(struct ceph_connection *con)
5076 {
5077 	struct ceph_mds_session *s = con->private;
5078 	struct ceph_mds_client *mdsc = s->s_mdsc;
5079 
5080 	pr_warn("mds%d closed our session\n", s->s_mds);
5081 	send_mds_reconnect(mdsc, s);
5082 }
5083 
5084 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5085 {
5086 	struct ceph_mds_session *s = con->private;
5087 	struct ceph_mds_client *mdsc = s->s_mdsc;
5088 	int type = le16_to_cpu(msg->hdr.type);
5089 
5090 	mutex_lock(&mdsc->mutex);
5091 	if (__verify_registered_session(mdsc, s) < 0) {
5092 		mutex_unlock(&mdsc->mutex);
5093 		goto out;
5094 	}
5095 	mutex_unlock(&mdsc->mutex);
5096 
5097 	switch (type) {
5098 	case CEPH_MSG_MDS_MAP:
5099 		ceph_mdsc_handle_mdsmap(mdsc, msg);
5100 		break;
5101 	case CEPH_MSG_FS_MAP_USER:
5102 		ceph_mdsc_handle_fsmap(mdsc, msg);
5103 		break;
5104 	case CEPH_MSG_CLIENT_SESSION:
5105 		handle_session(s, msg);
5106 		break;
5107 	case CEPH_MSG_CLIENT_REPLY:
5108 		handle_reply(s, msg);
5109 		break;
5110 	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5111 		handle_forward(mdsc, s, msg);
5112 		break;
5113 	case CEPH_MSG_CLIENT_CAPS:
5114 		ceph_handle_caps(s, msg);
5115 		break;
5116 	case CEPH_MSG_CLIENT_SNAP:
5117 		ceph_handle_snap(mdsc, s, msg);
5118 		break;
5119 	case CEPH_MSG_CLIENT_LEASE:
5120 		handle_lease(mdsc, s, msg);
5121 		break;
5122 	case CEPH_MSG_CLIENT_QUOTA:
5123 		ceph_handle_quota(mdsc, s, msg);
5124 		break;
5125 
5126 	default:
5127 		pr_err("received unknown message type %d %s\n", type,
5128 		       ceph_msg_type_name(type));
5129 	}
5130 out:
5131 	ceph_msg_put(msg);
5132 }
5133 
5134 /*
5135  * authentication
5136  */
5137 
5138 /*
5139  * Note: returned pointer is the address of a structure that's
5140  * managed separately.  Caller must *not* attempt to free it.
5141  */
5142 static struct ceph_auth_handshake *
5143 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5144 {
5145 	struct ceph_mds_session *s = con->private;
5146 	struct ceph_mds_client *mdsc = s->s_mdsc;
5147 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5148 	struct ceph_auth_handshake *auth = &s->s_auth;
5149 	int ret;
5150 
5151 	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5152 					 force_new, proto, NULL, NULL);
5153 	if (ret)
5154 		return ERR_PTR(ret);
5155 
5156 	return auth;
5157 }
5158 
5159 static int mds_add_authorizer_challenge(struct ceph_connection *con,
5160 				    void *challenge_buf, int challenge_buf_len)
5161 {
5162 	struct ceph_mds_session *s = con->private;
5163 	struct ceph_mds_client *mdsc = s->s_mdsc;
5164 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5165 
5166 	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5167 					    challenge_buf, challenge_buf_len);
5168 }
5169 
5170 static int mds_verify_authorizer_reply(struct ceph_connection *con)
5171 {
5172 	struct ceph_mds_session *s = con->private;
5173 	struct ceph_mds_client *mdsc = s->s_mdsc;
5174 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5175 	struct ceph_auth_handshake *auth = &s->s_auth;
5176 
5177 	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5178 		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5179 		NULL, NULL, NULL, NULL);
5180 }
5181 
5182 static int mds_invalidate_authorizer(struct ceph_connection *con)
5183 {
5184 	struct ceph_mds_session *s = con->private;
5185 	struct ceph_mds_client *mdsc = s->s_mdsc;
5186 	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5187 
5188 	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5189 
5190 	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5191 }
5192 
5193 static int mds_get_auth_request(struct ceph_connection *con,
5194 				void *buf, int *buf_len,
5195 				void **authorizer, int *authorizer_len)
5196 {
5197 	struct ceph_mds_session *s = con->private;
5198 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5199 	struct ceph_auth_handshake *auth = &s->s_auth;
5200 	int ret;
5201 
5202 	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5203 				       buf, buf_len);
5204 	if (ret)
5205 		return ret;
5206 
5207 	*authorizer = auth->authorizer_buf;
5208 	*authorizer_len = auth->authorizer_buf_len;
5209 	return 0;
5210 }
5211 
5212 static int mds_handle_auth_reply_more(struct ceph_connection *con,
5213 				      void *reply, int reply_len,
5214 				      void *buf, int *buf_len,
5215 				      void **authorizer, int *authorizer_len)
5216 {
5217 	struct ceph_mds_session *s = con->private;
5218 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5219 	struct ceph_auth_handshake *auth = &s->s_auth;
5220 	int ret;
5221 
5222 	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5223 					      buf, buf_len);
5224 	if (ret)
5225 		return ret;
5226 
5227 	*authorizer = auth->authorizer_buf;
5228 	*authorizer_len = auth->authorizer_buf_len;
5229 	return 0;
5230 }
5231 
5232 static int mds_handle_auth_done(struct ceph_connection *con,
5233 				u64 global_id, void *reply, int reply_len,
5234 				u8 *session_key, int *session_key_len,
5235 				u8 *con_secret, int *con_secret_len)
5236 {
5237 	struct ceph_mds_session *s = con->private;
5238 	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5239 	struct ceph_auth_handshake *auth = &s->s_auth;
5240 
5241 	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5242 					       session_key, session_key_len,
5243 					       con_secret, con_secret_len);
5244 }
5245 
5246 static int mds_handle_auth_bad_method(struct ceph_connection *con,
5247 				      int used_proto, int result,
5248 				      const int *allowed_protos, int proto_cnt,
5249 				      const int *allowed_modes, int mode_cnt)
5250 {
5251 	struct ceph_mds_session *s = con->private;
5252 	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5253 	int ret;
5254 
5255 	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5256 					    used_proto, result,
5257 					    allowed_protos, proto_cnt,
5258 					    allowed_modes, mode_cnt)) {
5259 		ret = ceph_monc_validate_auth(monc);
5260 		if (ret)
5261 			return ret;
5262 	}
5263 
5264 	return -EACCES;
5265 }
5266 
5267 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5268 				struct ceph_msg_header *hdr, int *skip)
5269 {
5270 	struct ceph_msg *msg;
5271 	int type = (int) le16_to_cpu(hdr->type);
5272 	int front_len = (int) le32_to_cpu(hdr->front_len);
5273 
5274 	if (con->in_msg)
5275 		return con->in_msg;
5276 
5277 	*skip = 0;
5278 	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5279 	if (!msg) {
5280 		pr_err("unable to allocate msg type %d len %d\n",
5281 		       type, front_len);
5282 		return NULL;
5283 	}
5284 
5285 	return msg;
5286 }
5287 
5288 static int mds_sign_message(struct ceph_msg *msg)
5289 {
5290        struct ceph_mds_session *s = msg->con->private;
5291        struct ceph_auth_handshake *auth = &s->s_auth;
5292 
5293        return ceph_auth_sign_message(auth, msg);
5294 }
5295 
5296 static int mds_check_message_signature(struct ceph_msg *msg)
5297 {
5298        struct ceph_mds_session *s = msg->con->private;
5299        struct ceph_auth_handshake *auth = &s->s_auth;
5300 
5301        return ceph_auth_check_message_signature(auth, msg);
5302 }
5303 
5304 static const struct ceph_connection_operations mds_con_ops = {
5305 	.get = mds_get_con,
5306 	.put = mds_put_con,
5307 	.alloc_msg = mds_alloc_msg,
5308 	.dispatch = mds_dispatch,
5309 	.peer_reset = mds_peer_reset,
5310 	.get_authorizer = mds_get_authorizer,
5311 	.add_authorizer_challenge = mds_add_authorizer_challenge,
5312 	.verify_authorizer_reply = mds_verify_authorizer_reply,
5313 	.invalidate_authorizer = mds_invalidate_authorizer,
5314 	.sign_message = mds_sign_message,
5315 	.check_message_signature = mds_check_message_signature,
5316 	.get_auth_request = mds_get_auth_request,
5317 	.handle_auth_reply_more = mds_handle_auth_reply_more,
5318 	.handle_auth_done = mds_handle_auth_done,
5319 	.handle_auth_bad_method = mds_handle_auth_bad_method,
5320 };
5321 
5322 /* eof */
5323