xref: /openbmc/linux/fs/ceph/mds_client.c (revision 9144f784f852f9a125cabe9927b986d909bfa439)
1  // SPDX-License-Identifier: GPL-2.0
2  #include <linux/ceph/ceph_debug.h>
3  
4  #include <linux/fs.h>
5  #include <linux/wait.h>
6  #include <linux/slab.h>
7  #include <linux/gfp.h>
8  #include <linux/sched.h>
9  #include <linux/debugfs.h>
10  #include <linux/seq_file.h>
11  #include <linux/ratelimit.h>
12  #include <linux/bits.h>
13  #include <linux/ktime.h>
14  #include <linux/bitmap.h>
15  
16  #include "super.h"
17  #include "mds_client.h"
18  #include "crypto.h"
19  
20  #include <linux/ceph/ceph_features.h>
21  #include <linux/ceph/messenger.h>
22  #include <linux/ceph/decode.h>
23  #include <linux/ceph/pagelist.h>
24  #include <linux/ceph/auth.h>
25  #include <linux/ceph/debugfs.h>
26  
27  #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)
28  
29  /*
30   * A cluster of MDS (metadata server) daemons is responsible for
31   * managing the file system namespace (the directory hierarchy and
32   * inodes) and for coordinating shared access to storage.  Metadata is
33   * partitioned hierarchically across a number of servers, and that
34   * partition varies over time as the cluster adjusts the distribution
35   * in order to balance load.
36   *
37   * The MDS client is primarily responsible for managing synchronous
38   * metadata requests for operations like open, unlink, and so forth.
39   * If there is an MDS failure, we find out about it when we (possibly
40   * request and) receive a new MDS map, and can resubmit affected
41   * requests.
42   *
43   * For the most part, though, we take advantage of a lossless
44   * communications channel to the MDS, and do not need to worry about
45   * timing out or resubmitting requests.
46   *
47   * We maintain a stateful "session" with each MDS we interact with.
48   * Within each session, we send periodic heartbeat messages to ensure
49   * any capabilities or leases we have been issued remain valid.  If
50   * the session times out and goes stale, our leases and capabilities
51   * are no longer valid.
52   */
53  
54  struct ceph_reconnect_state {
55  	struct ceph_mds_session *session;
56  	int nr_caps, nr_realms;
57  	struct ceph_pagelist *pagelist;
58  	unsigned msg_version;
59  	bool allow_multi;
60  };
61  
62  static void __wake_requests(struct ceph_mds_client *mdsc,
63  			    struct list_head *head);
64  static void ceph_cap_release_work(struct work_struct *work);
65  static void ceph_cap_reclaim_work(struct work_struct *work);
66  
67  static const struct ceph_connection_operations mds_con_ops;
68  
69  
70  /*
71   * mds reply parsing
72   */
73  
74  static int parse_reply_info_quota(void **p, void *end,
75  				  struct ceph_mds_reply_info_in *info)
76  {
77  	u8 struct_v, struct_compat;
78  	u32 struct_len;
79  
80  	ceph_decode_8_safe(p, end, struct_v, bad);
81  	ceph_decode_8_safe(p, end, struct_compat, bad);
82  	/* struct_v is expected to be >= 1. we only
83  	 * understand encoding with struct_compat == 1. */
84  	if (!struct_v || struct_compat != 1)
85  		goto bad;
86  	ceph_decode_32_safe(p, end, struct_len, bad);
87  	ceph_decode_need(p, end, struct_len, bad);
88  	end = *p + struct_len;
89  	ceph_decode_64_safe(p, end, info->max_bytes, bad);
90  	ceph_decode_64_safe(p, end, info->max_files, bad);
91  	*p = end;
92  	return 0;
93  bad:
94  	return -EIO;
95  }
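/*
 * Most of the reply parsers below share the versioned-encoding preamble
 * decoded above. A minimal sketch of the pattern (field names are
 * illustrative; the helpers come from <linux/ceph/decode.h>, included
 * above):
 *
 *	u8 struct_v, struct_compat;
 *	u32 struct_len;
 *
 *	ceph_decode_8_safe(p, end, struct_v, bad);      // encoding version
 *	ceph_decode_8_safe(p, end, struct_compat, bad); // oldest compat version
 *	ceph_decode_32_safe(p, end, struct_len, bad);   // payload length
 *	ceph_decode_need(p, end, struct_len, bad);      // bounds check
 *	end = *p + struct_len;      // clamp decoding to this sub-struct
 *	// ... decode the known fields ...
 *	*p = end;                   // skip fields added by newer MDS versions
 *
 * Jumping *p to 'end' is what lets an older client tolerate fields
 * appended by a newer server.
 */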
96  
97  /*
98   * parse individual inode info
99   */
100  static int parse_reply_info_in(void **p, void *end,
101  			       struct ceph_mds_reply_info_in *info,
102  			       u64 features)
103  {
104  	int err = 0;
105  	u8 struct_v = 0;
106  
107  	if (features == (u64)-1) {
108  		u32 struct_len;
109  		u8 struct_compat;
110  		ceph_decode_8_safe(p, end, struct_v, bad);
111  		ceph_decode_8_safe(p, end, struct_compat, bad);
112  		/* struct_v is expected to be >= 1. we only understand
113  		 * encoding with struct_compat == 1. */
114  		if (!struct_v || struct_compat != 1)
115  			goto bad;
116  		ceph_decode_32_safe(p, end, struct_len, bad);
117  		ceph_decode_need(p, end, struct_len, bad);
118  		end = *p + struct_len;
119  	}
120  
121  	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
122  	info->in = *p;
123  	*p += sizeof(struct ceph_mds_reply_inode) +
124  		sizeof(*info->in->fragtree.splits) *
125  		le32_to_cpu(info->in->fragtree.nsplits);
126  
127  	ceph_decode_32_safe(p, end, info->symlink_len, bad);
128  	ceph_decode_need(p, end, info->symlink_len, bad);
129  	info->symlink = *p;
130  	*p += info->symlink_len;
131  
132  	ceph_decode_copy_safe(p, end, &info->dir_layout,
133  			      sizeof(info->dir_layout), bad);
134  	ceph_decode_32_safe(p, end, info->xattr_len, bad);
135  	ceph_decode_need(p, end, info->xattr_len, bad);
136  	info->xattr_data = *p;
137  	*p += info->xattr_len;
138  
139  	if (features == (u64)-1) {
140  		/* inline data */
141  		ceph_decode_64_safe(p, end, info->inline_version, bad);
142  		ceph_decode_32_safe(p, end, info->inline_len, bad);
143  		ceph_decode_need(p, end, info->inline_len, bad);
144  		info->inline_data = *p;
145  		*p += info->inline_len;
146  		/* quota */
147  		err = parse_reply_info_quota(p, end, info);
148  		if (err < 0)
149  			goto out_bad;
150  		/* pool namespace */
151  		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
152  		if (info->pool_ns_len > 0) {
153  			ceph_decode_need(p, end, info->pool_ns_len, bad);
154  			info->pool_ns_data = *p;
155  			*p += info->pool_ns_len;
156  		}
157  
158  		/* btime */
159  		ceph_decode_need(p, end, sizeof(info->btime), bad);
160  		ceph_decode_copy(p, &info->btime, sizeof(info->btime));
161  
162  		/* change attribute */
163  		ceph_decode_64_safe(p, end, info->change_attr, bad);
164  
165  		/* dir pin */
166  		if (struct_v >= 2) {
167  			ceph_decode_32_safe(p, end, info->dir_pin, bad);
168  		} else {
169  			info->dir_pin = -ENODATA;
170  		}
171  
172  		/* snapshot birth time, remains zero for v<=2 */
173  		if (struct_v >= 3) {
174  			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
175  			ceph_decode_copy(p, &info->snap_btime,
176  					 sizeof(info->snap_btime));
177  		} else {
178  			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
179  		}
180  
181  		/* snapshot count, remains zero for v<=3 */
182  		if (struct_v >= 4) {
183  			ceph_decode_64_safe(p, end, info->rsnaps, bad);
184  		} else {
185  			info->rsnaps = 0;
186  		}
187  
188  		if (struct_v >= 5) {
189  			u32 alen;
190  
191  			ceph_decode_32_safe(p, end, alen, bad);
192  
193  			while (alen--) {
194  				u32 len;
195  
196  				/* key */
197  				ceph_decode_32_safe(p, end, len, bad);
198  				ceph_decode_skip_n(p, end, len, bad);
199  				/* value */
200  				ceph_decode_32_safe(p, end, len, bad);
201  				ceph_decode_skip_n(p, end, len, bad);
202  			}
203  		}
204  
205  		/* fscrypt flag -- ignore */
206  		if (struct_v >= 6)
207  			ceph_decode_skip_8(p, end, bad);
208  
209  		info->fscrypt_auth = NULL;
210  		info->fscrypt_auth_len = 0;
211  		info->fscrypt_file = NULL;
212  		info->fscrypt_file_len = 0;
213  		if (struct_v >= 7) {
214  			ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
215  			if (info->fscrypt_auth_len) {
216  				info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
217  							     GFP_KERNEL);
218  				if (!info->fscrypt_auth)
219  					return -ENOMEM;
220  				ceph_decode_copy_safe(p, end, info->fscrypt_auth,
221  						      info->fscrypt_auth_len, bad);
222  			}
223  			ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
224  			if (info->fscrypt_file_len) {
225  				info->fscrypt_file = kmalloc(info->fscrypt_file_len,
226  							     GFP_KERNEL);
227  				if (!info->fscrypt_file)
228  					return -ENOMEM;
229  				ceph_decode_copy_safe(p, end, info->fscrypt_file,
230  						      info->fscrypt_file_len, bad);
231  			}
232  		}
233  		*p = end;
234  	} else {
235  		/* legacy (unversioned) struct */
236  		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
237  			ceph_decode_64_safe(p, end, info->inline_version, bad);
238  			ceph_decode_32_safe(p, end, info->inline_len, bad);
239  			ceph_decode_need(p, end, info->inline_len, bad);
240  			info->inline_data = *p;
241  			*p += info->inline_len;
242  		} else
243  			info->inline_version = CEPH_INLINE_NONE;
244  
245  		if (features & CEPH_FEATURE_MDS_QUOTA) {
246  			err = parse_reply_info_quota(p, end, info);
247  			if (err < 0)
248  				goto out_bad;
249  		} else {
250  			info->max_bytes = 0;
251  			info->max_files = 0;
252  		}
253  
254  		info->pool_ns_len = 0;
255  		info->pool_ns_data = NULL;
256  		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
257  			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
258  			if (info->pool_ns_len > 0) {
259  				ceph_decode_need(p, end, info->pool_ns_len, bad);
260  				info->pool_ns_data = *p;
261  				*p += info->pool_ns_len;
262  			}
263  		}
264  
265  		if (features & CEPH_FEATURE_FS_BTIME) {
266  			ceph_decode_need(p, end, sizeof(info->btime), bad);
267  			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
268  			ceph_decode_64_safe(p, end, info->change_attr, bad);
269  		}
270  
271  		info->dir_pin = -ENODATA;
272  		/* info->snap_btime and info->rsnaps remain zero */
273  	}
274  	return 0;
275  bad:
276  	err = -EIO;
277  out_bad:
278  	return err;
279  }
280  
281  static int parse_reply_info_dir(void **p, void *end,
282  				struct ceph_mds_reply_dirfrag **dirfrag,
283  				u64 features)
284  {
285  	if (features == (u64)-1) {
286  		u8 struct_v, struct_compat;
287  		u32 struct_len;
288  		ceph_decode_8_safe(p, end, struct_v, bad);
289  		ceph_decode_8_safe(p, end, struct_compat, bad);
290  		/* struct_v is expected to be >= 1. we only understand
291  	 * encoding with struct_compat == 1. */
292  		if (!struct_v || struct_compat != 1)
293  			goto bad;
294  		ceph_decode_32_safe(p, end, struct_len, bad);
295  		ceph_decode_need(p, end, struct_len, bad);
296  		end = *p + struct_len;
297  	}
298  
299  	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
300  	*dirfrag = *p;
301  	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
302  	if (unlikely(*p > end))
303  		goto bad;
304  	if (features == (u64)-1)
305  		*p = end;
306  	return 0;
307  bad:
308  	return -EIO;
309  }
310  
311  static int parse_reply_info_lease(void **p, void *end,
312  				  struct ceph_mds_reply_lease **lease,
313  				  u64 features, u32 *altname_len, u8 **altname)
314  {
315  	u8 struct_v;
316  	u32 struct_len;
317  	void *lend;
318  
319  	if (features == (u64)-1) {
320  		u8 struct_compat;
321  
322  		ceph_decode_8_safe(p, end, struct_v, bad);
323  		ceph_decode_8_safe(p, end, struct_compat, bad);
324  
325  		/* struct_v is expected to be >= 1. we only understand
326  	 * encoding with struct_compat == 1. */
327  		if (!struct_v || struct_compat != 1)
328  			goto bad;
329  
330  		ceph_decode_32_safe(p, end, struct_len, bad);
331  	} else {
332  		struct_len = sizeof(**lease);
333  		*altname_len = 0;
334  		*altname = NULL;
335  	}
336  
337  	lend = *p + struct_len;
338  	ceph_decode_need(p, end, struct_len, bad);
339  	*lease = *p;
340  	*p += sizeof(**lease);
341  
342  	if (features == (u64)-1) {
343  		if (struct_v >= 2) {
344  			ceph_decode_32_safe(p, end, *altname_len, bad);
345  			ceph_decode_need(p, end, *altname_len, bad);
346  			*altname = *p;
347  			*p += *altname_len;
348  		} else {
349  			*altname = NULL;
350  			*altname_len = 0;
351  		}
352  	}
353  	*p = lend;
354  	return 0;
355  bad:
356  	return -EIO;
357  }
358  
359  /*
360   * parse a normal reply, which may contain a (dir+)dentry and/or a
361   * target inode.
362   */
363  static int parse_reply_info_trace(void **p, void *end,
364  				  struct ceph_mds_reply_info_parsed *info,
365  				  u64 features)
366  {
367  	int err;
368  
369  	if (info->head->is_dentry) {
370  		err = parse_reply_info_in(p, end, &info->diri, features);
371  		if (err < 0)
372  			goto out_bad;
373  
374  		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
375  		if (err < 0)
376  			goto out_bad;
377  
378  		ceph_decode_32_safe(p, end, info->dname_len, bad);
379  		ceph_decode_need(p, end, info->dname_len, bad);
380  		info->dname = *p;
381  		*p += info->dname_len;
382  
383  		err = parse_reply_info_lease(p, end, &info->dlease, features,
384  					     &info->altname_len, &info->altname);
385  		if (err < 0)
386  			goto out_bad;
387  	}
388  
389  	if (info->head->is_target) {
390  		err = parse_reply_info_in(p, end, &info->targeti, features);
391  		if (err < 0)
392  			goto out_bad;
393  	}
394  
395  	if (unlikely(*p != end))
396  		goto bad;
397  	return 0;
398  
399  bad:
400  	err = -EIO;
401  out_bad:
402  	pr_err("problem parsing mds trace %d\n", err);
403  	return err;
404  }
405  
406  /*
407   * parse readdir results
408   */
409  static int parse_reply_info_readdir(void **p, void *end,
410  				    struct ceph_mds_request *req,
411  				    u64 features)
412  {
413  	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
414  	u32 num, i = 0;
415  	int err;
416  
417  	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
418  	if (err < 0)
419  		goto out_bad;
420  
421  	ceph_decode_need(p, end, sizeof(num) + 2, bad);
422  	num = ceph_decode_32(p);
423  	{
424  		u16 flags = ceph_decode_16(p);
425  		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
426  		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
427  		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
428  		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
429  	}
430  	if (num == 0)
431  		goto done;
432  
433  	BUG_ON(!info->dir_entries);
434  	if ((unsigned long)(info->dir_entries + num) >
435  	    (unsigned long)info->dir_entries + info->dir_buf_size) {
436  		pr_err("dir contents are larger than expected\n");
437  		WARN_ON(1);
438  		goto bad;
439  	}
440  
441  	info->dir_nr = num;
442  	while (num) {
443  		struct inode *inode = d_inode(req->r_dentry);
444  		struct ceph_inode_info *ci = ceph_inode(inode);
445  		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
446  		struct fscrypt_str tname = FSTR_INIT(NULL, 0);
447  		struct fscrypt_str oname = FSTR_INIT(NULL, 0);
448  		struct ceph_fname fname;
449  		u32 altname_len, _name_len;
450  		u8 *altname, *_name;
451  
452  		/* dentry */
453  		ceph_decode_32_safe(p, end, _name_len, bad);
454  		ceph_decode_need(p, end, _name_len, bad);
455  		_name = *p;
456  		*p += _name_len;
457  		dout("parsed dir dname '%.*s'\n", _name_len, _name);
458  
459  		if (info->hash_order)
460  			rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
461  						      _name, _name_len);
462  
463  		/* dentry lease */
464  		err = parse_reply_info_lease(p, end, &rde->lease, features,
465  					     &altname_len, &altname);
466  		if (err)
467  			goto out_bad;
468  
469  		/*
470  		 * Try to decrypt the dentry names and update them
471  		 * in the ceph_mds_reply_dir_entry struct.
472  		 */
473  		fname.dir = inode;
474  		fname.name = _name;
475  		fname.name_len = _name_len;
476  		fname.ctext = altname;
477  		fname.ctext_len = altname_len;
478  		/*
479  		 * _name_len may be larger than altname_len, such as
480  		 * when the human-readable name length is in the range
481  		 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
482  		 * in which case the copy in ceph_fname_to_usr would
483  		 * corrupt the data if there is no encryption key.
484  		 *
485  		 * Just set the no_copy flag; then, if there is no
486  		 * encryption key, oname.name will always be assigned
487  		 * to _name.
488  		 */
489  		fname.no_copy = true;
490  		if (altname_len == 0) {
491  			/*
492  			 * Set tname to _name, and this will be used
493  			 * to do the base64_decode in-place. It's
494  			 * safe because the decoded string is always
495  			 * shorter, at most 3/4 the length of the
496  			 * original string.
497  			 */
498  			tname.name = _name;
499  
500  			/*
501  			 * Set oname to _name too, and this will be
502  			 * used to do the decryption in-place.
503  			 */
504  			oname.name = _name;
505  			oname.len = _name_len;
506  		} else {
507  			/*
508  			 * This will do the decryption in-place,
509  			 * directly from the altname ciphertext.
510  			 */
511  			oname.name = altname;
512  			oname.len = altname_len;
513  		}
514  		rde->is_nokey = false;
515  		err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
516  		if (err) {
517  			pr_err("%s unable to decode %.*s, got %d\n", __func__,
518  			       _name_len, _name, err);
519  			goto out_bad;
520  		}
521  		rde->name = oname.name;
522  		rde->name_len = oname.len;
523  
524  		/* inode */
525  		err = parse_reply_info_in(p, end, &rde->inode, features);
526  		if (err < 0)
527  			goto out_bad;
528  		/* ceph_readdir_prepopulate() will update it */
529  		rde->offset = 0;
530  		i++;
531  		num--;
532  	}
533  
534  done:
535  	/* Skip over any unrecognized fields */
536  	*p = end;
537  	return 0;
538  
539  bad:
540  	err = -EIO;
541  out_bad:
542  	pr_err("problem parsing dir contents %d\n", err);
543  	return err;
544  }
545  
546  /*
547   * parse fcntl F_GETLK results
548   */
549  static int parse_reply_info_filelock(void **p, void *end,
550  				     struct ceph_mds_reply_info_parsed *info,
551  				     u64 features)
552  {
553  	if (*p + sizeof(*info->filelock_reply) > end)
554  		goto bad;
555  
556  	info->filelock_reply = *p;
557  
558  	/* Skip over any unrecognized fields */
559  	*p = end;
560  	return 0;
561  bad:
562  	return -EIO;
563  }
564  
565  
566  #if BITS_PER_LONG == 64
567  
568  #define DELEGATED_INO_AVAILABLE		xa_mk_value(1)
569  
570  static int ceph_parse_deleg_inos(void **p, void *end,
571  				 struct ceph_mds_session *s)
572  {
573  	u32 sets;
574  
575  	ceph_decode_32_safe(p, end, sets, bad);
576  	dout("got %u sets of delegated inodes\n", sets);
577  	while (sets--) {
578  		u64 start, len;
579  
580  		ceph_decode_64_safe(p, end, start, bad);
581  		ceph_decode_64_safe(p, end, len, bad);
582  
583  		/* Don't accept a delegation of system inodes */
584  		if (start < CEPH_INO_SYSTEM_BASE) {
585  			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
586  					start, len);
587  			continue;
588  		}
589  		while (len--) {
590  			int err = xa_insert(&s->s_delegated_inos, start++,
591  					    DELEGATED_INO_AVAILABLE,
592  					    GFP_KERNEL);
593  			if (!err) {
594  				dout("added delegated inode 0x%llx\n",
595  				     start - 1);
596  			} else if (err == -EBUSY) {
597  				pr_warn("MDS delegated inode 0x%llx more than once.\n",
598  					start - 1);
599  			} else {
600  				return err;
601  			}
602  		}
603  	}
604  	return 0;
605  bad:
606  	return -EIO;
607  }
608  
609  u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
610  {
611  	unsigned long ino;
612  	void *val;
613  
614  	xa_for_each(&s->s_delegated_inos, ino, val) {
615  		val = xa_erase(&s->s_delegated_inos, ino);
616  		if (val == DELEGATED_INO_AVAILABLE)
617  			return ino;
618  	}
619  	return 0;
620  }
621  
622  int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
623  {
624  	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
625  			 GFP_KERNEL);
626  }
627  #else /* BITS_PER_LONG == 64 */
628  /*
629   * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
630   * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
631   * and bottom words?
632   */
633  static int ceph_parse_deleg_inos(void **p, void *end,
634  				 struct ceph_mds_session *s)
635  {
636  	u32 sets;
637  
638  	ceph_decode_32_safe(p, end, sets, bad);
639  	if (sets)
640  		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
641  	return 0;
642  bad:
643  	return -EIO;
644  }
645  
646  u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
647  {
648  	return 0;
649  }
650  
651  int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
652  {
653  	return 0;
654  }
655  #endif /* BITS_PER_LONG == 64 */
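/*
 * Usage sketch (illustrative only; 'submit_failed' is a hypothetical
 * error path): an async create consumes one delegated inode number and
 * should give it back if the request is never submitted:
 *
 *	u64 ino = ceph_get_deleg_ino(session);
 *
 *	if (ino) {
 *		// ... prepare an async create using 'ino' ...
 *		if (submit_failed)
 *			ceph_restore_deleg_ino(session, ino);
 *	}
 *
 * On 32-bit kernels ceph_get_deleg_ino() always returns 0, so such a
 * caller would be expected to fall back to a synchronous create.
 */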
656  
657  /*
658   * parse create results
659   */
660  static int parse_reply_info_create(void **p, void *end,
661  				  struct ceph_mds_reply_info_parsed *info,
662  				  u64 features, struct ceph_mds_session *s)
663  {
664  	int ret;
665  
666  	if (features == (u64)-1 ||
667  	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
668  		if (*p == end) {
669  			/* Malformed reply? */
670  			info->has_create_ino = false;
671  		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
672  			info->has_create_ino = true;
673  			/* struct_v, struct_compat, and len */
674  			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
675  			ceph_decode_64_safe(p, end, info->ino, bad);
676  			ret = ceph_parse_deleg_inos(p, end, s);
677  			if (ret)
678  				return ret;
679  		} else {
680  			/* legacy */
681  			ceph_decode_64_safe(p, end, info->ino, bad);
682  			info->has_create_ino = true;
683  		}
684  	} else {
685  		if (*p != end)
686  			goto bad;
687  	}
688  
689  	/* Skip over any unrecognized fields */
690  	*p = end;
691  	return 0;
692  bad:
693  	return -EIO;
694  }
695  
696  static int parse_reply_info_getvxattr(void **p, void *end,
697  				      struct ceph_mds_reply_info_parsed *info,
698  				      u64 features)
699  {
700  	u32 value_len;
701  
702  	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
703  	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
704  	ceph_decode_skip_32(p, end, bad); /* skip payload length */
705  
706  	ceph_decode_32_safe(p, end, value_len, bad);
707  
708  	if (value_len == end - *p) {
709  		info->xattr_info.xattr_value = *p;
710  		info->xattr_info.xattr_value_len = value_len;
711  		*p = end;
712  		return value_len;
713  	}
714  bad:
715  	return -EIO;
716  }
717  
718  /*
719   * parse extra results
720   */
721  static int parse_reply_info_extra(void **p, void *end,
722  				  struct ceph_mds_request *req,
723  				  u64 features, struct ceph_mds_session *s)
724  {
725  	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
726  	u32 op = le32_to_cpu(info->head->op);
727  
728  	if (op == CEPH_MDS_OP_GETFILELOCK)
729  		return parse_reply_info_filelock(p, end, info, features);
730  	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
731  		return parse_reply_info_readdir(p, end, req, features);
732  	else if (op == CEPH_MDS_OP_CREATE)
733  		return parse_reply_info_create(p, end, info, features, s);
734  	else if (op == CEPH_MDS_OP_GETVXATTR)
735  		return parse_reply_info_getvxattr(p, end, info, features);
736  	else
737  		return -EIO;
738  }
739  
740  /*
741   * parse entire mds reply
742   */
743  static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
744  			    struct ceph_mds_request *req, u64 features)
745  {
746  	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
747  	void *p, *end;
748  	u32 len;
749  	int err;
750  
751  	info->head = msg->front.iov_base;
752  	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
753  	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
754  
755  	/* trace */
756  	ceph_decode_32_safe(&p, end, len, bad);
757  	if (len > 0) {
758  		ceph_decode_need(&p, end, len, bad);
759  		err = parse_reply_info_trace(&p, p+len, info, features);
760  		if (err < 0)
761  			goto out_bad;
762  	}
763  
764  	/* extra */
765  	ceph_decode_32_safe(&p, end, len, bad);
766  	if (len > 0) {
767  		ceph_decode_need(&p, end, len, bad);
768  		err = parse_reply_info_extra(&p, p+len, req, features, s);
769  		if (err < 0)
770  			goto out_bad;
771  	}
772  
773  	/* snap blob */
774  	ceph_decode_32_safe(&p, end, len, bad);
775  	info->snapblob_len = len;
776  	info->snapblob = p;
777  	p += len;
778  
779  	if (p != end)
780  		goto bad;
781  	return 0;
782  
783  bad:
784  	err = -EIO;
785  out_bad:
786  	pr_err("mds parse_reply err %d\n", err);
787  	ceph_msg_dump(msg);
788  	return err;
789  }
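/*
 * For reference, the reply payload decoded above consists of three
 * length-prefixed blobs following struct ceph_mds_reply_head:
 *
 *	u32 trace_len;            // dentry/inode trace
 *	u8  trace[trace_len];     // parse_reply_info_trace()
 *	u32 extra_len;            // op-specific extra info
 *	u8  extra[extra_len];     // parse_reply_info_extra()
 *	u32 snap_len;             // opaque snap blob
 *	u8  snap[snap_len];       // handled later by the snap code
 *
 * Anything left over after the snap blob makes the message look
 * corrupt, and the whole reply is rejected with -EIO.
 */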
790  
791  static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
792  {
793  	int i;
794  
795  	kfree(info->diri.fscrypt_auth);
796  	kfree(info->diri.fscrypt_file);
797  	kfree(info->targeti.fscrypt_auth);
798  	kfree(info->targeti.fscrypt_file);
799  	if (!info->dir_entries)
800  		return;
801  
802  	for (i = 0; i < info->dir_nr; i++) {
803  		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
804  
805  		kfree(rde->inode.fscrypt_auth);
806  		kfree(rde->inode.fscrypt_file);
807  	}
808  	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
809  }
810  
811  /*
812   * In the async unlink case the kclient won't wait for the first reply
813   * from the MDS; it just drops all the links, unhashes the dentry and
814   * then succeeds immediately.
815   *
816   * For any new create/link/rename etc. requests that follow using the
817   * same file names, we must wait for the first reply of the inflight
818   * unlink request, or the MDS may fail these subsequent requests with
819   * -EEXIST if the inflight async unlink request was delayed for some
820   * reason.
821   *
822   * The worst case is that for a non-async openc request it will
823   * successfully open the file if the CDentry hasn't been unlinked yet,
824   * but later the previously delayed async unlink request will remove
825   * the CDentry. That means the just-created file may be deleted later
826   * by accident.
827   *
828   * We need to wait for the inflight async unlink requests to finish
829   * when creating new files/directories with the same file names.
830   */
831  int ceph_wait_on_conflict_unlink(struct dentry *dentry)
832  {
833  	struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
834  	struct dentry *pdentry = dentry->d_parent;
835  	struct dentry *udentry, *found = NULL;
836  	struct ceph_dentry_info *di;
837  	struct qstr dname;
838  	u32 hash = dentry->d_name.hash;
839  	int err;
840  
841  	dname.name = dentry->d_name.name;
842  	dname.len = dentry->d_name.len;
843  
844  	rcu_read_lock();
845  	hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
846  				   hnode, hash) {
847  		udentry = di->dentry;
848  
849  		spin_lock(&udentry->d_lock);
850  		if (udentry->d_name.hash != hash)
851  			goto next;
852  		if (unlikely(udentry->d_parent != pdentry))
853  			goto next;
854  		if (!hash_hashed(&di->hnode))
855  			goto next;
856  
857  		if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
858  			pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
859  				__func__, dentry, dentry);
860  
861  		if (!d_same_name(udentry, pdentry, &dname))
862  			goto next;
863  
864  		found = dget_dlock(udentry);
865  		spin_unlock(&udentry->d_lock);
866  		break;
867  next:
868  		spin_unlock(&udentry->d_lock);
869  	}
870  	rcu_read_unlock();
871  
872  	if (likely(!found))
873  		return 0;
874  
875  	dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
876  	     dentry, dentry, found, found);
877  
878  	err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
879  			  TASK_KILLABLE);
880  	dput(found);
881  	return err;
882  }
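/*
 * Caller-side sketch (illustrative): any path that instantiates a new
 * name, such as an open(O_CREAT) or mkdir handler, is expected to call
 * this before sending its request so it cannot race with an inflight
 * async unlink of the same name:
 *
 *	err = ceph_wait_on_conflict_unlink(dentry);
 *	if (err)
 *		return err;     // interrupted by a fatal signal
 *	// ... go on to submit the create/link/rename request ...
 */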
883  
884  
885  /*
886   * sessions
887   */
888  const char *ceph_session_state_name(int s)
889  {
890  	switch (s) {
891  	case CEPH_MDS_SESSION_NEW: return "new";
892  	case CEPH_MDS_SESSION_OPENING: return "opening";
893  	case CEPH_MDS_SESSION_OPEN: return "open";
894  	case CEPH_MDS_SESSION_HUNG: return "hung";
895  	case CEPH_MDS_SESSION_CLOSING: return "closing";
896  	case CEPH_MDS_SESSION_CLOSED: return "closed";
897  	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
898  	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
899  	case CEPH_MDS_SESSION_REJECTED: return "rejected";
900  	default: return "???";
901  	}
902  }
903  
904  struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
905  {
906  	if (refcount_inc_not_zero(&s->s_ref))
907  		return s;
908  	return NULL;
909  }
910  
911  void ceph_put_mds_session(struct ceph_mds_session *s)
912  {
913  	if (IS_ERR_OR_NULL(s))
914  		return;
915  
916  	if (refcount_dec_and_test(&s->s_ref)) {
917  		if (s->s_auth.authorizer)
918  			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
919  		WARN_ON(mutex_is_locked(&s->s_mutex));
920  		xa_destroy(&s->s_delegated_inos);
921  		kfree(s);
922  	}
923  }
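/*
 * Session refcounting sketch: lookups may legitimately fail, because
 * ceph_get_mds_session() refuses to revive a session whose refcount
 * has already dropped to zero:
 *
 *	struct ceph_mds_session *s;
 *
 *	mutex_lock(&mdsc->mutex);
 *	s = __ceph_lookup_mds_session(mdsc, mds);   // takes a ref
 *	mutex_unlock(&mdsc->mutex);
 *	if (s) {
 *		// ... use the session ...
 *		ceph_put_mds_session(s);            // drop the ref
 *	}
 */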
924  
925  /*
926   * called under mdsc->mutex
927   */
928  struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
929  						   int mds)
930  {
931  	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
932  		return NULL;
933  	return ceph_get_mds_session(mdsc->sessions[mds]);
934  }
935  
936  static bool __have_session(struct ceph_mds_client *mdsc, int mds)
937  {
938  	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
939  		return false;
940  	else
941  		return true;
942  }
943  
944  static int __verify_registered_session(struct ceph_mds_client *mdsc,
945  				       struct ceph_mds_session *s)
946  {
947  	if (s->s_mds >= mdsc->max_sessions ||
948  	    mdsc->sessions[s->s_mds] != s)
949  		return -ENOENT;
950  	return 0;
951  }
952  
953  /*
954   * create+register a new session for given mds.
955   * called under mdsc->mutex.
956   */
957  static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
958  						 int mds)
959  {
960  	struct ceph_mds_session *s;
961  
962  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
963  		return ERR_PTR(-EIO);
964  
965  	if (mds >= mdsc->mdsmap->possible_max_rank)
966  		return ERR_PTR(-EINVAL);
967  
968  	s = kzalloc(sizeof(*s), GFP_NOFS);
969  	if (!s)
970  		return ERR_PTR(-ENOMEM);
971  
972  	if (mds >= mdsc->max_sessions) {
973  		int newmax = 1 << get_count_order(mds + 1);
974  		struct ceph_mds_session **sa;
975  
976  		dout("%s: realloc to %d\n", __func__, newmax);
977  		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
978  		if (!sa)
979  			goto fail_realloc;
980  		if (mdsc->sessions) {
981  			memcpy(sa, mdsc->sessions,
982  			       mdsc->max_sessions * sizeof(void *));
983  			kfree(mdsc->sessions);
984  		}
985  		mdsc->sessions = sa;
986  		mdsc->max_sessions = newmax;
987  	}
988  
989  	dout("%s: mds%d\n", __func__, mds);
990  	s->s_mdsc = mdsc;
991  	s->s_mds = mds;
992  	s->s_state = CEPH_MDS_SESSION_NEW;
993  	mutex_init(&s->s_mutex);
994  
995  	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
996  
997  	atomic_set(&s->s_cap_gen, 1);
998  	s->s_cap_ttl = jiffies - 1;
999  
1000  	spin_lock_init(&s->s_cap_lock);
1001  	INIT_LIST_HEAD(&s->s_caps);
1002  	refcount_set(&s->s_ref, 1);
1003  	INIT_LIST_HEAD(&s->s_waiting);
1004  	INIT_LIST_HEAD(&s->s_unsafe);
1005  	xa_init(&s->s_delegated_inos);
1006  	INIT_LIST_HEAD(&s->s_cap_releases);
1007  	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
1008  
1009  	INIT_LIST_HEAD(&s->s_cap_dirty);
1010  	INIT_LIST_HEAD(&s->s_cap_flushing);
1011  
1012  	mdsc->sessions[mds] = s;
1013  	atomic_inc(&mdsc->num_sessions);
1014  	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
1015  
1016  	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
1017  		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
1018  
1019  	return s;
1020  
1021  fail_realloc:
1022  	kfree(s);
1023  	return ERR_PTR(-ENOMEM);
1024  }
1025  
1026  /*
1027   * called under mdsc->mutex
1028   */
1029  static void __unregister_session(struct ceph_mds_client *mdsc,
1030  			       struct ceph_mds_session *s)
1031  {
1032  	dout("__unregister_session mds%d %p\n", s->s_mds, s);
1033  	BUG_ON(mdsc->sessions[s->s_mds] != s);
1034  	mdsc->sessions[s->s_mds] = NULL;
1035  	ceph_con_close(&s->s_con);
1036  	ceph_put_mds_session(s);
1037  	atomic_dec(&mdsc->num_sessions);
1038  }
1039  
1040  /*
1041   * drop session refs in request.
1042   *
1043   * should be last request ref, or hold mdsc->mutex
1044   */
1045  static void put_request_session(struct ceph_mds_request *req)
1046  {
1047  	if (req->r_session) {
1048  		ceph_put_mds_session(req->r_session);
1049  		req->r_session = NULL;
1050  	}
1051  }
1052  
1053  void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
1054  				void (*cb)(struct ceph_mds_session *),
1055  				bool check_state)
1056  {
1057  	int mds;
1058  
1059  	mutex_lock(&mdsc->mutex);
1060  	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
1061  		struct ceph_mds_session *s;
1062  
1063  		s = __ceph_lookup_mds_session(mdsc, mds);
1064  		if (!s)
1065  			continue;
1066  
1067  		if (check_state && !check_session_state(s)) {
1068  			ceph_put_mds_session(s);
1069  			continue;
1070  		}
1071  
1072  		mutex_unlock(&mdsc->mutex);
1073  		cb(s);
1074  		ceph_put_mds_session(s);
1075  		mutex_lock(&mdsc->mutex);
1076  	}
1077  	mutex_unlock(&mdsc->mutex);
1078  }
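/*
 * Usage sketch ('kick_one_session' is an illustrative callback name):
 * the iterator drops mdsc->mutex around each invocation, so the
 * callback may sleep or send messages, but must not assume the session
 * array is stable across calls:
 *
 *	static void kick_one_session(struct ceph_mds_session *s)
 *	{
 *		// ... per-session work ...
 *	}
 *
 *	ceph_mdsc_iterate_sessions(mdsc, kick_one_session, true);
 */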
1079  
1080  void ceph_mdsc_release_request(struct kref *kref)
1081  {
1082  	struct ceph_mds_request *req = container_of(kref,
1083  						    struct ceph_mds_request,
1084  						    r_kref);
1085  	ceph_mdsc_release_dir_caps_no_check(req);
1086  	destroy_reply_info(&req->r_reply_info);
1087  	if (req->r_request)
1088  		ceph_msg_put(req->r_request);
1089  	if (req->r_reply)
1090  		ceph_msg_put(req->r_reply);
1091  	if (req->r_inode) {
1092  		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1093  		iput(req->r_inode);
1094  	}
1095  	if (req->r_parent) {
1096  		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
1097  		iput(req->r_parent);
1098  	}
1099  	iput(req->r_target_inode);
1100  	iput(req->r_new_inode);
1101  	if (req->r_dentry)
1102  		dput(req->r_dentry);
1103  	if (req->r_old_dentry)
1104  		dput(req->r_old_dentry);
1105  	if (req->r_old_dentry_dir) {
1106  		/*
1107  		 * track (and drop pins for) r_old_dentry_dir
1108  		 * separately, since r_old_dentry's d_parent may have
1109  		 * changed between the dir mutex being dropped and
1110  		 * this request being freed.
1111  		 */
1112  		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
1113  				  CEPH_CAP_PIN);
1114  		iput(req->r_old_dentry_dir);
1115  	}
1116  	kfree(req->r_path1);
1117  	kfree(req->r_path2);
1118  	put_cred(req->r_cred);
1119  	if (req->r_pagelist)
1120  		ceph_pagelist_release(req->r_pagelist);
1121  	kfree(req->r_fscrypt_auth);
1122  	kfree(req->r_altname);
1123  	put_request_session(req);
1124  	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
1125  	WARN_ON_ONCE(!list_empty(&req->r_wait));
1126  	kmem_cache_free(ceph_mds_request_cachep, req);
1127  }
1128  
1129  DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)
1130  
1131  /*
1132   * lookup request, bump ref if found.
1133   *
1134   * called under mdsc->mutex.
1135   */
1136  static struct ceph_mds_request *
1137  lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
1138  {
1139  	struct ceph_mds_request *req;
1140  
1141  	req = lookup_request(&mdsc->request_tree, tid);
1142  	if (req)
1143  		ceph_mdsc_get_request(req);
1144  
1145  	return req;
1146  }
1147  
1148  /*
1149   * Register an in-flight request, and assign a tid.  Link to directory
1150   * are modifying (if any).
1151   *
1152   * Called under mdsc->mutex.
1153   */
1154  static void __register_request(struct ceph_mds_client *mdsc,
1155  			       struct ceph_mds_request *req,
1156  			       struct inode *dir)
1157  {
1158  	int ret = 0;
1159  
1160  	req->r_tid = ++mdsc->last_tid;
1161  	if (req->r_num_caps) {
1162  		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
1163  					req->r_num_caps);
1164  		if (ret < 0) {
1165  			pr_err("__register_request %p "
1166  			       "failed to reserve caps: %d\n", req, ret);
1167  			/* set req->r_err to fail early from __do_request */
1168  			req->r_err = ret;
1169  			return;
1170  		}
1171  	}
1172  	dout("__register_request %p tid %lld\n", req, req->r_tid);
1173  	ceph_mdsc_get_request(req);
1174  	insert_request(&mdsc->request_tree, req);
1175  
1176  	req->r_cred = get_current_cred();
1177  
1178  	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
1179  		mdsc->oldest_tid = req->r_tid;
1180  
1181  	if (dir) {
1182  		struct ceph_inode_info *ci = ceph_inode(dir);
1183  
1184  		ihold(dir);
1185  		req->r_unsafe_dir = dir;
1186  		spin_lock(&ci->i_unsafe_lock);
1187  		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
1188  		spin_unlock(&ci->i_unsafe_lock);
1189  	}
1190  }
1191  
1192  static void __unregister_request(struct ceph_mds_client *mdsc,
1193  				 struct ceph_mds_request *req)
1194  {
1195  	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
1196  
1197  	/* Never leave an unregistered request on an unsafe list! */
1198  	list_del_init(&req->r_unsafe_item);
1199  
1200  	if (req->r_tid == mdsc->oldest_tid) {
1201  		struct rb_node *p = rb_next(&req->r_node);
1202  		mdsc->oldest_tid = 0;
1203  		while (p) {
1204  			struct ceph_mds_request *next_req =
1205  				rb_entry(p, struct ceph_mds_request, r_node);
1206  			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
1207  				mdsc->oldest_tid = next_req->r_tid;
1208  				break;
1209  			}
1210  			p = rb_next(p);
1211  		}
1212  	}
1213  
1214  	erase_request(&mdsc->request_tree, req);
1215  
1216  	if (req->r_unsafe_dir) {
1217  		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
1218  		spin_lock(&ci->i_unsafe_lock);
1219  		list_del_init(&req->r_unsafe_dir_item);
1220  		spin_unlock(&ci->i_unsafe_lock);
1221  	}
1222  	if (req->r_target_inode &&
1223  	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
1224  		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
1225  		spin_lock(&ci->i_unsafe_lock);
1226  		list_del_init(&req->r_unsafe_target_item);
1227  		spin_unlock(&ci->i_unsafe_lock);
1228  	}
1229  
1230  	if (req->r_unsafe_dir) {
1231  		iput(req->r_unsafe_dir);
1232  		req->r_unsafe_dir = NULL;
1233  	}
1234  
1235  	complete_all(&req->r_safe_completion);
1236  
1237  	ceph_mdsc_put_request(req);
1238  }
1239  
1240  /*
1241   * Walk back up the dentry tree until we hit a dentry representing a
1242   * non-snapshot inode. We do this using the rcu_read_lock (which must be held
1243   * when calling this) to ensure that the objects won't disappear while we're
1244   * working with them. Once we hit a candidate dentry, we attempt to take a
1245   * reference to it, and return that as the result.
1246   */
1247  static struct inode *get_nonsnap_parent(struct dentry *dentry)
1248  {
1249  	struct inode *inode = NULL;
1250  
1251  	while (dentry && !IS_ROOT(dentry)) {
1252  		inode = d_inode_rcu(dentry);
1253  		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
1254  			break;
1255  		dentry = dentry->d_parent;
1256  	}
1257  	if (inode)
1258  		inode = igrab(inode);
1259  	return inode;
1260  }
1261  
1262  /*
1263   * Choose mds to send request to next.  If there is a hint set in the
1264   * request (e.g., due to a prior forward hint from the mds), use that.
1265   * Otherwise, consult frag tree and/or caps to identify the
1266   * appropriate mds.  If all else fails, choose randomly.
1267   *
1268   * Called under mdsc->mutex.
1269   */
1270  static int __choose_mds(struct ceph_mds_client *mdsc,
1271  			struct ceph_mds_request *req,
1272  			bool *random)
1273  {
1274  	struct inode *inode;
1275  	struct ceph_inode_info *ci;
1276  	struct ceph_cap *cap;
1277  	int mode = req->r_direct_mode;
1278  	int mds = -1;
1279  	u32 hash = req->r_direct_hash;
1280  	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
1281  
1282  	if (random)
1283  		*random = false;
1284  
1285  	/*
1286  	 * is there a specific mds we should try?  ignore hint if we have
1287  	 * no session and the mds is not up (active or recovering).
1288  	 */
1289  	if (req->r_resend_mds >= 0 &&
1290  	    (__have_session(mdsc, req->r_resend_mds) ||
1291  	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
1292  		dout("%s using resend_mds mds%d\n", __func__,
1293  		     req->r_resend_mds);
1294  		return req->r_resend_mds;
1295  	}
1296  
1297  	if (mode == USE_RANDOM_MDS)
1298  		goto random;
1299  
1300  	inode = NULL;
1301  	if (req->r_inode) {
1302  		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
1303  			inode = req->r_inode;
1304  			ihold(inode);
1305  		} else {
1306  			/* req->r_dentry is non-null for LSSNAP request */
1307  			rcu_read_lock();
1308  			inode = get_nonsnap_parent(req->r_dentry);
1309  			rcu_read_unlock();
1310  			dout("%s using snapdir's parent %p\n", __func__, inode);
1311  		}
1312  	} else if (req->r_dentry) {
1313  		/* ignore race with rename; old or new d_parent is okay */
1314  		struct dentry *parent;
1315  		struct inode *dir;
1316  
1317  		rcu_read_lock();
1318  		parent = READ_ONCE(req->r_dentry->d_parent);
1319  		dir = req->r_parent ? : d_inode_rcu(parent);
1320  
1321  		if (!dir || dir->i_sb != mdsc->fsc->sb) {
1322  			/*  not this fs or parent went negative */
1323  			inode = d_inode(req->r_dentry);
1324  			if (inode)
1325  				ihold(inode);
1326  		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
1327  			/* direct snapped/virtual snapdir requests
1328  			 * based on parent dir inode */
1329  			inode = get_nonsnap_parent(parent);
1330  			dout("%s using nonsnap parent %p\n", __func__, inode);
1331  		} else {
1332  			/* dentry target */
1333  			inode = d_inode(req->r_dentry);
1334  			if (!inode || mode == USE_AUTH_MDS) {
1335  				/* dir + name */
1336  				inode = igrab(dir);
1337  				hash = ceph_dentry_hash(dir, req->r_dentry);
1338  				is_hash = true;
1339  			} else {
1340  				ihold(inode);
1341  			}
1342  		}
1343  		rcu_read_unlock();
1344  	}
1345  
1346  	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
1347  	     hash, mode);
1348  	if (!inode)
1349  		goto random;
1350  	ci = ceph_inode(inode);
1351  
1352  	if (is_hash && S_ISDIR(inode->i_mode)) {
1353  		struct ceph_inode_frag frag;
1354  		int found;
1355  
1356  		ceph_choose_frag(ci, hash, &frag, &found);
1357  		if (found) {
1358  			if (mode == USE_ANY_MDS && frag.ndist > 0) {
1359  				u8 r;
1360  
1361  				/* choose a random replica */
1362  				get_random_bytes(&r, 1);
1363  				r %= frag.ndist;
1364  				mds = frag.dist[r];
1365  				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
1366  				     __func__, inode, ceph_vinop(inode),
1367  				     frag.frag, mds, (int)r, frag.ndist);
1368  				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1369  				    CEPH_MDS_STATE_ACTIVE &&
1370  				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
1371  					goto out;
1372  			}
1373  
1374  			/* since this file/dir wasn't known to be
1375  			 * replicated, look for the
1376  			 * authoritative mds. */
1377  			if (frag.mds >= 0) {
1378  				/* choose auth mds */
1379  				mds = frag.mds;
1380  				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
1381  				     __func__, inode, ceph_vinop(inode),
1382  				     frag.frag, mds);
1383  				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
1384  				    CEPH_MDS_STATE_ACTIVE) {
1385  					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
1386  								  mds))
1387  						goto out;
1388  				}
1389  			}
1390  			mode = USE_AUTH_MDS;
1391  		}
1392  	}
1393  
1394  	spin_lock(&ci->i_ceph_lock);
1395  	cap = NULL;
1396  	if (mode == USE_AUTH_MDS)
1397  		cap = ci->i_auth_cap;
1398  	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
1399  		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
1400  	if (!cap) {
1401  		spin_unlock(&ci->i_ceph_lock);
1402  		iput(inode);
1403  		goto random;
1404  	}
1405  	mds = cap->session->s_mds;
1406  	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
1407  	     inode, ceph_vinop(inode), mds,
1408  	     cap == ci->i_auth_cap ? "auth " : "", cap);
1409  	spin_unlock(&ci->i_ceph_lock);
1410  out:
1411  	iput(inode);
1412  	return mds;
1413  
1414  random:
1415  	if (random)
1416  		*random = true;
1417  
1418  	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
1419  	dout("%s chose random mds%d\n", __func__, mds);
1420  	return mds;
1421  }
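/*
 * To summarize the precedence implemented above, __choose_mds() picks:
 *
 *   1. the r_resend_mds hint, if that mds has a session or is up;
 *   2. a random mds, if the request asked for USE_RANDOM_MDS;
 *   3. a replica or the authoritative mds from the dir frag tree;
 *   4. the mds holding a (preferably auth) cap on the relevant inode;
 *   5. failing all of the above, a random mds from the mdsmap.
 */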
1422  
1423  
1424  /*
1425   * session messages
1426   */
1427  struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
1428  {
1429  	struct ceph_msg *msg;
1430  	struct ceph_mds_session_head *h;
1431  
1432  	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
1433  			   false);
1434  	if (!msg) {
1435  		pr_err("ENOMEM creating session %s msg\n",
1436  		       ceph_session_op_name(op));
1437  		return NULL;
1438  	}
1439  	h = msg->front.iov_base;
1440  	h->op = cpu_to_le32(op);
1441  	h->seq = cpu_to_le64(seq);
1442  
1443  	return msg;
1444  }
1445  
1446  static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
1447  #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
1448  static int encode_supported_features(void **p, void *end)
1449  {
1450  	static const size_t count = ARRAY_SIZE(feature_bits);
1451  
1452  	if (count > 0) {
1453  		size_t i;
1454  		size_t size = FEATURE_BYTES(count);
1455  		unsigned long bit;
1456  
1457  		if (WARN_ON_ONCE(*p + 4 + size > end))
1458  			return -ERANGE;
1459  
1460  		ceph_encode_32(p, size);
1461  		memset(*p, 0, size);
1462  		for (i = 0; i < count; i++) {
1463  			bit = feature_bits[i];
1464  			((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
1465  		}
1466  		*p += size;
1467  	} else {
1468  		if (WARN_ON_ONCE(*p + 4 > end))
1469  			return -ERANGE;
1470  
1471  		ceph_encode_32(p, 0);
1472  	}
1473  
1474  	return 0;
1475  }
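/*
 * Worked example for FEATURE_BYTES: if the highest bit in feature_bits
 * were 17 (an assumed value), DIV_ROUND_UP(17 + 1, 64) * 8 == 8, i.e.
 * the bitmap occupies one 64-bit word (8 bytes) on the wire, preceded
 * by its 32-bit byte count in encode_supported_features().
 */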
1476  
1477  static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
1478  #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
1479  static int encode_metric_spec(void **p, void *end)
1480  {
1481  	static const size_t count = ARRAY_SIZE(metric_bits);
1482  
1483  	/* header */
1484  	if (WARN_ON_ONCE(*p + 2 > end))
1485  		return -ERANGE;
1486  
1487  	ceph_encode_8(p, 1); /* version */
1488  	ceph_encode_8(p, 1); /* compat */
1489  
1490  	if (count > 0) {
1491  		size_t i;
1492  		size_t size = METRIC_BYTES(count);
1493  
1494  		if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
1495  			return -ERANGE;
1496  
1497  		/* metric spec info length */
1498  		ceph_encode_32(p, 4 + size);
1499  
1500  		/* metric spec */
1501  		ceph_encode_32(p, size);
1502  		memset(*p, 0, size);
1503  		for (i = 0; i < count; i++)
1504  			((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
1505  		*p += size;
1506  	} else {
1507  		if (WARN_ON_ONCE(*p + 4 + 4 > end))
1508  			return -ERANGE;
1509  
1510  		/* metric spec info length */
1511  		ceph_encode_32(p, 4);
1512  		/* metric spec */
1513  		ceph_encode_32(p, 0);
1514  	}
1515  
1516  	return 0;
1517  }
1518  
1519  /*
1520   * session message, specialization for CEPH_SESSION_REQUEST_OPEN
1521   * to include additional client metadata fields.
1522   */
1523  static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
1524  {
1525  	struct ceph_msg *msg;
1526  	struct ceph_mds_session_head *h;
1527  	int i;
1528  	int extra_bytes = 0;
1529  	int metadata_key_count = 0;
1530  	struct ceph_options *opt = mdsc->fsc->client->options;
1531  	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
1532  	size_t size, count;
1533  	void *p, *end;
1534  	int ret;
1535  
1536  	const char* metadata[][2] = {
1537  		{"hostname", mdsc->nodename},
1538  		{"kernel_version", init_utsname()->release},
1539  		{"entity_id", opt->name ? : ""},
1540  		{"root", fsopt->server_path ? : "/"},
1541  		{NULL, NULL}
1542  	};
1543  
1544  	/* Calculate serialized length of metadata */
1545  	extra_bytes = 4;  /* map length */
1546  	for (i = 0; metadata[i][0]; ++i) {
1547  		extra_bytes += 8 + strlen(metadata[i][0]) +
1548  			strlen(metadata[i][1]);
1549  		metadata_key_count++;
1550  	}
1551  
1552  	/* supported feature */
1553  	size = 0;
1554  	count = ARRAY_SIZE(feature_bits);
1555  	if (count > 0)
1556  		size = FEATURE_BYTES(count);
1557  	extra_bytes += 4 + size;
1558  
1559  	/* metric spec */
1560  	size = 0;
1561  	count = ARRAY_SIZE(metric_bits);
1562  	if (count > 0)
1563  		size = METRIC_BYTES(count);
1564  	extra_bytes += 2 + 4 + 4 + size;
1565  
1566  	/* Allocate the message */
1567  	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
1568  			   GFP_NOFS, false);
1569  	if (!msg) {
1570  		pr_err("ENOMEM creating session open msg\n");
1571  		return ERR_PTR(-ENOMEM);
1572  	}
1573  	p = msg->front.iov_base;
1574  	end = p + msg->front.iov_len;
1575  
1576  	h = p;
1577  	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
1578  	h->seq = cpu_to_le64(seq);
1579  
1580  	/*
1581  	 * Serialize client metadata into waiting buffer space, using
1582  	 * the format that userspace expects for map<string, string>
1583  	 *
1584  	 * ClientSession messages with metadata are v4
1585  	 */
1586  	msg->hdr.version = cpu_to_le16(4);
1587  	msg->hdr.compat_version = cpu_to_le16(1);
1588  
1589  	/* The write pointer, following the session_head structure */
1590  	p += sizeof(*h);
1591  
1592  	/* Number of entries in the map */
1593  	ceph_encode_32(&p, metadata_key_count);
1594  
1595  	/* Two length-prefixed strings for each entry in the map */
1596  	for (i = 0; metadata[i][0]; ++i) {
1597  		size_t const key_len = strlen(metadata[i][0]);
1598  		size_t const val_len = strlen(metadata[i][1]);
1599  
1600  		ceph_encode_32(&p, key_len);
1601  		memcpy(p, metadata[i][0], key_len);
1602  		p += key_len;
1603  		ceph_encode_32(&p, val_len);
1604  		memcpy(p, metadata[i][1], val_len);
1605  		p += val_len;
1606  	}
1607  
1608  	ret = encode_supported_features(&p, end);
1609  	if (ret) {
1610  		pr_err("encode_supported_features failed!\n");
1611  		ceph_msg_put(msg);
1612  		return ERR_PTR(ret);
1613  	}
1614  
1615  	ret = encode_metric_spec(&p, end);
1616  	if (ret) {
1617  		pr_err("encode_metric_spec failed!\n");
1618  		ceph_msg_put(msg);
1619  		return ERR_PTR(ret);
1620  	}
1621  
1622  	msg->front.iov_len = p - msg->front.iov_base;
1623  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1624  
1625  	return msg;
1626  }
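/*
 * The resulting CEPH_SESSION_REQUEST_OPEN front, as assembled above:
 *
 *	struct ceph_mds_session_head head;      // op + seq
 *	u32 nr_entries;                         // metadata map size
 *	// nr_entries times:
 *	u32 key_len;  char key[key_len];
 *	u32 val_len;  char val[val_len];
 *	<feature bitmap>                        // encode_supported_features()
 *	<metric spec>                           // encode_metric_spec()
 */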
1627  
1628  /*
1629   * send session open request.
1630   *
1631   * called under mdsc->mutex
1632   */
1633  static int __open_session(struct ceph_mds_client *mdsc,
1634  			  struct ceph_mds_session *session)
1635  {
1636  	struct ceph_msg *msg;
1637  	int mstate;
1638  	int mds = session->s_mds;
1639  
1640  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
1641  		return -EIO;
1642  
1643  	/* wait for mds to go active? */
1644  	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
1645  	dout("open_session to mds%d (%s)\n", mds,
1646  	     ceph_mds_state_name(mstate));
1647  	session->s_state = CEPH_MDS_SESSION_OPENING;
1648  	session->s_renew_requested = jiffies;
1649  
1650  	/* send connect message */
1651  	msg = create_session_open_msg(mdsc, session->s_seq);
1652  	if (IS_ERR(msg))
1653  		return PTR_ERR(msg);
1654  	ceph_con_send(&session->s_con, msg);
1655  	return 0;
1656  }
1657  
1658  /*
1659   * open sessions for any export targets for the given mds
1660   *
1661   * called under mdsc->mutex
1662   */
1663  static struct ceph_mds_session *
1664  __open_export_target_session(struct ceph_mds_client *mdsc, int target)
1665  {
1666  	struct ceph_mds_session *session;
1667  	int ret;
1668  
1669  	session = __ceph_lookup_mds_session(mdsc, target);
1670  	if (!session) {
1671  		session = register_session(mdsc, target);
1672  		if (IS_ERR(session))
1673  			return session;
1674  	}
1675  	if (session->s_state == CEPH_MDS_SESSION_NEW ||
1676  	    session->s_state == CEPH_MDS_SESSION_CLOSING) {
1677  		ret = __open_session(mdsc, session);
1678  		if (ret)
1679  			return ERR_PTR(ret);
1680  	}
1681  
1682  	return session;
1683  }
1684  
1685  struct ceph_mds_session *
1686  ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
1687  {
1688  	struct ceph_mds_session *session;
1689  
1690  	dout("open_export_target_session to mds%d\n", target);
1691  
1692  	mutex_lock(&mdsc->mutex);
1693  	session = __open_export_target_session(mdsc, target);
1694  	mutex_unlock(&mdsc->mutex);
1695  
1696  	return session;
1697  }
1698  
1699  static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
1700  					  struct ceph_mds_session *session)
1701  {
1702  	struct ceph_mds_info *mi;
1703  	struct ceph_mds_session *ts;
1704  	int i, mds = session->s_mds;
1705  
1706  	if (mds >= mdsc->mdsmap->possible_max_rank)
1707  		return;
1708  
1709  	mi = &mdsc->mdsmap->m_info[mds];
1710  	dout("open_export_target_sessions for mds%d (%d targets)\n",
1711  	     session->s_mds, mi->num_export_targets);
1712  
1713  	for (i = 0; i < mi->num_export_targets; i++) {
1714  		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
1715  		ceph_put_mds_session(ts);
1716  	}
1717  }
1718  
1719  void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
1720  					   struct ceph_mds_session *session)
1721  {
1722  	mutex_lock(&mdsc->mutex);
1723  	__open_export_target_sessions(mdsc, session);
1724  	mutex_unlock(&mdsc->mutex);
1725  }
1726  
1727  /*
1728   * session caps
1729   */
1730  
1731  static void detach_cap_releases(struct ceph_mds_session *session,
1732  				struct list_head *target)
1733  {
1734  	lockdep_assert_held(&session->s_cap_lock);
1735  
1736  	list_splice_init(&session->s_cap_releases, target);
1737  	session->s_num_cap_releases = 0;
1738  	dout("dispose_cap_releases mds%d\n", session->s_mds);
1739  }
1740  
1741  static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1742  				 struct list_head *dispose)
1743  {
1744  	while (!list_empty(dispose)) {
1745  		struct ceph_cap *cap;
1746  		/* drop each cap that was detached from the session */
1747  		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1748  		list_del(&cap->session_caps);
1749  		ceph_put_cap(mdsc, cap);
1750  	}
1751  }
1752  
1753  static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1754  				     struct ceph_mds_session *session)
1755  {
1756  	struct ceph_mds_request *req;
1757  	struct rb_node *p;
1758  
1759  	dout("cleanup_session_requests mds%d\n", session->s_mds);
1760  	mutex_lock(&mdsc->mutex);
1761  	while (!list_empty(&session->s_unsafe)) {
1762  		req = list_first_entry(&session->s_unsafe,
1763  				       struct ceph_mds_request, r_unsafe_item);
1764  		pr_warn_ratelimited(" dropping unsafe request %llu\n",
1765  				    req->r_tid);
1766  		if (req->r_target_inode)
1767  			mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1768  		if (req->r_unsafe_dir)
1769  			mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1770  		__unregister_request(mdsc, req);
1771  	}
1772  	/* zero r_attempts, so kick_requests() will re-send requests */
1773  	p = rb_first(&mdsc->request_tree);
1774  	while (p) {
1775  		req = rb_entry(p, struct ceph_mds_request, r_node);
1776  		p = rb_next(p);
1777  		if (req->r_session &&
1778  		    req->r_session->s_mds == session->s_mds)
1779  			req->r_attempts = 0;
1780  	}
1781  	mutex_unlock(&mdsc->mutex);
1782  }
1783  
1784  /*
1785   * Helper to safely iterate over all caps associated with a session, with
1786   * special care taken to handle a racing __ceph_remove_cap().
1787   *
1788   * Caller must hold session s_mutex.
1789   */
1790  int ceph_iterate_session_caps(struct ceph_mds_session *session,
1791  			      int (*cb)(struct inode *, int mds, void *),
1792  			      void *arg)
1793  {
1794  	struct list_head *p;
1795  	struct ceph_cap *cap;
1796  	struct inode *inode, *last_inode = NULL;
1797  	struct ceph_cap *old_cap = NULL;
1798  	int ret;
1799  
1800  	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1801  	spin_lock(&session->s_cap_lock);
1802  	p = session->s_caps.next;
1803  	while (p != &session->s_caps) {
1804  		int mds;
1805  
1806  		cap = list_entry(p, struct ceph_cap, session_caps);
1807  		inode = igrab(&cap->ci->netfs.inode);
1808  		if (!inode) {
1809  			p = p->next;
1810  			continue;
1811  		}
1812  		session->s_cap_iterator = cap;
1813  		mds = cap->mds;
1814  		spin_unlock(&session->s_cap_lock);
1815  
1816  		if (last_inode) {
1817  			iput(last_inode);
1818  			last_inode = NULL;
1819  		}
1820  		if (old_cap) {
1821  			ceph_put_cap(session->s_mdsc, old_cap);
1822  			old_cap = NULL;
1823  		}
1824  
1825  		ret = cb(inode, mds, arg);
1826  		last_inode = inode;
1827  
1828  		spin_lock(&session->s_cap_lock);
1829  		p = p->next;
1830  		if (!cap->ci) {
1831  			dout("iterate_session_caps  finishing cap %p removal\n",
1832  			     cap);
1833  			BUG_ON(cap->session != session);
1834  			cap->session = NULL;
1835  			list_del_init(&cap->session_caps);
1836  			session->s_nr_caps--;
1837  			atomic64_dec(&session->s_mdsc->metric.total_caps);
1838  			if (cap->queue_release)
1839  				__ceph_queue_cap_release(session, cap);
1840  			else
1841  				old_cap = cap;  /* put_cap it w/o locks held */
1842  		}
1843  		if (ret < 0)
1844  			goto out;
1845  	}
1846  	ret = 0;
1847  out:
1848  	session->s_cap_iterator = NULL;
1849  	spin_unlock(&session->s_cap_lock);
1850  
1851  	iput(last_inode);
1852  	if (old_cap)
1853  		ceph_put_cap(session->s_mdsc, old_cap);
1854  
1855  	return ret;
1856  }
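
/*
 * Illustrative sketch, not part of the original file: a minimal
 * callback for ceph_iterate_session_caps() above. The iterator pins
 * the inode and handles racing cap removal itself, so a callback only
 * does its own work; returning a negative value stops the walk. The
 * helper name and the counter argument are hypothetical.
 *
 * Usage: int n = 0;
 *        ceph_iterate_session_caps(session, example_count_caps_cb, &n);
 */
static int example_count_caps_cb(struct inode *inode, int mds, void *arg)
{
	int *count = arg;	/* hypothetical accumulator */

	(*count)++;
	return 0;		/* >= 0 means keep iterating */
}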
1857  
1858  static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
1859  {
1860  	struct ceph_inode_info *ci = ceph_inode(inode);
1861  	bool invalidate = false;
1862  	struct ceph_cap *cap;
1863  	int iputs = 0;
1864  
1865  	spin_lock(&ci->i_ceph_lock);
1866  	cap = __get_cap_for_mds(ci, mds);
1867  	if (cap) {
1868  		dout(" removing cap %p, ci is %p, inode is %p\n",
1869  		     cap, ci, &ci->netfs.inode);
1870  
1871  		iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
1872  	}
1873  	spin_unlock(&ci->i_ceph_lock);
1874  
1875  	if (cap)
1876  		wake_up_all(&ci->i_cap_wq);
1877  	if (invalidate)
1878  		ceph_queue_invalidate(inode);
1879  	while (iputs--)
1880  		iput(inode);
1881  	return 0;
1882  }
1883  
1884  /*
1885   * caller must hold session s_mutex
1886   */
1887  static void remove_session_caps(struct ceph_mds_session *session)
1888  {
1889  	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1890  	struct super_block *sb = fsc->sb;
1891  	LIST_HEAD(dispose);
1892  
1893  	dout("remove_session_caps on %p\n", session);
1894  	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1895  
1896  	wake_up_all(&fsc->mdsc->cap_flushing_wq);
1897  
1898  	spin_lock(&session->s_cap_lock);
1899  	if (session->s_nr_caps > 0) {
1900  		struct inode *inode;
1901  		struct ceph_cap *cap, *prev = NULL;
1902  		struct ceph_vino vino;
1903  		/*
1904  		 * iterate_session_caps() skips inodes that are being
1905  		 * deleted, so we need to wait until the deletions are
1906  		 * complete.  __wait_on_freeing_inode() is designed for
1907  		 * the job, but it is not exported, so use the inode
1908  		 * lookup function to reach it instead.
1909  		 */
1910  		while (!list_empty(&session->s_caps)) {
1911  			cap = list_entry(session->s_caps.next,
1912  					 struct ceph_cap, session_caps);
1913  			if (cap == prev)
1914  				break;
1915  			prev = cap;
1916  			vino = cap->ci->i_vino;
1917  			spin_unlock(&session->s_cap_lock);
1918  
1919  			inode = ceph_find_inode(sb, vino);
1920  			iput(inode);
1921  
1922  			spin_lock(&session->s_cap_lock);
1923  		}
1924  	}
1925  
1926  	// drop cap expires and unlock s_cap_lock
1927  	detach_cap_releases(session, &dispose);
1928  
1929  	BUG_ON(session->s_nr_caps > 0);
1930  	BUG_ON(!list_empty(&session->s_cap_flushing));
1931  	spin_unlock(&session->s_cap_lock);
1932  	dispose_cap_releases(session->s_mdsc, &dispose);
1933  }
1934  
1935  enum {
1936  	RECONNECT,
1937  	RENEWCAPS,
1938  	FORCE_RO,
1939  };
1940  
1941  /*
1942   * wake up any threads waiting on this session's caps.  if the cap is
1943   * old (didn't get renewed on the client reconnect), remove it now.
1944   *
1945   * caller must hold s_mutex.
1946   */
1947  static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
1948  {
1949  	struct ceph_inode_info *ci = ceph_inode(inode);
1950  	unsigned long ev = (unsigned long)arg;
1951  
1952  	if (ev == RECONNECT) {
1953  		spin_lock(&ci->i_ceph_lock);
1954  		ci->i_wanted_max_size = 0;
1955  		ci->i_requested_max_size = 0;
1956  		spin_unlock(&ci->i_ceph_lock);
1957  	} else if (ev == RENEWCAPS) {
1958  		struct ceph_cap *cap;
1959  
1960  		spin_lock(&ci->i_ceph_lock);
1961  		cap = __get_cap_for_mds(ci, mds);
1962  		/* mds did not re-issue stale cap */
1963  		if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
1964  			cap->issued = cap->implemented = CEPH_CAP_PIN;
1965  		spin_unlock(&ci->i_ceph_lock);
1966  	} else if (ev == FORCE_RO) {
1967  	}
1968  	wake_up_all(&ci->i_cap_wq);
1969  	return 0;
1970  }
1971  
1972  static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1973  {
1974  	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1975  	ceph_iterate_session_caps(session, wake_up_session_cb,
1976  				  (void *)(unsigned long)ev);
1977  }
1978  
1979  /*
1980   * Send periodic message to MDS renewing all currently held caps.  The
1981   * ack will reset the expiration for all caps from this session.
1982   *
1983   * caller holds s_mutex
1984   */
1985  static int send_renew_caps(struct ceph_mds_client *mdsc,
1986  			   struct ceph_mds_session *session)
1987  {
1988  	struct ceph_msg *msg;
1989  	int state;
1990  
1991  	if (time_after_eq(jiffies, session->s_cap_ttl) &&
1992  	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1993  		pr_info("mds%d caps stale\n", session->s_mds);
1994  	session->s_renew_requested = jiffies;
1995  
1996  	/* do not try to renew caps until a recovering mds has reconnected
1997  	 * with its clients. */
1998  	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1999  	if (state < CEPH_MDS_STATE_RECONNECT) {
2000  		dout("send_renew_caps ignoring mds%d (%s)\n",
2001  		     session->s_mds, ceph_mds_state_name(state));
2002  		return 0;
2003  	}
2004  
2005  	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
2006  		ceph_mds_state_name(state));
2007  	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
2008  				      ++session->s_renew_seq);
2009  	if (!msg)
2010  		return -ENOMEM;
2011  	ceph_con_send(&session->s_con, msg);
2012  	return 0;
2013  }
2014  
2015  static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
2016  			     struct ceph_mds_session *session, u64 seq)
2017  {
2018  	struct ceph_msg *msg;
2019  
2020  	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
2021  	     session->s_mds, ceph_session_state_name(session->s_state), seq);
2022  	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
2023  	if (!msg)
2024  		return -ENOMEM;
2025  	ceph_con_send(&session->s_con, msg);
2026  	return 0;
2027  }
2028  
2029  
2030  /*
2031   * Note the new cap ttl, and any transition from stale -> fresh.
2032   *
2033   * Called under session->s_mutex
2034   */
2035  static void renewed_caps(struct ceph_mds_client *mdsc,
2036  			 struct ceph_mds_session *session, int is_renew)
2037  {
2038  	int was_stale;
2039  	int wake = 0;
2040  
2041  	spin_lock(&session->s_cap_lock);
2042  	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
2043  
2044  	session->s_cap_ttl = session->s_renew_requested +
2045  		mdsc->mdsmap->m_session_timeout*HZ;
2046  
2047  	if (was_stale) {
2048  		if (time_before(jiffies, session->s_cap_ttl)) {
2049  			pr_info("mds%d caps renewed\n", session->s_mds);
2050  			wake = 1;
2051  		} else {
2052  			pr_info("mds%d caps still stale\n", session->s_mds);
2053  		}
2054  	}
2055  	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
2056  	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
2057  	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
2058  	spin_unlock(&session->s_cap_lock);
2059  
2060  	if (wake)
2061  		wake_up_session_caps(session, RENEWCAPS);
2062  }
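
/*
 * Illustrative sketch (hypothetical helper, not in the original file):
 * how the s_cap_ttl computed in renewed_caps() is typically consulted.
 * With an m_session_timeout of 60 seconds, caps renewed at jiffies T
 * stay fresh until T + 60*HZ.
 */
static bool example_session_caps_fresh(struct ceph_mds_session *session)
{
	return time_before(jiffies, session->s_cap_ttl);
}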
2063  
2064  /*
2065   * send a session close request
2066   */
2067  static int request_close_session(struct ceph_mds_session *session)
2068  {
2069  	struct ceph_msg *msg;
2070  
2071  	dout("request_close_session mds%d state %s seq %lld\n",
2072  	     session->s_mds, ceph_session_state_name(session->s_state),
2073  	     session->s_seq);
2074  	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
2075  				      session->s_seq);
2076  	if (!msg)
2077  		return -ENOMEM;
2078  	ceph_con_send(&session->s_con, msg);
2079  	return 1;
2080  }
2081  
2082  /*
2083   * Called with s_mutex held.
2084   */
2085  static int __close_session(struct ceph_mds_client *mdsc,
2086  			 struct ceph_mds_session *session)
2087  {
2088  	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
2089  		return 0;
2090  	session->s_state = CEPH_MDS_SESSION_CLOSING;
2091  	return request_close_session(session);
2092  }
2093  
2094  static bool drop_negative_children(struct dentry *dentry)
2095  {
2096  	struct dentry *child;
2097  	bool all_negative = true;
2098  
2099  	if (!d_is_dir(dentry))
2100  		goto out;
2101  
2102  	spin_lock(&dentry->d_lock);
2103  	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
2104  		if (d_really_is_positive(child)) {
2105  			all_negative = false;
2106  			break;
2107  		}
2108  	}
2109  	spin_unlock(&dentry->d_lock);
2110  
2111  	if (all_negative)
2112  		shrink_dcache_parent(dentry);
2113  out:
2114  	return all_negative;
2115  }
2116  
2117  /*
2118   * Trim old(er) caps.
2119   *
2120   * Because we can't cache an inode without one or more caps, we do
2121   * this indirectly: if a cap is unused, we prune its aliases, at which
2122   * point the inode will hopefully get dropped too.
2123   *
2124   * Yes, this is a bit sloppy.  Our only real goal here is to respond to
2125   * memory pressure from the MDS, though, so it needn't be perfect.
2126   */
2127  static int trim_caps_cb(struct inode *inode, int mds, void *arg)
2128  {
2129  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2130  	int *remaining = arg;
2131  	struct ceph_inode_info *ci = ceph_inode(inode);
2132  	int used, wanted, oissued, mine;
2133  	struct ceph_cap *cap;
2134  
2135  	if (*remaining <= 0)
2136  		return -1;
2137  
2138  	spin_lock(&ci->i_ceph_lock);
2139  	cap = __get_cap_for_mds(ci, mds);
2140  	if (!cap) {
2141  		spin_unlock(&ci->i_ceph_lock);
2142  		return 0;
2143  	}
2144  	mine = cap->issued | cap->implemented;
2145  	used = __ceph_caps_used(ci);
2146  	wanted = __ceph_caps_file_wanted(ci);
2147  	oissued = __ceph_caps_issued_other(ci, cap);
2148  
2149  	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2150  	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2151  	     ceph_cap_string(used), ceph_cap_string(wanted));
2152  	if (cap == ci->i_auth_cap) {
2153  		if (ci->i_dirty_caps || ci->i_flushing_caps ||
2154  		    !list_empty(&ci->i_cap_snaps))
2155  			goto out;
2156  		if ((used | wanted) & CEPH_CAP_ANY_WR)
2157  			goto out;
2158  		/* Note: it's possible that i_filelock_ref becomes non-zero
2159  		 * after dropping auth caps. It doesn't hurt because the reply
2160  		 * to the lock mds request will re-add the auth caps. */
2161  		if (atomic_read(&ci->i_filelock_ref) > 0)
2162  			goto out;
2163  	}
2164  	/* The inode has cached pages, but it's no longer used;
2165  	 * we can safely drop it. */
2166  	if (S_ISREG(inode->i_mode) &&
2167  	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2168  	    !(oissued & CEPH_CAP_FILE_CACHE)) {
2169  		used = 0;
2170  		oissued = 0;
2171  	}
2172  	if ((used | wanted) & ~oissued & mine)
2173  		goto out;   /* we need these caps */
2174  
2175  	if (oissued) {
2176  		/* we aren't the only cap.. just remove us */
2177  		ceph_remove_cap(mdsc, cap, true);
2178  		(*remaining)--;
2179  	} else {
2180  		struct dentry *dentry;
2181  		/* try dropping referring dentries */
2182  		spin_unlock(&ci->i_ceph_lock);
2183  		dentry = d_find_any_alias(inode);
2184  		if (dentry && drop_negative_children(dentry)) {
2185  			int count;
2186  			dput(dentry);
2187  			d_prune_aliases(inode);
2188  			count = atomic_read(&inode->i_count);
2189  			if (count == 1)
2190  				(*remaining)--;
2191  			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2192  			     inode, cap, count);
2193  		} else {
2194  			dput(dentry);
2195  		}
2196  		return 0;
2197  	}
2198  
2199  out:
2200  	spin_unlock(&ci->i_ceph_lock);
2201  	return 0;
2202  }
2203  
2204  /*
2205   * Trim session cap count down to some max number.
2206   */
2207  int ceph_trim_caps(struct ceph_mds_client *mdsc,
2208  		   struct ceph_mds_session *session,
2209  		   int max_caps)
2210  {
2211  	int trim_caps = session->s_nr_caps - max_caps;
2212  
2213  	dout("trim_caps mds%d start: %d / %d, trim %d\n",
2214  	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2215  	if (trim_caps > 0) {
2216  		int remaining = trim_caps;
2217  
2218  		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2219  		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2220  		     session->s_mds, session->s_nr_caps, max_caps,
2221  			trim_caps - remaining);
2222  	}
2223  
2224  	ceph_flush_cap_releases(mdsc, session);
2225  	return 0;
2226  }
2227  
2228  static int check_caps_flush(struct ceph_mds_client *mdsc,
2229  			    u64 want_flush_tid)
2230  {
2231  	int ret = 1;
2232  
2233  	spin_lock(&mdsc->cap_dirty_lock);
2234  	if (!list_empty(&mdsc->cap_flush_list)) {
2235  		struct ceph_cap_flush *cf =
2236  			list_first_entry(&mdsc->cap_flush_list,
2237  					 struct ceph_cap_flush, g_list);
2238  		if (cf->tid <= want_flush_tid) {
2239  			dout("check_caps_flush still flushing tid "
2240  			     "%llu <= %llu\n", cf->tid, want_flush_tid);
2241  			ret = 0;
2242  		}
2243  	}
2244  	spin_unlock(&mdsc->cap_dirty_lock);
2245  	return ret;
2246  }
2247  
2248  /*
2249   * wait until all dirty inode data has been flushed to disk.
2250   *
2251   * returns once we've flushed through want_flush_tid
2252   */
2253  static void wait_caps_flush(struct ceph_mds_client *mdsc,
2254  			    u64 want_flush_tid)
2255  {
2256  	dout("wait_caps_flush want %llu\n", want_flush_tid);
2257  
2258  	wait_event(mdsc->cap_flushing_wq,
2259  		   check_caps_flush(mdsc, want_flush_tid));
2260  
2261  	dout("wait_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2262  }
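
/*
 * Illustrative sketch (hypothetical, not from this file): the producer
 * side of the handshake above. When a cap flush completes and is taken
 * off cap_flush_list, waking cap_flushing_wq makes wait_caps_flush()
 * re-evaluate check_caps_flush() and return once everything up to
 * want_flush_tid has drained.
 */
static void example_signal_cap_flush_done(struct ceph_mds_client *mdsc)
{
	wake_up(&mdsc->cap_flushing_wq);
}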
2263  
2264  /*
2265   * called under s_mutex
2266   */
2267  static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2268  				   struct ceph_mds_session *session)
2269  {
2270  	struct ceph_msg *msg = NULL;
2271  	struct ceph_mds_cap_release *head;
2272  	struct ceph_mds_cap_item *item;
2273  	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2274  	struct ceph_cap *cap;
2275  	LIST_HEAD(tmp_list);
2276  	int num_cap_releases;
2277  	__le32	barrier, *cap_barrier;
2278  
2279  	down_read(&osdc->lock);
2280  	barrier = cpu_to_le32(osdc->epoch_barrier);
2281  	up_read(&osdc->lock);
2282  
2283  	spin_lock(&session->s_cap_lock);
2284  again:
2285  	list_splice_init(&session->s_cap_releases, &tmp_list);
2286  	num_cap_releases = session->s_num_cap_releases;
2287  	session->s_num_cap_releases = 0;
2288  	spin_unlock(&session->s_cap_lock);
2289  
2290  	while (!list_empty(&tmp_list)) {
2291  		if (!msg) {
2292  			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2293  					PAGE_SIZE, GFP_NOFS, false);
2294  			if (!msg)
2295  				goto out_err;
2296  			head = msg->front.iov_base;
2297  			head->num = cpu_to_le32(0);
2298  			msg->front.iov_len = sizeof(*head);
2299  
2300  			msg->hdr.version = cpu_to_le16(2);
2301  			msg->hdr.compat_version = cpu_to_le16(1);
2302  		}
2303  
2304  		cap = list_first_entry(&tmp_list, struct ceph_cap,
2305  					session_caps);
2306  		list_del(&cap->session_caps);
2307  		num_cap_releases--;
2308  
2309  		head = msg->front.iov_base;
2310  		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2311  				   &head->num);
2312  		item = msg->front.iov_base + msg->front.iov_len;
2313  		item->ino = cpu_to_le64(cap->cap_ino);
2314  		item->cap_id = cpu_to_le64(cap->cap_id);
2315  		item->migrate_seq = cpu_to_le32(cap->mseq);
2316  		item->seq = cpu_to_le32(cap->issue_seq);
2317  		msg->front.iov_len += sizeof(*item);
2318  
2319  		ceph_put_cap(mdsc, cap);
2320  
2321  		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2322  			// Append cap_barrier field
2323  			cap_barrier = msg->front.iov_base + msg->front.iov_len;
2324  			*cap_barrier = barrier;
2325  			msg->front.iov_len += sizeof(*cap_barrier);
2326  
2327  			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2328  			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2329  			ceph_con_send(&session->s_con, msg);
2330  			msg = NULL;
2331  		}
2332  	}
2333  
2334  	BUG_ON(num_cap_releases != 0);
2335  
2336  	spin_lock(&session->s_cap_lock);
2337  	if (!list_empty(&session->s_cap_releases))
2338  		goto again;
2339  	spin_unlock(&session->s_cap_lock);
2340  
2341  	if (msg) {
2342  		// Append cap_barrier field
2343  		cap_barrier = msg->front.iov_base + msg->front.iov_len;
2344  		*cap_barrier = barrier;
2345  		msg->front.iov_len += sizeof(*cap_barrier);
2346  
2347  		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2348  		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2349  		ceph_con_send(&session->s_con, msg);
2350  	}
2351  	return;
2352  out_err:
2353  	pr_err("send_cap_releases mds%d, failed to allocate message\n",
2354  		session->s_mds);
2355  	spin_lock(&session->s_cap_lock);
2356  	list_splice(&tmp_list, &session->s_cap_releases);
2357  	session->s_num_cap_releases += num_cap_releases;
2358  	spin_unlock(&session->s_cap_lock);
2359  }
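
/*
 * Illustrative arithmetic (sketch, not in the original file): the
 * CAPRELEASE message front built above holds one ceph_mds_cap_release
 * head, the cap items, and a trailing __le32 cap_barrier, which is why
 * the message is flushed as soon as head->num reaches
 * CEPH_CAPS_PER_RELEASE.
 */
static size_t example_caprelease_front_len(u32 num_items)
{
	return sizeof(struct ceph_mds_cap_release) +
	       num_items * sizeof(struct ceph_mds_cap_item) +
	       sizeof(__le32);	/* cap_barrier */
}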
2360  
2361  static void ceph_cap_release_work(struct work_struct *work)
2362  {
2363  	struct ceph_mds_session *session =
2364  		container_of(work, struct ceph_mds_session, s_cap_release_work);
2365  
2366  	mutex_lock(&session->s_mutex);
2367  	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2368  	    session->s_state == CEPH_MDS_SESSION_HUNG)
2369  		ceph_send_cap_releases(session->s_mdsc, session);
2370  	mutex_unlock(&session->s_mutex);
2371  	ceph_put_mds_session(session);
2372  }
2373  
2374  void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2375  		             struct ceph_mds_session *session)
2376  {
2377  	if (mdsc->stopping)
2378  		return;
2379  
2380  	ceph_get_mds_session(session);
2381  	if (queue_work(mdsc->fsc->cap_wq,
2382  		       &session->s_cap_release_work)) {
2383  		dout("cap release work queued\n");
2384  	} else {
2385  		ceph_put_mds_session(session);
2386  		dout("failed to queue cap release work\n");
2387  	}
2388  }
2389  
2390  /*
2391   * caller holds session->s_cap_lock
2392   */
2393  void __ceph_queue_cap_release(struct ceph_mds_session *session,
2394  			      struct ceph_cap *cap)
2395  {
2396  	list_add_tail(&cap->session_caps, &session->s_cap_releases);
2397  	session->s_num_cap_releases++;
2398  
2399  	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2400  		ceph_flush_cap_releases(session->s_mdsc, session);
2401  }
2402  
2403  static void ceph_cap_reclaim_work(struct work_struct *work)
2404  {
2405  	struct ceph_mds_client *mdsc =
2406  		container_of(work, struct ceph_mds_client, cap_reclaim_work);
2407  	int ret = ceph_trim_dentries(mdsc);
2408  	if (ret == -EAGAIN)
2409  		ceph_queue_cap_reclaim_work(mdsc);
2410  }
2411  
2412  void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2413  {
2414  	if (mdsc->stopping)
2415  		return;
2416  
2417  	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2418  		dout("caps reclaim work queued\n");
2419  	} else {
2420  		dout("failed to queue caps reclaim work\n");
2421  	}
2422  }
2423  
2424  void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2425  {
2426  	int val;
2427  	if (!nr)
2428  		return;
2429  	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
2430  	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2431  		atomic_set(&mdsc->cap_reclaim_pending, 0);
2432  		ceph_queue_cap_reclaim_work(mdsc);
2433  	}
2434  }
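
/*
 * Worked example for the trigger above (sketch; assumes
 * CEPH_CAPS_PER_RELEASE is 128 purely for the arithmetic): with 120
 * caps already pending and nr == 10, val becomes 130 and
 * val % 128 == 2, and 2 < 10 means the counter just crossed a multiple
 * of the batch size, so reclaim work is queued roughly once per batch
 * instead of on every call.
 */
static bool example_crossed_batch_boundary(int val, int nr, int batch)
{
	return (val % batch) < nr;
}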
2435  
2436  /*
2437   * requests
2438   */
2439  
2440  int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2441  				    struct inode *dir)
2442  {
2443  	struct ceph_inode_info *ci = ceph_inode(dir);
2444  	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2445  	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2446  	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2447  	unsigned int num_entries;
2448  	int order;
2449  
2450  	spin_lock(&ci->i_ceph_lock);
2451  	num_entries = ci->i_files + ci->i_subdirs;
2452  	spin_unlock(&ci->i_ceph_lock);
2453  	num_entries = max(num_entries, 1U);
2454  	num_entries = min(num_entries, opt->max_readdir);
2455  
2456  	order = get_order(size * num_entries);
2457  	while (order >= 0) {
2458  		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2459  							     __GFP_NOWARN |
2460  							     __GFP_ZERO,
2461  							     order);
2462  		if (rinfo->dir_entries)
2463  			break;
2464  		order--;
2465  	}
2466  	if (!rinfo->dir_entries)
2467  		return -ENOMEM;
2468  
2469  	num_entries = (PAGE_SIZE << order) / size;
2470  	num_entries = min(num_entries, opt->max_readdir);
2471  
2472  	rinfo->dir_buf_size = PAGE_SIZE << order;
2473  	req->r_num_caps = num_entries + 1;
2474  	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2475  	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2476  	return 0;
2477  }
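
/*
 * Illustrative arithmetic for the allocation above (sketch; the
 * ~40-byte entry size is an assumption for the example): for ~1000
 * expected entries, get_order(40 * 1000) on 4 KiB pages is 4, i.e. a
 * 16-page (64 KiB) buffer. On allocation failure the order steps down
 * until a contiguous buffer fits, and the entry count is then clamped
 * to what actually fits in PAGE_SIZE << order.
 */
static unsigned int example_entries_that_fit(int order, size_t entry_size)
{
	return (PAGE_SIZE << order) / entry_size;
}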
2478  
2479  /*
2480   * Create an mds request.
2481   */
2482  struct ceph_mds_request *
2483  ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2484  {
2485  	struct ceph_mds_request *req;
2486  
2487  	req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2488  	if (!req)
2489  		return ERR_PTR(-ENOMEM);
2490  
2491  	mutex_init(&req->r_fill_mutex);
2492  	req->r_mdsc = mdsc;
2493  	req->r_started = jiffies;
2494  	req->r_start_latency = ktime_get();
2495  	req->r_resend_mds = -1;
2496  	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2497  	INIT_LIST_HEAD(&req->r_unsafe_target_item);
2498  	req->r_fmode = -1;
2499  	req->r_feature_needed = -1;
2500  	kref_init(&req->r_kref);
2501  	RB_CLEAR_NODE(&req->r_node);
2502  	INIT_LIST_HEAD(&req->r_wait);
2503  	init_completion(&req->r_completion);
2504  	init_completion(&req->r_safe_completion);
2505  	INIT_LIST_HEAD(&req->r_unsafe_item);
2506  
2507  	ktime_get_coarse_real_ts64(&req->r_stamp);
2508  
2509  	req->r_op = op;
2510  	req->r_direct_mode = mode;
2511  	return req;
2512  }
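
/*
 * Illustrative usage sketch (hypothetical; real callers live in
 * fs/ceph/dir.c and friends and set more fields, e.g. r_parent):
 * allocate a request, point it at a dentry, submit synchronously,
 * then drop the reference.
 */
static int example_submit_lookup(struct ceph_mds_client *mdsc,
				 struct inode *dir, struct dentry *dentry)
{
	struct ceph_mds_request *req;
	int err;

	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);
	req->r_dentry = dget(dentry);
	req->r_num_caps = 2;
	err = ceph_mdsc_do_request(mdsc, dir, req);	/* waits for the reply */
	ceph_mdsc_put_request(req);
	return err;
}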
2513  
2514  /*
2515   * return the oldest (lowest tid) request in the request tree, or NULL if none.
2516   *
2517   * called under mdsc->mutex.
2518   */
2519  static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2520  {
2521  	if (RB_EMPTY_ROOT(&mdsc->request_tree))
2522  		return NULL;
2523  	return rb_entry(rb_first(&mdsc->request_tree),
2524  			struct ceph_mds_request, r_node);
2525  }
2526  
2527  static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2528  {
2529  	return mdsc->oldest_tid;
2530  }
2531  
2532  #if IS_ENABLED(CONFIG_FS_ENCRYPTION)
2533  static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2534  {
2535  	struct inode *dir = req->r_parent;
2536  	struct dentry *dentry = req->r_dentry;
2537  	u8 *cryptbuf = NULL;
2538  	u32 len = 0;
2539  	int ret = 0;
2540  
2541  	/* only encode if we have parent and dentry */
2542  	if (!dir || !dentry)
2543  		goto success;
2544  
2545  	/* No-op unless this is encrypted */
2546  	if (!IS_ENCRYPTED(dir))
2547  		goto success;
2548  
2549  	ret = ceph_fscrypt_prepare_readdir(dir);
2550  	if (ret < 0)
2551  		return ERR_PTR(ret);
2552  
2553  	/* No key? Just ignore it. */
2554  	if (!fscrypt_has_encryption_key(dir))
2555  		goto success;
2556  
2557  	if (!fscrypt_fname_encrypted_size(dir, dentry->d_name.len, NAME_MAX,
2558  					  &len)) {
2559  		WARN_ON_ONCE(1);
2560  		return ERR_PTR(-ENAMETOOLONG);
2561  	}
2562  
2563  	/* No need to append altname if name is short enough */
2564  	if (len <= CEPH_NOHASH_NAME_MAX) {
2565  		len = 0;
2566  		goto success;
2567  	}
2568  
2569  	cryptbuf = kmalloc(len, GFP_KERNEL);
2570  	if (!cryptbuf)
2571  		return ERR_PTR(-ENOMEM);
2572  
2573  	ret = fscrypt_fname_encrypt(dir, &dentry->d_name, cryptbuf, len);
2574  	if (ret) {
2575  		kfree(cryptbuf);
2576  		return ERR_PTR(ret);
2577  	}
2578  success:
2579  	*plen = len;
2580  	return cryptbuf;
2581  }
2582  #else
2583  static u8 *get_fscrypt_altname(const struct ceph_mds_request *req, u32 *plen)
2584  {
2585  	*plen = 0;
2586  	return NULL;
2587  }
2588  #endif
2589  
2590  /**
2591   * ceph_mdsc_build_path - build a path string to a given dentry
2592   * @mdsc: mds client
2593   * @dentry: dentry to which path should be built
2594   * @plen: returned length of string
2595   * @pbase: returned base inode number
2596   * @for_wire: is this path going to be sent to the MDS?
2597   *
2598   * Build a string that represents the path to the dentry. This is mostly called
2599   * for two different purposes:
2600   *
2601   * 1) we need to build a path string to send to the MDS (for_wire == true)
2602   * 2) we need a path string for local presentation (e.g. debugfs)
2603   *    (for_wire == false)
2604   *
2605   * The path is built in reverse, starting with the dentry. Walk back up toward
2606   * the root, building the path until the first non-snapped inode is reached
2607   * (for_wire) or the root inode is reached (!for_wire).
2608   *
2609   * Encode hidden .snap dirs as a double /, i.e.
2610   *   foo/.snap/bar -> foo//bar
2611   */
2612  char *ceph_mdsc_build_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2613  			   int *plen, u64 *pbase, int for_wire)
2614  {
2615  	struct dentry *cur;
2616  	struct inode *inode;
2617  	char *path;
2618  	int pos;
2619  	unsigned seq;
2620  	u64 base;
2621  
2622  	if (!dentry)
2623  		return ERR_PTR(-EINVAL);
2624  
2625  	path = __getname();
2626  	if (!path)
2627  		return ERR_PTR(-ENOMEM);
2628  retry:
2629  	pos = PATH_MAX - 1;
2630  	path[pos] = '\0';
2631  
2632  	seq = read_seqbegin(&rename_lock);
2633  	cur = dget(dentry);
2634  	for (;;) {
2635  		struct dentry *parent;
2636  
2637  		spin_lock(&cur->d_lock);
2638  		inode = d_inode(cur);
2639  		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2640  			dout("build_path path+%d: %p SNAPDIR\n",
2641  			     pos, cur);
2642  			spin_unlock(&cur->d_lock);
2643  			parent = dget_parent(cur);
2644  		} else if (for_wire && inode && dentry != cur &&
2645  			   ceph_snap(inode) == CEPH_NOSNAP) {
2646  			spin_unlock(&cur->d_lock);
2647  			pos++; /* get rid of any prepended '/' */
2648  			break;
2649  		} else if (!for_wire || !IS_ENCRYPTED(d_inode(cur->d_parent))) {
2650  			pos -= cur->d_name.len;
2651  			if (pos < 0) {
2652  				spin_unlock(&cur->d_lock);
2653  				break;
2654  			}
2655  			memcpy(path + pos, cur->d_name.name, cur->d_name.len);
2656  			spin_unlock(&cur->d_lock);
2657  			parent = dget_parent(cur);
2658  		} else {
2659  			int len, ret;
2660  			char buf[NAME_MAX];
2661  
2662  			/*
2663  			 * Proactively copy name into buf, in case we need to
2664  			 * present it as-is.
2665  			 */
2666  			memcpy(buf, cur->d_name.name, cur->d_name.len);
2667  			len = cur->d_name.len;
2668  			spin_unlock(&cur->d_lock);
2669  			parent = dget_parent(cur);
2670  
2671  			ret = ceph_fscrypt_prepare_readdir(d_inode(parent));
2672  			if (ret < 0) {
2673  				dput(parent);
2674  				dput(cur);
2675  				return ERR_PTR(ret);
2676  			}
2677  
2678  			if (fscrypt_has_encryption_key(d_inode(parent))) {
2679  				len = ceph_encode_encrypted_fname(d_inode(parent),
2680  								  cur, buf);
2681  				if (len < 0) {
2682  					dput(parent);
2683  					dput(cur);
2684  					return ERR_PTR(len);
2685  				}
2686  			}
2687  			pos -= len;
2688  			if (pos < 0) {
2689  				dput(parent);
2690  				break;
2691  			}
2692  			memcpy(path + pos, buf, len);
2693  		}
2694  		dput(cur);
2695  		cur = parent;
2696  
2697  		/* Are we at the root? */
2698  		if (IS_ROOT(cur))
2699  			break;
2700  
2701  		/* Are we out of buffer? */
2702  		if (--pos < 0)
2703  			break;
2704  
2705  		path[pos] = '/';
2706  	}
2707  	inode = d_inode(cur);
2708  	base = inode ? ceph_ino(inode) : 0;
2709  	dput(cur);
2710  
2711  	if (read_seqretry(&rename_lock, seq))
2712  		goto retry;
2713  
2714  	if (pos < 0) {
2715  		/*
2716  		 * The path is longer than PATH_MAX and this function
2717  		 * cannot ever succeed.  Creating paths that long is
2718  		 * possible with Ceph, but Linux cannot use them.
2719  		 */
2720  		return ERR_PTR(-ENAMETOOLONG);
2721  	}
2722  
2723  	*pbase = base;
2724  	*plen = PATH_MAX - 1 - pos;
2725  	dout("build_path on %p %d built %llx '%.*s'\n",
2726  	     dentry, d_count(dentry), base, *plen, path + pos);
2727  	return path + pos;
2728  }
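
/*
 * Illustrative sketch (hypothetical helper, not in the original file):
 * building and logging the wire path for a dentry. With for_wire set
 * the result is relative to the returned base inode, and any hidden
 * .snap component appears as an empty path element (the doubled '/'
 * described above). ceph_mdsc_free_path() returns the names buffer.
 */
static void example_log_wire_path(struct ceph_mds_client *mdsc,
				  struct dentry *dentry)
{
	char *path;
	int len;
	u64 base;

	path = ceph_mdsc_build_path(mdsc, dentry, &len, &base, 1);
	if (IS_ERR(path))
		return;
	dout("wire path base %llx '%.*s'\n", base, len, path);
	ceph_mdsc_free_path(path, len);
}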
2729  
2730  static int build_dentry_path(struct ceph_mds_client *mdsc, struct dentry *dentry,
2731  			     struct inode *dir, const char **ppath, int *ppathlen,
2732  			     u64 *pino, bool *pfreepath, bool parent_locked)
2733  {
2734  	char *path;
2735  
2736  	rcu_read_lock();
2737  	if (!dir)
2738  		dir = d_inode_rcu(dentry->d_parent);
2739  	if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP &&
2740  	    !IS_ENCRYPTED(dir)) {
2741  		*pino = ceph_ino(dir);
2742  		rcu_read_unlock();
2743  		*ppath = dentry->d_name.name;
2744  		*ppathlen = dentry->d_name.len;
2745  		return 0;
2746  	}
2747  	rcu_read_unlock();
2748  	path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1);
2749  	if (IS_ERR(path))
2750  		return PTR_ERR(path);
2751  	*ppath = path;
2752  	*pfreepath = true;
2753  	return 0;
2754  }
2755  
2756  static int build_inode_path(struct inode *inode,
2757  			    const char **ppath, int *ppathlen, u64 *pino,
2758  			    bool *pfreepath)
2759  {
2760  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
2761  	struct dentry *dentry;
2762  	char *path;
2763  
2764  	if (ceph_snap(inode) == CEPH_NOSNAP) {
2765  		*pino = ceph_ino(inode);
2766  		*ppathlen = 0;
2767  		return 0;
2768  	}
2769  	dentry = d_find_alias(inode);
2770  	path = ceph_mdsc_build_path(mdsc, dentry, ppathlen, pino, 1);
2771  	dput(dentry);
2772  	if (IS_ERR(path))
2773  		return PTR_ERR(path);
2774  	*ppath = path;
2775  	*pfreepath = true;
2776  	return 0;
2777  }
2778  
2779  /*
2780   * request arguments may be specified via an inode *, a dentry *, or
2781   * an explicit ino+path.
2782   */
2783  static int set_request_path_attr(struct ceph_mds_client *mdsc, struct inode *rinode,
2784  				 struct dentry *rdentry, struct inode *rdiri,
2785  				 const char *rpath, u64 rino, const char **ppath,
2786  				 int *pathlen, u64 *ino, bool *freepath,
2787  				 bool parent_locked)
2788  {
2789  	int r = 0;
2790  
2791  	if (rinode) {
2792  		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2793  		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2794  		     ceph_snap(rinode));
2795  	} else if (rdentry) {
2796  		r = build_dentry_path(mdsc, rdentry, rdiri, ppath, pathlen, ino,
2797  					freepath, parent_locked);
2798  		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2799  		     *ppath);
2800  	} else if (rpath || rino) {
2801  		*ino = rino;
2802  		*ppath = rpath;
2803  		*pathlen = rpath ? strlen(rpath) : 0;
2804  		dout(" path %.*s\n", *pathlen, rpath);
2805  	}
2806  
2807  	return r;
2808  }
2809  
2810  static void encode_mclientrequest_tail(void **p,
2811  				       const struct ceph_mds_request *req)
2812  {
2813  	struct ceph_timespec ts;
2814  	int i;
2815  
2816  	ceph_encode_timespec64(&ts, &req->r_stamp);
2817  	ceph_encode_copy(p, &ts, sizeof(ts));
2818  
2819  	/* v4: gid_list */
2820  	ceph_encode_32(p, req->r_cred->group_info->ngroups);
2821  	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
2822  		ceph_encode_64(p, from_kgid(&init_user_ns,
2823  					    req->r_cred->group_info->gid[i]));
2824  
2825  	/* v5: altname */
2826  	ceph_encode_32(p, req->r_altname_len);
2827  	ceph_encode_copy(p, req->r_altname, req->r_altname_len);
2828  
2829  	/* v6: fscrypt_auth and fscrypt_file */
2830  	if (req->r_fscrypt_auth) {
2831  		u32 authlen = ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2832  
2833  		ceph_encode_32(p, authlen);
2834  		ceph_encode_copy(p, req->r_fscrypt_auth, authlen);
2835  	} else {
2836  		ceph_encode_32(p, 0);
2837  	}
2838  	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags)) {
2839  		ceph_encode_32(p, sizeof(__le64));
2840  		ceph_encode_64(p, req->r_fscrypt_file);
2841  	} else {
2842  		ceph_encode_32(p, 0);
2843  	}
2844  }
2845  
2846  static struct ceph_mds_request_head_legacy *
2847  find_legacy_request_head(void *p, u64 features)
2848  {
2849  	bool legacy = !(features & CEPH_FEATURE_FS_BTIME);
2850  	struct ceph_mds_request_head_old *ohead;
2851  
2852  	if (legacy)
2853  		return (struct ceph_mds_request_head_legacy *)p;
2854  	ohead = (struct ceph_mds_request_head_old *)p;
2855  	return (struct ceph_mds_request_head_legacy *)&ohead->oldest_client_tid;
2856  }
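
/*
 * Illustrative layout note (sketch; assumes the wire structs are
 * declared packed, as ceph_fs.h does): the newer heads prepend a
 * __le16 version field, so the legacy fields start exactly at
 * oldest_client_tid and the pointer returned above lets common code
 * fill op, caller_uid/gid, args, etc. regardless of peer features.
 */
static void example_head_layout_check(void)
{
	BUILD_BUG_ON(offsetof(struct ceph_mds_request_head_old,
			      oldest_client_tid) != sizeof(__le16));
}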
2857  
2858  /*
2859   * called under mdsc->mutex
2860   */
2861  static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
2862  					       struct ceph_mds_request *req,
2863  					       bool drop_cap_releases)
2864  {
2865  	int mds = session->s_mds;
2866  	struct ceph_mds_client *mdsc = session->s_mdsc;
2867  	struct ceph_msg *msg;
2868  	struct ceph_mds_request_head_legacy *lhead;
2869  	const char *path1 = NULL;
2870  	const char *path2 = NULL;
2871  	u64 ino1 = 0, ino2 = 0;
2872  	int pathlen1 = 0, pathlen2 = 0;
2873  	bool freepath1 = false, freepath2 = false;
2874  	struct dentry *old_dentry = NULL;
2875  	int len;
2876  	u16 releases;
2877  	void *p, *end;
2878  	int ret;
2879  	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);
2880  	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
2881  				     &session->s_features);
2882  
2883  	ret = set_request_path_attr(mdsc, req->r_inode, req->r_dentry,
2884  			      req->r_parent, req->r_path1, req->r_ino1.ino,
2885  			      &path1, &pathlen1, &ino1, &freepath1,
2886  			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
2887  					&req->r_req_flags));
2888  	if (ret < 0) {
2889  		msg = ERR_PTR(ret);
2890  		goto out;
2891  	}
2892  
2893  	/* If r_old_dentry is set, then assume that its parent is locked */
2894  	if (req->r_old_dentry &&
2895  	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
2896  		old_dentry = req->r_old_dentry;
2897  	ret = set_request_path_attr(mdsc, NULL, old_dentry,
2898  			      req->r_old_dentry_dir,
2899  			      req->r_path2, req->r_ino2.ino,
2900  			      &path2, &pathlen2, &ino2, &freepath2, true);
2901  	if (ret < 0) {
2902  		msg = ERR_PTR(ret);
2903  		goto out_free1;
2904  	}
2905  
2906  	req->r_altname = get_fscrypt_altname(req, &req->r_altname_len);
2907  	if (IS_ERR(req->r_altname)) {
2908  		msg = ERR_CAST(req->r_altname);
2909  		req->r_altname = NULL;
2910  		goto out_free2;
2911  	}
2912  
2913  	/*
2914  	 * Old cephs that don't support the 32-bit retry/fwd feature
2915  	 * copy the raw memory directly when decoding requests, while
2916  	 * new cephs decode the head based on its version member, so we
2917  	 * need to make sure the encoding stays compatible with both.
2919  	 */
2920  	if (legacy)
2921  		len = sizeof(struct ceph_mds_request_head_legacy);
2922  	else if (old_version)
2923  		len = sizeof(struct ceph_mds_request_head_old);
2924  	else
2925  		len = sizeof(struct ceph_mds_request_head);
2926  
2927  	/* filepaths */
2928  	len += 2 * (1 + sizeof(u32) + sizeof(u64));
2929  	len += pathlen1 + pathlen2;
2930  
2931  	/* cap releases */
2932  	len += sizeof(struct ceph_mds_request_release) *
2933  		(!!req->r_inode_drop + !!req->r_dentry_drop +
2934  		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2935  
2936  	if (req->r_dentry_drop)
2937  		len += pathlen1;
2938  	if (req->r_old_dentry_drop)
2939  		len += pathlen2;
2940  
2941  	/* MClientRequest tail */
2942  
2943  	/* req->r_stamp */
2944  	len += sizeof(struct ceph_timespec);
2945  
2946  	/* gid list */
2947  	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);
2948  
2949  	/* alternate name */
2950  	len += sizeof(u32) + req->r_altname_len;
2951  
2952  	/* fscrypt_auth */
2953  	len += sizeof(u32); // fscrypt_auth
2954  	if (req->r_fscrypt_auth)
2955  		len += ceph_fscrypt_auth_len(req->r_fscrypt_auth);
2956  
2957  	/* fscrypt_file */
2958  	len += sizeof(u32);
2959  	if (test_bit(CEPH_MDS_R_FSCRYPT_FILE, &req->r_req_flags))
2960  		len += sizeof(__le64);
2961  
2962  	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2963  	if (!msg) {
2964  		msg = ERR_PTR(-ENOMEM);
2965  		goto out_free2;
2966  	}
2967  
2968  	msg->hdr.tid = cpu_to_le64(req->r_tid);
2969  
2970  	lhead = find_legacy_request_head(msg->front.iov_base,
2971  					 session->s_con.peer_features);
2972  
2973  	/*
2974  	 * The ceph_mds_request_head_legacy didn't contain a version field, and
2975  	 * one was added when we moved the message version from 3->4.
2976  	 */
2977  	if (legacy) {
2978  		msg->hdr.version = cpu_to_le16(3);
2979  		p = msg->front.iov_base + sizeof(*lhead);
2980  	} else if (old_version) {
2981  		struct ceph_mds_request_head_old *ohead = msg->front.iov_base;
2982  
2983  		msg->hdr.version = cpu_to_le16(4);
2984  		ohead->version = cpu_to_le16(1);
2985  		p = msg->front.iov_base + sizeof(*ohead);
2986  	} else {
2987  		struct ceph_mds_request_head *nhead = msg->front.iov_base;
2988  
2989  		msg->hdr.version = cpu_to_le16(6);
2990  		nhead->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
2991  		p = msg->front.iov_base + sizeof(*nhead);
2992  	}
2993  
2994  	end = msg->front.iov_base + msg->front.iov_len;
2995  
2996  	lhead->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2997  	lhead->op = cpu_to_le32(req->r_op);
2998  	lhead->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
2999  						  req->r_cred->fsuid));
3000  	lhead->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
3001  						  req->r_cred->fsgid));
3002  	lhead->ino = cpu_to_le64(req->r_deleg_ino);
3003  	lhead->args = req->r_args;
3004  
3005  	ceph_encode_filepath(&p, end, ino1, path1);
3006  	ceph_encode_filepath(&p, end, ino2, path2);
3007  
3008  	/* make note of release offset, in case we need to replay */
3009  	req->r_request_release_offset = p - msg->front.iov_base;
3010  
3011  	/* cap releases */
3012  	releases = 0;
3013  	if (req->r_inode_drop)
3014  		releases += ceph_encode_inode_release(&p,
3015  		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
3016  		      mds, req->r_inode_drop, req->r_inode_unless,
3017  		      req->r_op == CEPH_MDS_OP_READDIR);
3018  	if (req->r_dentry_drop) {
3019  		ret = ceph_encode_dentry_release(&p, req->r_dentry,
3020  				req->r_parent, mds, req->r_dentry_drop,
3021  				req->r_dentry_unless);
3022  		if (ret < 0)
3023  			goto out_err;
3024  		releases += ret;
3025  	}
3026  	if (req->r_old_dentry_drop) {
3027  		ret = ceph_encode_dentry_release(&p, req->r_old_dentry,
3028  				req->r_old_dentry_dir, mds,
3029  				req->r_old_dentry_drop,
3030  				req->r_old_dentry_unless);
3031  		if (ret < 0)
3032  			goto out_err;
3033  		releases += ret;
3034  	}
3035  	if (req->r_old_inode_drop)
3036  		releases += ceph_encode_inode_release(&p,
3037  		      d_inode(req->r_old_dentry),
3038  		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
3039  
3040  	if (drop_cap_releases) {
3041  		releases = 0;
3042  		p = msg->front.iov_base + req->r_request_release_offset;
3043  	}
3044  
3045  	lhead->num_releases = cpu_to_le16(releases);
3046  
3047  	encode_mclientrequest_tail(&p, req);
3048  
3049  	if (WARN_ON_ONCE(p > end)) {
3050  		ceph_msg_put(msg);
3051  		msg = ERR_PTR(-ERANGE);
3052  		goto out_free2;
3053  	}
3054  
3055  	msg->front.iov_len = p - msg->front.iov_base;
3056  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3057  
3058  	if (req->r_pagelist) {
3059  		struct ceph_pagelist *pagelist = req->r_pagelist;
3060  		ceph_msg_data_add_pagelist(msg, pagelist);
3061  		msg->hdr.data_len = cpu_to_le32(pagelist->length);
3062  	} else {
3063  		msg->hdr.data_len = 0;
3064  	}
3065  
3066  	msg->hdr.data_off = cpu_to_le16(0);
3067  
3068  out_free2:
3069  	if (freepath2)
3070  		ceph_mdsc_free_path((char *)path2, pathlen2);
3071  out_free1:
3072  	if (freepath1)
3073  		ceph_mdsc_free_path((char *)path1, pathlen1);
3074  out:
3075  	return msg;
3076  out_err:
3077  	ceph_msg_put(msg);
3078  	msg = ERR_PTR(ret);
3079  	goto out_free2;
3080  }
3081  
3082  /*
3083   * called under mdsc->mutex if error, under no mutex if
3084   * success.
3085   */
3086  static void complete_request(struct ceph_mds_client *mdsc,
3087  			     struct ceph_mds_request *req)
3088  {
3089  	req->r_end_latency = ktime_get();
3090  
3091  	if (req->r_callback)
3092  		req->r_callback(mdsc, req);
3093  	complete_all(&req->r_completion);
3094  }
3095  
3096  /*
3097   * called under mdsc->mutex
3098   */
3099  static int __prepare_send_request(struct ceph_mds_session *session,
3100  				  struct ceph_mds_request *req,
3101  				  bool drop_cap_releases)
3102  {
3103  	int mds = session->s_mds;
3104  	struct ceph_mds_client *mdsc = session->s_mdsc;
3105  	struct ceph_mds_request_head_legacy *lhead;
3106  	struct ceph_mds_request_head *nhead;
3107  	struct ceph_msg *msg;
3108  	int flags = 0, old_max_retry;
3109  	bool old_version = !test_bit(CEPHFS_FEATURE_32BITS_RETRY_FWD,
3110  				     &session->s_features);
3111  
3112  	/*
3113  	 * Avoid infinite retrying after the retry counter overflows.
3114  	 * The client bumps the retry count on every resend, and an
3115  	 * old-version MDS stores it in a single byte, so limit retries
3116  	 * to at most 256 in that case.
3116  	 */
3117  	if (req->r_attempts) {
3118  	       old_max_retry = sizeof_field(struct ceph_mds_request_head_old,
3119  					    num_retry);
3120  	       old_max_retry = 1 << (old_max_retry * BITS_PER_BYTE);
3121  	       if ((old_version && req->r_attempts >= old_max_retry) ||
3122  		   ((uint32_t)req->r_attempts >= U32_MAX)) {
3123  			pr_warn_ratelimited("%s request tid %llu seq overflow\n",
3124  					    __func__, req->r_tid);
3125  			return -EMULTIHOP;
3126  	       }
3127  	}
3128  
3129  	req->r_attempts++;
3130  	if (req->r_inode) {
3131  		struct ceph_cap *cap =
3132  			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
3133  
3134  		if (cap)
3135  			req->r_sent_on_mseq = cap->mseq;
3136  		else
3137  			req->r_sent_on_mseq = -1;
3138  	}
3139  	dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
3140  	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
3141  
3142  	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3143  		void *p;
3144  
3145  		/*
3146  		 * Replay.  Do not regenerate message (and rebuild
3147  		 * paths, etc.); just use the original message.
3148  		 * Rebuilding paths will break for renames because
3149  		 * d_move mangles the src name.
3150  		 */
3151  		msg = req->r_request;
3152  		lhead = find_legacy_request_head(msg->front.iov_base,
3153  						 session->s_con.peer_features);
3154  
3155  		flags = le32_to_cpu(lhead->flags);
3156  		flags |= CEPH_MDS_FLAG_REPLAY;
3157  		lhead->flags = cpu_to_le32(flags);
3158  
3159  		if (req->r_target_inode)
3160  			lhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
3161  
3162  		lhead->num_retry = req->r_attempts - 1;
3163  		if (!old_version) {
3164  			nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3165  			nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3166  		}
3167  
3168  		/* remove cap/dentry releases from message */
3169  		lhead->num_releases = 0;
3170  
3171  		p = msg->front.iov_base + req->r_request_release_offset;
3172  		encode_mclientrequest_tail(&p, req);
3173  
3174  		msg->front.iov_len = p - msg->front.iov_base;
3175  		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
3176  		return 0;
3177  	}
3178  
3179  	if (req->r_request) {
3180  		ceph_msg_put(req->r_request);
3181  		req->r_request = NULL;
3182  	}
3183  	msg = create_request_message(session, req, drop_cap_releases);
3184  	if (IS_ERR(msg)) {
3185  		req->r_err = PTR_ERR(msg);
3186  		return PTR_ERR(msg);
3187  	}
3188  	req->r_request = msg;
3189  
3190  	lhead = find_legacy_request_head(msg->front.iov_base,
3191  					 session->s_con.peer_features);
3192  	lhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
3193  	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3194  		flags |= CEPH_MDS_FLAG_REPLAY;
3195  	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
3196  		flags |= CEPH_MDS_FLAG_ASYNC;
3197  	if (req->r_parent)
3198  		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
3199  	lhead->flags = cpu_to_le32(flags);
3200  	lhead->num_fwd = req->r_num_fwd;
3201  	lhead->num_retry = req->r_attempts - 1;
3202  	if (!old_version) {
3203  		nhead = (struct ceph_mds_request_head*)msg->front.iov_base;
3204  		nhead->ext_num_fwd = cpu_to_le32(req->r_num_fwd);
3205  		nhead->ext_num_retry = cpu_to_le32(req->r_attempts - 1);
3206  	}
3207  
3208  	dout(" r_parent = %p\n", req->r_parent);
3209  	return 0;
3210  }
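
/*
 * Worked example for the overflow guard above (sketch; hypothetical
 * helper): num_retry in ceph_mds_request_head_old is a single byte,
 * so old_max_retry = 1 << (1 * BITS_PER_BYTE) = 256. An old-version
 * MDS therefore allows at most 256 attempts before the client fails
 * the request with -EMULTIHOP; new MDSes use the 32-bit ext_num_retry.
 */
static u64 example_max_attempts(bool old_version)
{
	if (old_version)
		return 1ULL << (sizeof_field(struct ceph_mds_request_head_old,
					     num_retry) * BITS_PER_BYTE);
	return U32_MAX;
}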
3211  
3212  /*
3213   * called under mdsc->mutex
3214   */
3215  static int __send_request(struct ceph_mds_session *session,
3216  			  struct ceph_mds_request *req,
3217  			  bool drop_cap_releases)
3218  {
3219  	int err;
3220  
3221  	err = __prepare_send_request(session, req, drop_cap_releases);
3222  	if (!err) {
3223  		ceph_msg_get(req->r_request);
3224  		ceph_con_send(&session->s_con, req->r_request);
3225  	}
3226  
3227  	return err;
3228  }
3229  
3230  /*
3231   * send request, or put it on the appropriate wait list.
3232   */
3233  static void __do_request(struct ceph_mds_client *mdsc,
3234  			struct ceph_mds_request *req)
3235  {
3236  	struct ceph_mds_session *session = NULL;
3237  	int mds = -1;
3238  	int err = 0;
3239  	bool random;
3240  
3241  	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3242  		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
3243  			__unregister_request(mdsc, req);
3244  		return;
3245  	}
3246  
3247  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
3248  		dout("do_request metadata corrupted\n");
3249  		err = -EIO;
3250  		goto finish;
3251  	}
3252  	if (req->r_timeout &&
3253  	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
3254  		dout("do_request timed out\n");
3255  		err = -ETIMEDOUT;
3256  		goto finish;
3257  	}
3258  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
3259  		dout("do_request forced umount\n");
3260  		err = -EIO;
3261  		goto finish;
3262  	}
3263  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
3264  		if (mdsc->mdsmap_err) {
3265  			err = mdsc->mdsmap_err;
3266  			dout("do_request mdsmap err %d\n", err);
3267  			goto finish;
3268  		}
3269  		if (mdsc->mdsmap->m_epoch == 0) {
3270  			dout("do_request no mdsmap, waiting for map\n");
3271  			list_add(&req->r_wait, &mdsc->waiting_for_map);
3272  			return;
3273  		}
3274  		if (!(mdsc->fsc->mount_options->flags &
3275  		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
3276  		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
3277  			err = -EHOSTUNREACH;
3278  			goto finish;
3279  		}
3280  	}
3281  
3282  	put_request_session(req);
3283  
3284  	mds = __choose_mds(mdsc, req, &random);
3285  	if (mds < 0 ||
3286  	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
3287  		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3288  			err = -EJUKEBOX;
3289  			goto finish;
3290  		}
3291  		dout("do_request no mds or not active, waiting for map\n");
3292  		list_add(&req->r_wait, &mdsc->waiting_for_map);
3293  		return;
3294  	}
3295  
3296  	/* get, open session */
3297  	session = __ceph_lookup_mds_session(mdsc, mds);
3298  	if (!session) {
3299  		session = register_session(mdsc, mds);
3300  		if (IS_ERR(session)) {
3301  			err = PTR_ERR(session);
3302  			goto finish;
3303  		}
3304  	}
3305  	req->r_session = ceph_get_mds_session(session);
3306  
3307  	dout("do_request mds%d session %p state %s\n", mds, session,
3308  	     ceph_session_state_name(session->s_state));
3309  
3310  	/*
3311  	 * Old versions of ceph will crash the MDS when it sees unknown ops
3312  	 */
3313  	if (req->r_feature_needed > 0 &&
3314  	    !test_bit(req->r_feature_needed, &session->s_features)) {
3315  		err = -EOPNOTSUPP;
3316  		goto out_session;
3317  	}
3318  
3319  	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
3320  	    session->s_state != CEPH_MDS_SESSION_HUNG) {
3321  		/*
3322  		 * We cannot queue async requests since the caps and delegated
3323  		 * inodes are bound to the session. Just return -EJUKEBOX and
3324  		 * let the caller retry a sync request in that case.
3325  		 */
3326  		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
3327  			err = -EJUKEBOX;
3328  			goto out_session;
3329  		}
3330  
3331  		/*
3332  		 * If the session has been REJECTED, then return a hard error,
3333  		 * unless it's a CLEANRECOVER mount, in which case we'll queue
3334  		 * it to the mdsc queue.
3335  		 */
3336  		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
3337  			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
3338  				list_add(&req->r_wait, &mdsc->waiting_for_map);
3339  			else
3340  				err = -EACCES;
3341  			goto out_session;
3342  		}
3343  
3344  		if (session->s_state == CEPH_MDS_SESSION_NEW ||
3345  		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
3346  			err = __open_session(mdsc, session);
3347  			if (err)
3348  				goto out_session;
3349  			/* retry the same mds later */
3350  			if (random)
3351  				req->r_resend_mds = mds;
3352  		}
3353  		list_add(&req->r_wait, &session->s_waiting);
3354  		goto out_session;
3355  	}
3356  
3357  	/* send request */
3358  	req->r_resend_mds = -1;   /* forget any previous mds hint */
3359  
3360  	if (req->r_request_started == 0)   /* note request start time */
3361  		req->r_request_started = jiffies;
3362  
3363  	/*
3364  	 * For an async create we choose the auth MDS of the frag in the
3365  	 * parent directory to send the request to, and usually this works
3366  	 * fine. But if the directory is migrated to another MDS before it
3367  	 * can handle the request, the request will be forwarded.
3368  	 *
3369  	 * And then the auth cap will change as well.
3370  	 */
3371  	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
3372  		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
3373  		struct ceph_inode_info *ci;
3374  		struct ceph_cap *cap;
3375  
3376  		/*
3377  		 * The request may be handled very quickly while the new
3378  		 * inode hasn't been linked to the dentry yet. Before
3379  		 * forwarding the request, wait for ceph_finish_async_create()
3380  		 * to finish; in theory it shouldn't get stuck for long or
3381  		 * fail.
3382  		 */
3383  		if (!d_inode(req->r_dentry)) {
3384  			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
3385  					  TASK_KILLABLE);
3386  			if (err) {
3387  				mutex_lock(&req->r_fill_mutex);
3388  				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3389  				mutex_unlock(&req->r_fill_mutex);
3390  				goto out_session;
3391  			}
3392  		}
3393  
3394  		ci = ceph_inode(d_inode(req->r_dentry));
3395  
3396  		spin_lock(&ci->i_ceph_lock);
3397  		cap = ci->i_auth_cap;
3398  		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
3399  			dout("do_request session changed for auth cap %d -> %d\n",
3400  			     cap->session->s_mds, session->s_mds);
3401  
3402  			/* Remove the auth cap from old session */
3403  			spin_lock(&cap->session->s_cap_lock);
3404  			cap->session->s_nr_caps--;
3405  			list_del_init(&cap->session_caps);
3406  			spin_unlock(&cap->session->s_cap_lock);
3407  
3408  			/* Add the auth cap to the new session */
3409  			cap->mds = mds;
3410  			cap->session = session;
3411  			spin_lock(&session->s_cap_lock);
3412  			session->s_nr_caps++;
3413  			list_add_tail(&cap->session_caps, &session->s_caps);
3414  			spin_unlock(&session->s_cap_lock);
3415  
3416  			change_auth_cap_ses(ci, session);
3417  		}
3418  		spin_unlock(&ci->i_ceph_lock);
3419  	}
3420  
3421  	err = __send_request(session, req, false);
3422  
3423  out_session:
3424  	ceph_put_mds_session(session);
3425  finish:
3426  	if (err) {
3427  		dout("__do_request early error %d\n", err);
3428  		req->r_err = err;
3429  		complete_request(mdsc, req);
3430  		__unregister_request(mdsc, req);
3431  	}
3432  	return;
3433  }
3434  
3435  /*
3436   * called under mdsc->mutex
3437   */
3438  static void __wake_requests(struct ceph_mds_client *mdsc,
3439  			    struct list_head *head)
3440  {
3441  	struct ceph_mds_request *req;
3442  	LIST_HEAD(tmp_list);
3443  
3444  	list_splice_init(head, &tmp_list);
3445  
3446  	while (!list_empty(&tmp_list)) {
3447  		req = list_entry(tmp_list.next,
3448  				 struct ceph_mds_request, r_wait);
3449  		list_del_init(&req->r_wait);
3450  		dout(" wake request %p tid %llu\n", req, req->r_tid);
3451  		__do_request(mdsc, req);
3452  	}
3453  }
3454  
3455  /*
3456   * Wake up threads with requests pending for @mds, so that they can
3457   * resubmit their requests to a possibly different mds.
3458   */
3459  static void kick_requests(struct ceph_mds_client *mdsc, int mds)
3460  {
3461  	struct ceph_mds_request *req;
3462  	struct rb_node *p = rb_first(&mdsc->request_tree);
3463  
3464  	dout("kick_requests mds%d\n", mds);
3465  	while (p) {
3466  		req = rb_entry(p, struct ceph_mds_request, r_node);
3467  		p = rb_next(p);
3468  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3469  			continue;
3470  		if (req->r_attempts > 0)
3471  			continue; /* only new requests */
3472  		if (req->r_session &&
3473  		    req->r_session->s_mds == mds) {
3474  			dout(" kicking tid %llu\n", req->r_tid);
3475  			list_del_init(&req->r_wait);
3476  			__do_request(mdsc, req);
3477  		}
3478  	}
3479  }
3480  
3481  int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
3482  			      struct ceph_mds_request *req)
3483  {
3484  	int err = 0;
3485  
3486  	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
3487  	if (req->r_inode)
3488  		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
3489  	if (req->r_parent) {
3490  		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
3491  		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
3492  			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
3493  		spin_lock(&ci->i_ceph_lock);
3494  		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
3495  		__ceph_touch_fmode(ci, mdsc, fmode);
3496  		spin_unlock(&ci->i_ceph_lock);
3497  	}
3498  	if (req->r_old_dentry_dir)
3499  		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
3500  				  CEPH_CAP_PIN);
3501  
3502  	if (req->r_inode) {
3503  		err = ceph_wait_on_async_create(req->r_inode);
3504  		if (err) {
3505  			dout("%s: wait for async create returned: %d\n",
3506  			     __func__, err);
3507  			return err;
3508  		}
3509  	}
3510  
3511  	if (!err && req->r_old_inode) {
3512  		err = ceph_wait_on_async_create(req->r_old_inode);
3513  		if (err) {
3514  			dout("%s: wait for async create returned: %d\n",
3515  			     __func__, err);
3516  			return err;
3517  		}
3518  	}
3519  
3520  	dout("submit_request on %p for inode %p\n", req, dir);
3521  	mutex_lock(&mdsc->mutex);
3522  	__register_request(mdsc, req, dir);
3523  	__do_request(mdsc, req);
3524  	err = req->r_err;
3525  	mutex_unlock(&mdsc->mutex);
3526  	return err;
3527  }
3528  
3529  int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3530  			   struct ceph_mds_request *req,
3531  			   ceph_mds_request_wait_callback_t wait_func)
3532  {
3533  	int err;
3534  
3535  	/* wait */
3536  	dout("do_request waiting\n");
3537  	if (wait_func) {
3538  		err = wait_func(mdsc, req);
3539  	} else {
3540  		long timeleft = wait_for_completion_killable_timeout(
3541  					&req->r_completion,
3542  					ceph_timeout_jiffies(req->r_timeout));
3543  		if (timeleft > 0)
3544  			err = 0;
3545  		else if (!timeleft)
3546  			err = -ETIMEDOUT;  /* timed out */
3547  		else
3548  			err = timeleft;  /* killed */
3549  	}
3550  	dout("do_request waited, got %d\n", err);
3551  	mutex_lock(&mdsc->mutex);
3552  
3553  	/* only abort if we didn't race with a real reply */
3554  	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3555  		err = le32_to_cpu(req->r_reply_info.head->result);
3556  	} else if (err < 0) {
3557  		dout("aborted request %lld with %d\n", req->r_tid, err);
3558  
3559  		/*
3560  		 * ensure we aren't running concurrently with
3561  		 * ceph_fill_trace or ceph_readdir_prepopulate, which
3562  		 * rely on locks (dir mutex) held by our caller.
3563  		 */
3564  		mutex_lock(&req->r_fill_mutex);
3565  		req->r_err = err;
3566  		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3567  		mutex_unlock(&req->r_fill_mutex);
3568  
3569  		if (req->r_parent &&
3570  		    (req->r_op & CEPH_MDS_OP_WRITE))
3571  			ceph_invalidate_dir_request(req);
3572  	} else {
3573  		err = req->r_err;
3574  	}
3575  
3576  	mutex_unlock(&mdsc->mutex);
3577  	return err;
3578  }
3579  
3580  /*
3581   * Synchronously perform an mds request.  Take care of all of the
3582   * session setup, forwarding, retry details.
3583   */
3584  int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3585  			 struct inode *dir,
3586  			 struct ceph_mds_request *req)
3587  {
3588  	int err;
3589  
3590  	dout("do_request on %p\n", req);
3591  
3592  	/* issue */
3593  	err = ceph_mdsc_submit_request(mdsc, dir, req);
3594  	if (!err)
3595  		err = ceph_mdsc_wait_request(mdsc, req, NULL);
3596  	dout("do_request %p done, result %d\n", req, err);
3597  	return err;
3598  }
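
/*
 * A minimal caller sketch (hypothetical arguments, for illustration
 * only):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_inode = inode;
 *	ihold(inode);
 *	err = ceph_mdsc_do_request(mdsc, NULL, req);
 *	ceph_mdsc_put_request(req);
 */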
3599  
3600  /*
3601   * Invalidate dir's completeness, dentry lease state on an aborted MDS
3602   * namespace request.
3603   */
3604  void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3605  {
3606  	struct inode *dir = req->r_parent;
3607  	struct inode *old_dir = req->r_old_dentry_dir;
3608  
3609  	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3610  
3611  	ceph_dir_clear_complete(dir);
3612  	if (old_dir)
3613  		ceph_dir_clear_complete(old_dir);
3614  	if (req->r_dentry)
3615  		ceph_invalidate_dentry_lease(req->r_dentry);
3616  	if (req->r_old_dentry)
3617  		ceph_invalidate_dentry_lease(req->r_old_dentry);
3618  }
3619  
3620  /*
3621   * Handle mds reply.
3622   *
3623   * We take the session mutex and parse and process the reply immediately.
3624   * This preserves the logical ordering of replies, capabilities, etc., sent
3625   * by the MDS as they are applied to our local cache.
3626   */
3627  static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3628  {
3629  	struct ceph_mds_client *mdsc = session->s_mdsc;
3630  	struct ceph_mds_request *req;
3631  	struct ceph_mds_reply_head *head = msg->front.iov_base;
3632  	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3633  	struct ceph_snap_realm *realm;
3634  	u64 tid;
3635  	int err, result;
3636  	int mds = session->s_mds;
3637  	bool close_sessions = false;
3638  
3639  	if (msg->front.iov_len < sizeof(*head)) {
3640  		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3641  		ceph_msg_dump(msg);
3642  		return;
3643  	}
3644  
3645  	/* get request, session */
3646  	tid = le64_to_cpu(msg->hdr.tid);
3647  	mutex_lock(&mdsc->mutex);
3648  	req = lookup_get_request(mdsc, tid);
3649  	if (!req) {
3650  		dout("handle_reply on unknown tid %llu\n", tid);
3651  		mutex_unlock(&mdsc->mutex);
3652  		return;
3653  	}
3654  	dout("handle_reply %p\n", req);
3655  
3656  	/* correct session? */
3657  	if (req->r_session != session) {
3658  		pr_err("mdsc_handle_reply got %llu on session mds%d"
3659  		       " not mds%d\n", tid, session->s_mds,
3660  		       req->r_session ? req->r_session->s_mds : -1);
3661  		mutex_unlock(&mdsc->mutex);
3662  		goto out;
3663  	}
3664  
3665  	/* dup? */
3666  	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3667  	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3668  		pr_warn("got a dup %s reply on %llu from mds%d\n",
3669  			   head->safe ? "safe" : "unsafe", tid, mds);
3670  		mutex_unlock(&mdsc->mutex);
3671  		goto out;
3672  	}
3673  	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3674  		pr_warn("got unsafe after safe on %llu from mds%d\n",
3675  			   tid, mds);
3676  		mutex_unlock(&mdsc->mutex);
3677  		goto out;
3678  	}
3679  
3680  	result = le32_to_cpu(head->result);
3681  
3682  	if (head->safe) {
3683  		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3684  		__unregister_request(mdsc, req);
3685  
3686  		/* last request during umount? */
3687  		if (mdsc->stopping && !__get_oldest_req(mdsc))
3688  			complete_all(&mdsc->safe_umount_waiters);
3689  
3690  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3691  			/*
3692  			 * We already handled the unsafe response, now do the
3693  			 * cleanup.  No need to examine the response; the MDS
3694  			 * doesn't include any result info in the safe
3695  			 * response.  And even if it did, there is nothing
3696  			 * useful we could do with a revised return value.
3697  			 */
3698  			dout("got safe reply %llu, mds%d\n", tid, mds);
3699  
3700  			mutex_unlock(&mdsc->mutex);
3701  			goto out;
3702  		}
3703  	} else {
3704  		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3705  		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3706  	}
3707  
3708  	dout("handle_reply tid %lld result %d\n", tid, result);
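	/*
	 * With the REPLY_ENCODING feature the MDS uses the versioned,
	 * feature-independent reply encoding, so pass an all-ones feature
	 * mask; otherwise fall back to the connection's negotiated feature
	 * bits.
	 */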
3709  	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3710  		err = parse_reply_info(session, msg, req, (u64)-1);
3711  	else
3712  		err = parse_reply_info(session, msg, req,
3713  				       session->s_con.peer_features);
3714  	mutex_unlock(&mdsc->mutex);
3715  
3716  	/* Must find target inode outside of mutexes to avoid deadlocks */
3717  	rinfo = &req->r_reply_info;
3718  	if ((err >= 0) && rinfo->head->is_target) {
3719  		struct inode *in = xchg(&req->r_new_inode, NULL);
3720  		struct ceph_vino tvino = {
3721  			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
3722  			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
3723  		};
3724  
3725  		/*
3726  		 * If we ended up opening an existing inode, discard
3727  		 * r_new_inode
3728  		 */
3729  		if (req->r_op == CEPH_MDS_OP_CREATE &&
3730  		    !req->r_reply_info.has_create_ino) {
3731  			/* This should never happen on an async create */
3732  			WARN_ON_ONCE(req->r_deleg_ino);
3733  			iput(in);
3734  			in = NULL;
3735  		}
3736  
3737  		in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
3738  		if (IS_ERR(in)) {
3739  			err = PTR_ERR(in);
3740  			mutex_lock(&session->s_mutex);
3741  			goto out_err;
3742  		}
3743  		req->r_target_inode = in;
3744  	}
3745  
3746  	mutex_lock(&session->s_mutex);
3747  	if (err < 0) {
3748  		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3749  		ceph_msg_dump(msg);
3750  		goto out_err;
3751  	}
3752  
3753  	/* snap trace */
3754  	realm = NULL;
3755  	if (rinfo->snapblob_len) {
3756  		down_write(&mdsc->snap_rwsem);
3757  		err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
3758  				rinfo->snapblob + rinfo->snapblob_len,
3759  				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3760  				&realm);
3761  		if (err) {
3762  			up_write(&mdsc->snap_rwsem);
3763  			close_sessions = true;
3764  			if (err == -EIO)
3765  				ceph_msg_dump(msg);
3766  			goto out_err;
3767  		}
3768  		downgrade_write(&mdsc->snap_rwsem);
3769  	} else {
3770  		down_read(&mdsc->snap_rwsem);
3771  	}
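	/*
	 * Either way we now hold snap_rwsem for read, so the snap realm
	 * hierarchy can't change while the trace is inserted below.
	 */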
3772  
3773  	/* insert trace into our cache */
3774  	mutex_lock(&req->r_fill_mutex);
3775  	current->journal_info = req;
3776  	err = ceph_fill_trace(mdsc->fsc->sb, req);
3777  	if (err == 0) {
3778  		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3779  				    req->r_op == CEPH_MDS_OP_LSSNAP))
3780  			err = ceph_readdir_prepopulate(req, req->r_session);
3781  	}
3782  	current->journal_info = NULL;
3783  	mutex_unlock(&req->r_fill_mutex);
3784  
3785  	up_read(&mdsc->snap_rwsem);
3786  	if (realm)
3787  		ceph_put_snap_realm(mdsc, realm);
3788  
3789  	if (err == 0) {
3790  		if (req->r_target_inode &&
3791  		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3792  			struct ceph_inode_info *ci =
3793  				ceph_inode(req->r_target_inode);
3794  			spin_lock(&ci->i_unsafe_lock);
3795  			list_add_tail(&req->r_unsafe_target_item,
3796  				      &ci->i_unsafe_iops);
3797  			spin_unlock(&ci->i_unsafe_lock);
3798  		}
3799  
3800  		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3801  	}
3802  out_err:
3803  	mutex_lock(&mdsc->mutex);
3804  	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3805  		if (err) {
3806  			req->r_err = err;
3807  		} else {
3808  			req->r_reply =  ceph_msg_get(msg);
3809  			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3810  		}
3811  	} else {
3812  		dout("reply arrived after request %lld was aborted\n", tid);
3813  	}
3814  	mutex_unlock(&mdsc->mutex);
3815  
3816  	mutex_unlock(&session->s_mutex);
3817  
3818  	/* kick calling process */
3819  	complete_request(mdsc, req);
3820  
3821  	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
3822  				     req->r_end_latency, err);
3823  out:
3824  	ceph_mdsc_put_request(req);
3825  
3826  	/* Defer closing the sessions after s_mutex lock being released */
3827  	if (close_sessions)
3828  		ceph_mdsc_close_sessions(mdsc);
3829  	return;
3830  }
3831  
3832  
3833  
3834  /*
3835   * handle mds notification that our request has been forwarded.
3836   */
3837  static void handle_forward(struct ceph_mds_client *mdsc,
3838  			   struct ceph_mds_session *session,
3839  			   struct ceph_msg *msg)
3840  {
3841  	struct ceph_mds_request *req;
3842  	u64 tid = le64_to_cpu(msg->hdr.tid);
3843  	u32 next_mds;
3844  	u32 fwd_seq;
3845  	int err = -EINVAL;
3846  	void *p = msg->front.iov_base;
3847  	void *end = p + msg->front.iov_len;
3848  	bool aborted = false;
3849  
3850  	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3851  	next_mds = ceph_decode_32(&p);
3852  	fwd_seq = ceph_decode_32(&p);
3853  
3854  	mutex_lock(&mdsc->mutex);
3855  	req = lookup_get_request(mdsc, tid);
3856  	if (!req) {
3857  		mutex_unlock(&mdsc->mutex);
3858  		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3859  		return;  /* dup reply? */
3860  	}
3861  
3862  	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3863  		dout("forward tid %llu aborted, unregistering\n", tid);
3864  		__unregister_request(mdsc, req);
3865  	} else if (fwd_seq <= req->r_num_fwd || (uint32_t)fwd_seq >= U32_MAX) {
3866  		/*
3867  		 * Avoid infinite retrying after overflow.
3868  		 *
3869  		 * The MDS will increase the fwd count; if the num_fwd it
3870  		 * sends is less than the one saved in the request, that
3871  		 * means the MDS is an old version and its 8-bit counter
3872  		 * has overflowed.
3873  		 */
3874  		mutex_lock(&req->r_fill_mutex);
3875  		req->r_err = -EMULTIHOP;
3876  		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3877  		mutex_unlock(&req->r_fill_mutex);
3878  		aborted = true;
3879  		pr_warn_ratelimited("forward tid %llu seq overflow\n", tid);
3880  	} else {
3881  		/* resend. forward race not possible; mds would drop */
3882  		dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3883  		BUG_ON(req->r_err);
3884  		BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3885  		req->r_attempts = 0;
3886  		req->r_num_fwd = fwd_seq;
3887  		req->r_resend_mds = next_mds;
3888  		put_request_session(req);
3889  		__do_request(mdsc, req);
3890  	}
3891  	mutex_unlock(&mdsc->mutex);
3892  
3893  	/* kick calling process */
3894  	if (aborted)
3895  		complete_request(mdsc, req);
3896  	ceph_mdsc_put_request(req);
3897  	return;
3898  
3899  bad:
3900  	pr_err("mdsc_handle_forward decode error err=%d\n", err);
3901  	ceph_msg_dump(msg);
3902  }
3903  
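/*
 * The session metadata is a map<string,string>, encoded as:
 *
 *	u32 n, then n * { u32 key_len, key, u32 val_len, val }
 *
 * All we care about is whether an "error_string" entry mentions being
 * blocklisted ("blacklisted" in the older spelling).
 */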
3904  static int __decode_session_metadata(void **p, void *end,
3905  				     bool *blocklisted)
3906  {
3907  	/* map<string,string> */
3908  	u32 n;
3909  	bool err_str;
3910  	ceph_decode_32_safe(p, end, n, bad);
3911  	while (n-- > 0) {
3912  		u32 len;
3913  		ceph_decode_32_safe(p, end, len, bad);
3914  		ceph_decode_need(p, end, len, bad);
3915  		err_str = !strncmp(*p, "error_string", len);
3916  		*p += len;
3917  		ceph_decode_32_safe(p, end, len, bad);
3918  		ceph_decode_need(p, end, len, bad);
3919  		/*
3920  		 * Match "blocklisted (blacklisted)" from newer MDSes,
3921  		 * or "blacklisted" from older MDSes.
3922  		 */
3923  		if (err_str && strnstr(*p, "blacklisted", len))
3924  			*blocklisted = true;
3925  		*p += len;
3926  	}
3927  	return 0;
3928  bad:
3929  	return -1;
3930  }
3931  
3932  /*
3933   * handle a mds session control message
3934   */
3935  static void handle_session(struct ceph_mds_session *session,
3936  			   struct ceph_msg *msg)
3937  {
3938  	struct ceph_mds_client *mdsc = session->s_mdsc;
3939  	int mds = session->s_mds;
3940  	int msg_version = le16_to_cpu(msg->hdr.version);
3941  	void *p = msg->front.iov_base;
3942  	void *end = p + msg->front.iov_len;
3943  	struct ceph_mds_session_head *h;
3944  	u32 op;
3945  	u64 seq, features = 0;
3946  	int wake = 0;
3947  	bool blocklisted = false;
3948  
3949  	/* decode */
3950  	ceph_decode_need(&p, end, sizeof(*h), bad);
3951  	h = p;
3952  	p += sizeof(*h);
3953  
3954  	op = le32_to_cpu(h->op);
3955  	seq = le64_to_cpu(h->seq);
3956  
3957  	if (msg_version >= 3) {
3958  		u32 len;
3959  		/* for versions 2..4 decode the metadata map; from version 5
3960  		 * on it can be skipped, as blocklisting is signalled via flags.
3961  		 */
3962  		if (msg_version >= 5)
3963  			ceph_decode_skip_map(&p, end, string, string, bad);
3964  		else if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3965  			goto bad;
3966  
3967  		/* version >= 3, feature bits */
3968  		ceph_decode_32_safe(&p, end, len, bad);
3969  		if (len) {
3970  			ceph_decode_64_safe(&p, end, features, bad);
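			/* only the low 64 feature bits matter; skip the rest */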
3971  			p += len - sizeof(features);
3972  		}
3973  	}
3974  
3975  	if (msg_version >= 5) {
3976  		u32 flags, len;
3977  
3978  		/* version >= 4 */
3979  		ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
3980  		ceph_decode_32_safe(&p, end, len, bad); /* len */
3981  		ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
3982  
3983  		/* version >= 5, flags   */
3984  		ceph_decode_32_safe(&p, end, flags, bad);
3985  		if (flags & CEPH_SESSION_BLOCKLISTED) {
3986  			pr_warn("mds%d session blocklisted\n", session->s_mds);
3987  			blocklisted = true;
3988  		}
3989  	}
3990  
3991  	mutex_lock(&mdsc->mutex);
3992  	if (op == CEPH_SESSION_CLOSE) {
3993  		ceph_get_mds_session(session);
3994  		__unregister_session(mdsc, session);
3995  	}
3996  	/* FIXME: this ttl calculation is generous */
3997  	session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3998  	mutex_unlock(&mdsc->mutex);
3999  
4000  	mutex_lock(&session->s_mutex);
4001  
4002  	dout("handle_session mds%d %s %p state %s seq %llu\n",
4003  	     mds, ceph_session_op_name(op), session,
4004  	     ceph_session_state_name(session->s_state), seq);
4005  
4006  	if (session->s_state == CEPH_MDS_SESSION_HUNG) {
4007  		session->s_state = CEPH_MDS_SESSION_OPEN;
4008  		pr_info("mds%d came back\n", session->s_mds);
4009  	}
4010  
4011  	switch (op) {
4012  	case CEPH_SESSION_OPEN:
4013  		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4014  			pr_info("mds%d reconnect success\n", session->s_mds);
4015  
4016  		session->s_features = features;
4017  		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
4018  			pr_notice("mds%d is already opened\n", session->s_mds);
4019  		} else {
4020  			session->s_state = CEPH_MDS_SESSION_OPEN;
4021  			renewed_caps(mdsc, session, 0);
4022  			if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
4023  				     &session->s_features))
4024  				metric_schedule_delayed(&mdsc->metric);
4025  		}
4026  
4027  		/*
4028  		 * The connection may be broken and the session on the
4029  		 * client side may have been reinitialized; the seq needs
4030  		 * to be updated anyway.
4031  		 */
4032  		if (!session->s_seq && seq)
4033  			session->s_seq = seq;
4034  
4035  		wake = 1;
4036  		if (mdsc->stopping)
4037  			__close_session(mdsc, session);
4038  		break;
4039  
4040  	case CEPH_SESSION_RENEWCAPS:
4041  		if (session->s_renew_seq == seq)
4042  			renewed_caps(mdsc, session, 1);
4043  		break;
4044  
4045  	case CEPH_SESSION_CLOSE:
4046  		if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
4047  			pr_info("mds%d reconnect denied\n", session->s_mds);
4048  		session->s_state = CEPH_MDS_SESSION_CLOSED;
4049  		cleanup_session_requests(mdsc, session);
4050  		remove_session_caps(session);
4051  		wake = 2; /* for good measure */
4052  		wake_up_all(&mdsc->session_close_wq);
4053  		break;
4054  
4055  	case CEPH_SESSION_STALE:
4056  		pr_info("mds%d caps went stale, renewing\n",
4057  			session->s_mds);
4058  		atomic_inc(&session->s_cap_gen);
4059  		session->s_cap_ttl = jiffies - 1;
4060  		send_renew_caps(mdsc, session);
4061  		break;
4062  
4063  	case CEPH_SESSION_RECALL_STATE:
4064  		ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
4065  		break;
4066  
4067  	case CEPH_SESSION_FLUSHMSG:
4068  		/* flush cap releases */
4069  		spin_lock(&session->s_cap_lock);
4070  		if (session->s_num_cap_releases)
4071  			ceph_flush_cap_releases(mdsc, session);
4072  		spin_unlock(&session->s_cap_lock);
4073  
4074  		send_flushmsg_ack(mdsc, session, seq);
4075  		break;
4076  
4077  	case CEPH_SESSION_FORCE_RO:
4078  		dout("force_session_readonly %p\n", session);
4079  		spin_lock(&session->s_cap_lock);
4080  		session->s_readonly = true;
4081  		spin_unlock(&session->s_cap_lock);
4082  		wake_up_session_caps(session, FORCE_RO);
4083  		break;
4084  
4085  	case CEPH_SESSION_REJECT:
4086  		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
4087  		pr_info("mds%d rejected session\n", session->s_mds);
4088  		session->s_state = CEPH_MDS_SESSION_REJECTED;
4089  		cleanup_session_requests(mdsc, session);
4090  		remove_session_caps(session);
4091  		if (blocklisted)
4092  			mdsc->fsc->blocklisted = true;
4093  		wake = 2; /* for good measure */
4094  		break;
4095  
4096  	default:
4097  		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
4098  		WARN_ON(1);
4099  	}
4100  
4101  	mutex_unlock(&session->s_mutex);
4102  	if (wake) {
4103  		mutex_lock(&mdsc->mutex);
4104  		__wake_requests(mdsc, &session->s_waiting);
4105  		if (wake == 2)
4106  			kick_requests(mdsc, mds);
4107  		mutex_unlock(&mdsc->mutex);
4108  	}
4109  	if (op == CEPH_SESSION_CLOSE)
4110  		ceph_put_mds_session(session);
4111  	return;
4112  
4113  bad:
4114  	pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
4115  	       (int)msg->front.iov_len);
4116  	ceph_msg_dump(msg);
4117  	return;
4118  }
4119  
4120  void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
4121  {
4122  	int dcaps;
4123  
4124  	dcaps = xchg(&req->r_dir_caps, 0);
4125  	if (dcaps) {
4126  		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4127  		ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
4128  	}
4129  }
4130  
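/*
 * As above, but drop the refs without checking whether any dirty caps
 * now need to be flushed; used when replaying requests at reconnect.
 */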
4131  void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
4132  {
4133  	int dcaps;
4134  
4135  	dcaps = xchg(&req->r_dir_caps, 0);
4136  	if (dcaps) {
4137  		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
4138  		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
4139  						dcaps);
4140  	}
4141  }
4142  
4143  /*
4144   * called under session->mutex.
4145   */
4146  static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
4147  				   struct ceph_mds_session *session)
4148  {
4149  	struct ceph_mds_request *req, *nreq;
4150  	struct rb_node *p;
4151  
4152  	dout("replay_unsafe_requests mds%d\n", session->s_mds);
4153  
4154  	mutex_lock(&mdsc->mutex);
4155  	list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
4156  		__send_request(session, req, true);
4157  
4158  	/*
4159  	 * Also re-send old requests when the MDS enters the reconnect stage,
4160  	 * so that the MDS can process completed requests in the clientreplay stage.
4161  	 */
4162  	p = rb_first(&mdsc->request_tree);
4163  	while (p) {
4164  		req = rb_entry(p, struct ceph_mds_request, r_node);
4165  		p = rb_next(p);
4166  		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
4167  			continue;
4168  		if (req->r_attempts == 0)
4169  			continue; /* only old requests */
4170  		if (!req->r_session)
4171  			continue;
4172  		if (req->r_session->s_mds != session->s_mds)
4173  			continue;
4174  
4175  		ceph_mdsc_release_dir_caps_no_check(req);
4176  
4177  		__send_request(session, req, true);
4178  	}
4179  	mutex_unlock(&mdsc->mutex);
4180  }
4181  
4182  static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
4183  {
4184  	struct ceph_msg *reply;
4185  	struct ceph_pagelist *_pagelist;
4186  	struct page *page;
4187  	__le32 *addr;
4188  	int err = -ENOMEM;
4189  
4190  	if (!recon_state->allow_multi)
4191  		return -ENOSPC;
4192  
4193  	/* can't handle message that contains both caps and realm */
4194  	BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
4195  
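	/*
	 * Flush the current (full) pagelist as one CLIENT_RECONNECT message
	 * whose trailing byte is 1 ("more to follow"), then continue
	 * encoding into a fresh pagelist.  The final message, sent from
	 * send_mds_reconnect(), ends with a 0 byte instead.
	 */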
4196  	/* pre-allocate new pagelist */
4197  	_pagelist = ceph_pagelist_alloc(GFP_NOFS);
4198  	if (!_pagelist)
4199  		return -ENOMEM;
4200  
4201  	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4202  	if (!reply)
4203  		goto fail_msg;
4204  
4205  	/* placeholder for nr_caps */
4206  	err = ceph_pagelist_encode_32(_pagelist, 0);
4207  	if (err < 0)
4208  		goto fail;
4209  
4210  	if (recon_state->nr_caps) {
4211  		/* currently encoding caps */
4212  		err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
4213  		if (err)
4214  			goto fail;
4215  	} else {
4216  		/* placeholder for nr_realms (currently encoding realms) */
4217  		err = ceph_pagelist_encode_32(_pagelist, 0);
4218  		if (err < 0)
4219  			goto fail;
4220  	}
4221  
4222  	err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
4223  	if (err)
4224  		goto fail;
4225  
4226  	page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
4227  	addr = kmap_atomic(page);
4228  	if (recon_state->nr_caps) {
4229  		/* currently encoding caps */
4230  		*addr = cpu_to_le32(recon_state->nr_caps);
4231  	} else {
4232  		/* currently encoding realms */
4233  		*(addr + 1) = cpu_to_le32(recon_state->nr_realms);
4234  	}
4235  	kunmap_atomic(addr);
4236  
4237  	reply->hdr.version = cpu_to_le16(5);
4238  	reply->hdr.compat_version = cpu_to_le16(4);
4239  
4240  	reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
4241  	ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
4242  
4243  	ceph_con_send(&recon_state->session->s_con, reply);
4244  	ceph_pagelist_release(recon_state->pagelist);
4245  
4246  	recon_state->pagelist = _pagelist;
4247  	recon_state->nr_caps = 0;
4248  	recon_state->nr_realms = 0;
4249  	recon_state->msg_version = 5;
4250  	return 0;
4251  fail:
4252  	ceph_msg_put(reply);
4253  fail_msg:
4254  	ceph_pagelist_release(_pagelist);
4255  	return err;
4256  }
4257  
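/*
 * Find the "primary" dentry for an inode: for a directory, its single
 * non-root alias; for anything else, the alias that carries
 * CEPH_DENTRY_PRIMARY_LINK (the primary link rather than a hard link).
 */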
4258  static struct dentry* d_find_primary(struct inode *inode)
4259  {
4260  	struct dentry *alias, *dn = NULL;
4261  
4262  	if (hlist_empty(&inode->i_dentry))
4263  		return NULL;
4264  
4265  	spin_lock(&inode->i_lock);
4266  	if (hlist_empty(&inode->i_dentry))
4267  		goto out_unlock;
4268  
4269  	if (S_ISDIR(inode->i_mode)) {
4270  		alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
4271  		if (!IS_ROOT(alias))
4272  			dn = dget(alias);
4273  		goto out_unlock;
4274  	}
4275  
4276  	hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
4277  		spin_lock(&alias->d_lock);
4278  		if (!d_unhashed(alias) &&
4279  		    (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
4280  			dn = dget_dlock(alias);
4281  		}
4282  		spin_unlock(&alias->d_lock);
4283  		if (dn)
4284  			break;
4285  	}
4286  out_unlock:
4287  	spin_unlock(&inode->i_lock);
4288  	return dn;
4289  }
4290  
4291  /*
4292   * Encode information about a cap for a reconnect with the MDS.
4293   */
4294  static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
4295  {
4296  	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
4297  	union {
4298  		struct ceph_mds_cap_reconnect v2;
4299  		struct ceph_mds_cap_reconnect_v1 v1;
4300  	} rec;
4301  	struct ceph_inode_info *ci = ceph_inode(inode);
4302  	struct ceph_reconnect_state *recon_state = arg;
4303  	struct ceph_pagelist *pagelist = recon_state->pagelist;
4304  	struct dentry *dentry;
4305  	struct ceph_cap *cap;
4306  	char *path;
4307  	int pathlen = 0, err;
4308  	u64 pathbase;
4309  	u64 snap_follows;
4310  
4311  	dentry = d_find_primary(inode);
4312  	if (dentry) {
4313  		/* set pathbase to parent dir when msg_version >= 2 */
4314  		path = ceph_mdsc_build_path(mdsc, dentry, &pathlen, &pathbase,
4315  					    recon_state->msg_version >= 2);
4316  		dput(dentry);
4317  		if (IS_ERR(path)) {
4318  			err = PTR_ERR(path);
4319  			goto out_err;
4320  		}
4321  	} else {
4322  		path = NULL;
4323  		pathbase = 0;
4324  	}
4325  
4326  	spin_lock(&ci->i_ceph_lock);
4327  	cap = __get_cap_for_mds(ci, mds);
4328  	if (!cap) {
4329  		spin_unlock(&ci->i_ceph_lock);
4330  		err = 0;
4331  		goto out_err;
4332  	}
4333  	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
4334  	     inode, ceph_vinop(inode), cap, cap->cap_id,
4335  	     ceph_cap_string(cap->issued));
4336  
4337  	cap->seq = 0;        /* reset cap seq */
4338  	cap->issue_seq = 0;  /* and issue_seq */
4339  	cap->mseq = 0;       /* and migrate_seq */
4340  	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);
4341  
4342  	/* These are lost when the session goes away */
4343  	if (S_ISDIR(inode->i_mode)) {
4344  		if (cap->issued & CEPH_CAP_DIR_CREATE) {
4345  			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
4346  			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
4347  		}
4348  		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
4349  	}
4350  
4351  	if (recon_state->msg_version >= 2) {
4352  		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
4353  		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4354  		rec.v2.issued = cpu_to_le32(cap->issued);
4355  		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4356  		rec.v2.pathbase = cpu_to_le64(pathbase);
4357  		rec.v2.flock_len = (__force __le32)
4358  			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
4359  	} else {
4360  		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
4361  		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
4362  		rec.v1.issued = cpu_to_le32(cap->issued);
4363  		rec.v1.size = cpu_to_le64(i_size_read(inode));
4364  		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
4365  		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
4366  		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
4367  		rec.v1.pathbase = cpu_to_le64(pathbase);
4368  	}
4369  
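	/*
	 * Report which snap the oldest dirty snapped state follows: the
	 * head snap context's seq if no cap snaps are queued, otherwise
	 * the "follows" of the oldest queued cap snap.
	 */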
4370  	if (list_empty(&ci->i_cap_snaps)) {
4371  		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
4372  	} else {
4373  		struct ceph_cap_snap *capsnap =
4374  			list_first_entry(&ci->i_cap_snaps,
4375  					 struct ceph_cap_snap, ci_item);
4376  		snap_follows = capsnap->follows;
4377  	}
4378  	spin_unlock(&ci->i_ceph_lock);
4379  
4380  	if (recon_state->msg_version >= 2) {
4381  		int num_fcntl_locks, num_flock_locks;
4382  		struct ceph_filelock *flocks = NULL;
4383  		size_t struct_len, total_len = sizeof(u64);
4384  		u8 struct_v = 0;
4385  
4386  encode_again:
4387  		if (rec.v2.flock_len) {
4388  			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
4389  		} else {
4390  			num_fcntl_locks = 0;
4391  			num_flock_locks = 0;
4392  		}
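		/*
		 * The lock count can change between ceph_count_locks() and
		 * ceph_encode_locks_to_buffer(); the latter then returns
		 * -ENOSPC and we loop back to encode_again to recount.
		 */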
4393  		if (num_fcntl_locks + num_flock_locks > 0) {
4394  			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
4395  					       sizeof(struct ceph_filelock),
4396  					       GFP_NOFS);
4397  			if (!flocks) {
4398  				err = -ENOMEM;
4399  				goto out_err;
4400  			}
4401  			err = ceph_encode_locks_to_buffer(inode, flocks,
4402  							  num_fcntl_locks,
4403  							  num_flock_locks);
4404  			if (err) {
4405  				kfree(flocks);
4406  				flocks = NULL;
4407  				if (err == -ENOSPC)
4408  					goto encode_again;
4409  				goto out_err;
4410  			}
4411  		} else {
4412  			kfree(flocks);
4413  			flocks = NULL;
4414  		}
4415  
4416  		if (recon_state->msg_version >= 3) {
4417  			/* version, compat_version and struct_len */
4418  			total_len += 2 * sizeof(u8) + sizeof(u32);
4419  			struct_v = 2;
4420  		}
4421  		/*
4422  		 * number of encoded locks is stable, so copy to pagelist
4423  		 */
4424  		struct_len = 2 * sizeof(u32) +
4425  			    (num_fcntl_locks + num_flock_locks) *
4426  			    sizeof(struct ceph_filelock);
4427  		rec.v2.flock_len = cpu_to_le32(struct_len);
4428  
4429  		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
4430  
4431  		if (struct_v >= 2)
4432  			struct_len += sizeof(u64); /* snap_follows */
4433  
4434  		total_len += struct_len;
4435  
4436  		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
4437  			err = send_reconnect_partial(recon_state);
4438  			if (err)
4439  				goto out_freeflocks;
4440  			pagelist = recon_state->pagelist;
4441  		}
4442  
4443  		err = ceph_pagelist_reserve(pagelist, total_len);
4444  		if (err)
4445  			goto out_freeflocks;
4446  
4447  		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4448  		if (recon_state->msg_version >= 3) {
4449  			ceph_pagelist_encode_8(pagelist, struct_v);
4450  			ceph_pagelist_encode_8(pagelist, 1);
4451  			ceph_pagelist_encode_32(pagelist, struct_len);
4452  		}
4453  		ceph_pagelist_encode_string(pagelist, path, pathlen);
4454  		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
4455  		ceph_locks_to_pagelist(flocks, pagelist,
4456  				       num_fcntl_locks, num_flock_locks);
4457  		if (struct_v >= 2)
4458  			ceph_pagelist_encode_64(pagelist, snap_follows);
4459  out_freeflocks:
4460  		kfree(flocks);
4461  	} else {
4462  		err = ceph_pagelist_reserve(pagelist,
4463  					    sizeof(u64) + sizeof(u32) +
4464  					    pathlen + sizeof(rec.v1));
4465  		if (err)
4466  			goto out_err;
4467  
4468  		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
4469  		ceph_pagelist_encode_string(pagelist, path, pathlen);
4470  		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
4471  	}
4472  
4473  out_err:
4474  	ceph_mdsc_free_path(path, pathlen);
4475  	if (!err)
4476  		recon_state->nr_caps++;
4477  	return err;
4478  }
4479  
4480  static int encode_snap_realms(struct ceph_mds_client *mdsc,
4481  			      struct ceph_reconnect_state *recon_state)
4482  {
4483  	struct rb_node *p;
4484  	struct ceph_pagelist *pagelist = recon_state->pagelist;
4485  	int err = 0;
4486  
4487  	if (recon_state->msg_version >= 4) {
4488  		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
4489  		if (err < 0)
4490  			goto fail;
4491  	}
4492  
4493  	/*
4494  	 * snaprealms.  we provide mds with the ino, seq (version), and
4495  	 * parent for all of our realms.  If the mds has any newer info,
4496  	 * it will tell us.
4497  	 */
4498  	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
4499  		struct ceph_snap_realm *realm =
4500  		       rb_entry(p, struct ceph_snap_realm, node);
4501  		struct ceph_mds_snaprealm_reconnect sr_rec;
4502  
4503  		if (recon_state->msg_version >= 4) {
4504  			size_t need = sizeof(u8) * 2 + sizeof(u32) +
4505  				      sizeof(sr_rec);
4506  
4507  			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
4508  				err = send_reconnect_partial(recon_state);
4509  				if (err)
4510  					goto fail;
4511  				pagelist = recon_state->pagelist;
4512  			}
4513  
4514  			err = ceph_pagelist_reserve(pagelist, need);
4515  			if (err)
4516  				goto fail;
4517  
4518  			ceph_pagelist_encode_8(pagelist, 1);
4519  			ceph_pagelist_encode_8(pagelist, 1);
4520  			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
4521  		}
4522  
4523  		dout(" adding snap realm %llx seq %lld parent %llx\n",
4524  		     realm->ino, realm->seq, realm->parent_ino);
4525  		sr_rec.ino = cpu_to_le64(realm->ino);
4526  		sr_rec.seq = cpu_to_le64(realm->seq);
4527  		sr_rec.parent = cpu_to_le64(realm->parent_ino);
4528  
4529  		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
4530  		if (err)
4531  			goto fail;
4532  
4533  		recon_state->nr_realms++;
4534  	}
4535  fail:
4536  	return err;
4537  }
4538  
4539  
4540  /*
4541   * If an MDS fails and recovers, clients need to reconnect in order to
4542   * reestablish shared state.  This includes all caps issued through
4543   * this session _and_ the snap_realm hierarchy.  Because it's not
4544   * clear which snap realms the mds cares about, we send everything we
4545   * know about; that ensures we'll then get any new info the
4546   * recovering MDS might have.
4547   *
4548   * This is a relatively heavyweight operation, but it's rare.
4549   */
4550  static void send_mds_reconnect(struct ceph_mds_client *mdsc,
4551  			       struct ceph_mds_session *session)
4552  {
4553  	struct ceph_msg *reply;
4554  	int mds = session->s_mds;
4555  	int err = -ENOMEM;
4556  	struct ceph_reconnect_state recon_state = {
4557  		.session = session,
4558  	};
4559  	LIST_HEAD(dispose);
4560  
4561  	pr_info("mds%d reconnect start\n", mds);
4562  
4563  	recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
4564  	if (!recon_state.pagelist)
4565  		goto fail_nopagelist;
4566  
4567  	reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
4568  	if (!reply)
4569  		goto fail_nomsg;
4570  
4571  	xa_destroy(&session->s_delegated_inos);
4572  
4573  	mutex_lock(&session->s_mutex);
4574  	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
4575  	session->s_seq = 0;
4576  
4577  	dout("session %p state %s\n", session,
4578  	     ceph_session_state_name(session->s_state));
4579  
4580  	atomic_inc(&session->s_cap_gen);
4581  
4582  	spin_lock(&session->s_cap_lock);
4583  	/* don't know if session is readonly */
4584  	session->s_readonly = 0;
4585  	/*
4586  	 * notify __ceph_remove_cap() that we are composing cap reconnect.
4587  	 * If a cap gets released before being added to the cap reconnect,
4588  	 * __ceph_remove_cap() should skip queuing cap release.
4589  	 */
4590  	session->s_cap_reconnect = 1;
4591  	/* drop old cap expires; we're about to reestablish that state */
4592  	detach_cap_releases(session, &dispose);
4593  	spin_unlock(&session->s_cap_lock);
4594  	dispose_cap_releases(mdsc, &dispose);
4595  
4596  	/* trim unused caps to reduce MDS's cache rejoin time */
4597  	if (mdsc->fsc->sb->s_root)
4598  		shrink_dcache_parent(mdsc->fsc->sb->s_root);
4599  
4600  	ceph_con_close(&session->s_con);
4601  	ceph_con_open(&session->s_con,
4602  		      CEPH_ENTITY_TYPE_MDS, mds,
4603  		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4604  
4605  	/* replay unsafe requests */
4606  	replay_unsafe_requests(mdsc, session);
4607  
4608  	ceph_early_kick_flushing_caps(mdsc, session);
4609  
4610  	down_read(&mdsc->snap_rwsem);
4611  
4612  	/* placeholder for nr_caps */
4613  	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4614  	if (err)
4615  		goto fail;
4616  
4617  	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4618  		recon_state.msg_version = 3;
4619  		recon_state.allow_multi = true;
4620  	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4621  		recon_state.msg_version = 3;
4622  	} else {
4623  		recon_state.msg_version = 2;
4624  	}
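	/*
	 * msg_version 2 is the legacy flat cap record; version 3 adds the
	 * versioned per-cap encoding (and snap_follows).  With the
	 * MULTI_RECONNECT feature the payload may also be split across
	 * several messages, bumping the version to 5 in
	 * send_reconnect_partial().
	 */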
4625  	/* traverse this session's caps */
4626  	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4627  
4628  	spin_lock(&session->s_cap_lock);
4629  	session->s_cap_reconnect = 0;
4630  	spin_unlock(&session->s_cap_lock);
4631  
4632  	if (err < 0)
4633  		goto fail;
4634  
4635  	/* check if all realms can be encoded into current message */
4636  	if (mdsc->num_snap_realms) {
4637  		size_t total_len =
4638  			recon_state.pagelist->length +
4639  			mdsc->num_snap_realms *
4640  			sizeof(struct ceph_mds_snaprealm_reconnect);
4641  		if (recon_state.msg_version >= 4) {
4642  			/* number of realms */
4643  			total_len += sizeof(u32);
4644  			/* version, compat_version and struct_len */
4645  			total_len += mdsc->num_snap_realms *
4646  				     (2 * sizeof(u8) + sizeof(u32));
4647  		}
4648  		if (total_len > RECONNECT_MAX_SIZE) {
4649  			if (!recon_state.allow_multi) {
4650  				err = -ENOSPC;
4651  				goto fail;
4652  			}
4653  			if (recon_state.nr_caps) {
4654  				err = send_reconnect_partial(&recon_state);
4655  				if (err)
4656  					goto fail;
4657  			}
4658  			recon_state.msg_version = 5;
4659  		}
4660  	}
4661  
4662  	err = encode_snap_realms(mdsc, &recon_state);
4663  	if (err < 0)
4664  		goto fail;
4665  
4666  	if (recon_state.msg_version >= 5) {
4667  		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4668  		if (err < 0)
4669  			goto fail;
4670  	}
4671  
4672  	if (recon_state.nr_caps || recon_state.nr_realms) {
4673  		struct page *page =
4674  			list_first_entry(&recon_state.pagelist->head,
4675  					struct page, lru);
4676  		__le32 *addr = kmap_atomic(page);
4677  		if (recon_state.nr_caps) {
4678  			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4679  			*addr = cpu_to_le32(recon_state.nr_caps);
4680  		} else if (recon_state.msg_version >= 4) {
4681  			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4682  		}
4683  		kunmap_atomic(addr);
4684  	}
4685  
4686  	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4687  	if (recon_state.msg_version >= 4)
4688  		reply->hdr.compat_version = cpu_to_le16(4);
4689  
4690  	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4691  	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4692  
4693  	ceph_con_send(&session->s_con, reply);
4694  
4695  	mutex_unlock(&session->s_mutex);
4696  
4697  	mutex_lock(&mdsc->mutex);
4698  	__wake_requests(mdsc, &session->s_waiting);
4699  	mutex_unlock(&mdsc->mutex);
4700  
4701  	up_read(&mdsc->snap_rwsem);
4702  	ceph_pagelist_release(recon_state.pagelist);
4703  	return;
4704  
4705  fail:
4706  	ceph_msg_put(reply);
4707  	up_read(&mdsc->snap_rwsem);
4708  	mutex_unlock(&session->s_mutex);
4709  fail_nomsg:
4710  	ceph_pagelist_release(recon_state.pagelist);
4711  fail_nopagelist:
4712  	pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4713  	return;
4714  }
4715  
4716  
4717  /*
4718   * compare old and new mdsmaps, kicking requests
4719   * and closing out old connections as necessary
4720   *
4721   * called under mdsc->mutex.
4722   */
4723  static void check_new_map(struct ceph_mds_client *mdsc,
4724  			  struct ceph_mdsmap *newmap,
4725  			  struct ceph_mdsmap *oldmap)
4726  {
4727  	int i, j, err;
4728  	int oldstate, newstate;
4729  	struct ceph_mds_session *s;
4730  	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};
4731  
4732  	dout("check_new_map new %u old %u\n",
4733  	     newmap->m_epoch, oldmap->m_epoch);
4734  
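	/* collect every rank that appears in some MDS's export-target list */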
4735  	if (newmap->m_info) {
4736  		for (i = 0; i < newmap->possible_max_rank; i++) {
4737  			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
4738  				set_bit(newmap->m_info[i].export_targets[j], targets);
4739  		}
4740  	}
4741  
4742  	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4743  		if (!mdsc->sessions[i])
4744  			continue;
4745  		s = mdsc->sessions[i];
4746  		oldstate = ceph_mdsmap_get_state(oldmap, i);
4747  		newstate = ceph_mdsmap_get_state(newmap, i);
4748  
4749  		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4750  		     i, ceph_mds_state_name(oldstate),
4751  		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4752  		     ceph_mds_state_name(newstate),
4753  		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4754  		     ceph_session_state_name(s->s_state));
4755  
4756  		if (i >= newmap->possible_max_rank) {
4757  			/* force close session for stopped mds */
4758  			ceph_get_mds_session(s);
4759  			__unregister_session(mdsc, s);
4760  			__wake_requests(mdsc, &s->s_waiting);
4761  			mutex_unlock(&mdsc->mutex);
4762  
4763  			mutex_lock(&s->s_mutex);
4764  			cleanup_session_requests(mdsc, s);
4765  			remove_session_caps(s);
4766  			mutex_unlock(&s->s_mutex);
4767  
4768  			ceph_put_mds_session(s);
4769  
4770  			mutex_lock(&mdsc->mutex);
4771  			kick_requests(mdsc, i);
4772  			continue;
4773  		}
4774  
4775  		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4776  			   ceph_mdsmap_get_addr(newmap, i),
4777  			   sizeof(struct ceph_entity_addr))) {
4778  			/* just close it */
4779  			mutex_unlock(&mdsc->mutex);
4780  			mutex_lock(&s->s_mutex);
4781  			mutex_lock(&mdsc->mutex);
4782  			ceph_con_close(&s->s_con);
4783  			mutex_unlock(&s->s_mutex);
4784  			s->s_state = CEPH_MDS_SESSION_RESTARTING;
4785  		} else if (oldstate == newstate) {
4786  			continue;  /* nothing new with this mds */
4787  		}
4788  
4789  		/*
4790  		 * send reconnect?
4791  		 */
4792  		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4793  		    newstate >= CEPH_MDS_STATE_RECONNECT) {
4794  			mutex_unlock(&mdsc->mutex);
4795  			clear_bit(i, targets);
4796  			send_mds_reconnect(mdsc, s);
4797  			mutex_lock(&mdsc->mutex);
4798  		}
4799  
4800  		/*
4801  		 * kick request on any mds that has gone active.
4802  		 */
4803  		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4804  		    newstate >= CEPH_MDS_STATE_ACTIVE) {
4805  			if (oldstate != CEPH_MDS_STATE_CREATING &&
4806  			    oldstate != CEPH_MDS_STATE_STARTING)
4807  				pr_info("mds%d recovery completed\n", s->s_mds);
4808  			kick_requests(mdsc, i);
4809  			mutex_unlock(&mdsc->mutex);
4810  			mutex_lock(&s->s_mutex);
4811  			mutex_lock(&mdsc->mutex);
4812  			ceph_kick_flushing_caps(mdsc, s);
4813  			mutex_unlock(&s->s_mutex);
4814  			wake_up_session_caps(s, RECONNECT);
4815  		}
4816  	}
4817  
4818  	/*
4819  	 * Only open and reconnect sessions that don't exist yet.
4820  	 */
4821  	for (i = 0; i < newmap->possible_max_rank; i++) {
4822  		/*
4823  		 * If the importing MDS crashed just after the
4824  		 * EImportStart journal entry was flushed, then when a
4825  		 * standby MDS takes over and replays the EImportStart
4826  		 * journal, the new MDS daemon will wait for the client
4827  		 * to reconnect, but the client may never have
4828  		 * registered/opened the session yet.
4829  		 *
4830  		 * Try to reconnect to that MDS daemon if its rank
4831  		 * number is in the export targets array and it is in
4832  		 * the up:reconnect state.
4833  		 */
4834  		newstate = ceph_mdsmap_get_state(newmap, i);
4835  		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
4836  			continue;
4837  
4838  		/*
4839  		 * In rare cases the session may already have been
4840  		 * registered and opened by requests that chose random
4841  		 * MDSes during the mdsc->mutex unlock/lock gap below.
4842  		 * But the related MDS daemon will just queue those
4843  		 * requests and keep waiting for the client's
4844  		 * reconnection request in the up:reconnect state.
4845  		 */
4846  		s = __ceph_lookup_mds_session(mdsc, i);
4847  		if (likely(!s)) {
4848  			s = __open_export_target_session(mdsc, i);
4849  			if (IS_ERR(s)) {
4850  				err = PTR_ERR(s);
4851  				pr_err("failed to open export target session, err %d\n",
4852  				       err);
4853  				continue;
4854  			}
4855  		}
4856  		dout("send reconnect to export target mds.%d\n", i);
4857  		mutex_unlock(&mdsc->mutex);
4858  		send_mds_reconnect(mdsc, s);
4859  		ceph_put_mds_session(s);
4860  		mutex_lock(&mdsc->mutex);
4861  	}
4862  
4863  	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4864  		s = mdsc->sessions[i];
4865  		if (!s)
4866  			continue;
4867  		if (!ceph_mdsmap_is_laggy(newmap, i))
4868  			continue;
4869  		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4870  		    s->s_state == CEPH_MDS_SESSION_HUNG ||
4871  		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
4872  			dout(" connecting to export targets of laggy mds%d\n",
4873  			     i);
4874  			__open_export_target_sessions(mdsc, s);
4875  		}
4876  	}
4877  }
4878  
4879  
4880  
4881  /*
4882   * leases
4883   */
4884  
4885  /*
4886   * caller must hold session s_mutex, dentry->d_lock
4887   */
4888  void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4889  {
4890  	struct ceph_dentry_info *di = ceph_dentry(dentry);
4891  
4892  	ceph_put_mds_session(di->lease_session);
4893  	di->lease_session = NULL;
4894  }
4895  
4896  static void handle_lease(struct ceph_mds_client *mdsc,
4897  			 struct ceph_mds_session *session,
4898  			 struct ceph_msg *msg)
4899  {
4900  	struct super_block *sb = mdsc->fsc->sb;
4901  	struct inode *inode;
4902  	struct dentry *parent, *dentry;
4903  	struct ceph_dentry_info *di;
4904  	int mds = session->s_mds;
4905  	struct ceph_mds_lease *h = msg->front.iov_base;
4906  	u32 seq;
4907  	struct ceph_vino vino;
4908  	struct qstr dname;
4909  	int release = 0;
4910  
4911  	dout("handle_lease from mds%d\n", mds);
4912  
4913  	if (!ceph_inc_mds_stopping_blocker(mdsc, session))
4914  		return;
4915  
4916  	/* decode */
4917  	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4918  		goto bad;
4919  	vino.ino = le64_to_cpu(h->ino);
4920  	vino.snap = CEPH_NOSNAP;
4921  	seq = le32_to_cpu(h->seq);
4922  	dname.len = get_unaligned_le32(h + 1);
4923  	if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4924  		goto bad;
4925  	dname.name = (void *)(h + 1) + sizeof(u32);
4926  
4927  	/* lookup inode */
4928  	inode = ceph_find_inode(sb, vino);
4929  	dout("handle_lease %s, ino %llx %p %.*s\n",
4930  	     ceph_lease_op_name(h->action), vino.ino, inode,
4931  	     dname.len, dname.name);
4932  
4933  	mutex_lock(&session->s_mutex);
4934  	if (!inode) {
4935  		dout("handle_lease no inode %llx\n", vino.ino);
4936  		goto release;
4937  	}
4938  
4939  	/* dentry */
4940  	parent = d_find_alias(inode);
4941  	if (!parent) {
4942  		dout("no parent dentry on inode %p\n", inode);
4943  		WARN_ON(1);
4944  		goto release;  /* hrm... */
4945  	}
4946  	dname.hash = full_name_hash(parent, dname.name, dname.len);
4947  	dentry = d_lookup(parent, &dname);
4948  	dput(parent);
4949  	if (!dentry)
4950  		goto release;
4951  
4952  	spin_lock(&dentry->d_lock);
4953  	di = ceph_dentry(dentry);
4954  	switch (h->action) {
4955  	case CEPH_MDS_LEASE_REVOKE:
4956  		if (di->lease_session == session) {
4957  			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4958  				h->seq = cpu_to_le32(di->lease_seq);
4959  			__ceph_mdsc_drop_dentry_lease(dentry);
4960  		}
4961  		release = 1;
4962  		break;
4963  
4964  	case CEPH_MDS_LEASE_RENEW:
4965  		if (di->lease_session == session &&
4966  		    di->lease_gen == atomic_read(&session->s_cap_gen) &&
4967  		    di->lease_renew_from &&
4968  		    di->lease_renew_after == 0) {
4969  			unsigned long duration =
4970  				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4971  
4972  			di->lease_seq = seq;
4973  			di->time = di->lease_renew_from + duration;
4974  			di->lease_renew_after = di->lease_renew_from +
4975  				(duration >> 1);
4976  			di->lease_renew_from = 0;
4977  		}
4978  		break;
4979  	}
4980  	spin_unlock(&dentry->d_lock);
4981  	dput(dentry);
4982  
4983  	if (!release)
4984  		goto out;
4985  
4986  release:
4987  	/* let's just reuse the same message */
4988  	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4989  	ceph_msg_get(msg);
4990  	ceph_con_send(&session->s_con, msg);
4991  
4992  out:
4993  	mutex_unlock(&session->s_mutex);
4994  	iput(inode);
4995  
4996  	ceph_dec_mds_stopping_blocker(mdsc);
4997  	return;
4998  
4999  bad:
5000  	ceph_dec_mds_stopping_blocker(mdsc);
5001  
5002  	pr_err("corrupt lease message\n");
5003  	ceph_msg_dump(msg);
5004  }
5005  
5006  void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
5007  			      struct dentry *dentry, char action,
5008  			      u32 seq)
5009  {
5010  	struct ceph_msg *msg;
5011  	struct ceph_mds_lease *lease;
5012  	struct inode *dir;
5013  	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
5014  
5015  	dout("lease_send_msg dentry %p %s to mds%d\n",
5016  	     dentry, ceph_lease_op_name(action), session->s_mds);
5017  
5018  	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
5019  	if (!msg)
5020  		return;
5021  	lease = msg->front.iov_base;
5022  	lease->action = action;
5023  	lease->seq = cpu_to_le32(seq);
5024  
5025  	spin_lock(&dentry->d_lock);
5026  	dir = d_inode(dentry->d_parent);
5027  	lease->ino = cpu_to_le64(ceph_ino(dir));
5028  	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
5029  
5030  	put_unaligned_le32(dentry->d_name.len, lease + 1);
5031  	memcpy((void *)(lease + 1) + 4,
5032  	       dentry->d_name.name, dentry->d_name.len);
5033  	spin_unlock(&dentry->d_lock);
5034  
5035  	ceph_con_send(&session->s_con, msg);
5036  }
5037  
5038  /*
5039   * Lock and unlock the session mutex, to wait for ongoing session activity
5040   */
5041  static void lock_unlock_session(struct ceph_mds_session *s)
5042  {
5043  	mutex_lock(&s->s_mutex);
5044  	mutex_unlock(&s->s_mutex);
5045  }
5046  
5047  static void maybe_recover_session(struct ceph_mds_client *mdsc)
5048  {
5049  	struct ceph_fs_client *fsc = mdsc->fsc;
5050  
5051  	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
5052  		return;
5053  
5054  	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
5055  		return;
5056  
5057  	if (!READ_ONCE(fsc->blocklisted))
5058  		return;
5059  
5060  	pr_info("auto reconnect after being blocklisted\n");
5061  	ceph_force_reconnect(fsc->sb);
5062  }
5063  
5064  bool check_session_state(struct ceph_mds_session *s)
5065  {
5066  	switch (s->s_state) {
5067  	case CEPH_MDS_SESSION_OPEN:
5068  		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
5069  			s->s_state = CEPH_MDS_SESSION_HUNG;
5070  			pr_info("mds%d hung\n", s->s_mds);
5071  		}
5072  		break;
5073  	case CEPH_MDS_SESSION_CLOSING:
5074  	case CEPH_MDS_SESSION_NEW:
5075  	case CEPH_MDS_SESSION_RESTARTING:
5076  	case CEPH_MDS_SESSION_CLOSED:
5077  	case CEPH_MDS_SESSION_REJECTED:
5078  		return false;
5079  	}
5080  
5081  	return true;
5082  }
5083  
5084  /*
5085   * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
5086   * then we need to retransmit that request.
5087   */
5088  void inc_session_sequence(struct ceph_mds_session *s)
5089  {
5090  	lockdep_assert_held(&s->s_mutex);
5091  
5092  	s->s_seq++;
5093  
5094  	if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
5095  		int ret;
5096  
5097  		dout("resending session close request for mds%d\n", s->s_mds);
5098  		ret = request_close_session(s);
5099  		if (ret < 0)
5100  			pr_err("unable to close session to mds%d: %d\n",
5101  			       s->s_mds, ret);
5102  	}
5103  }
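
/*
 * For illustration: message handlers bump the sequence while holding
 * s_mutex (enforced by the lockdep assertion above), roughly:
 *
 *	mutex_lock(&session->s_mutex);
 *	inc_session_sequence(session);
 *	...				// handle the message
 *	mutex_unlock(&session->s_mutex);
 */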
5104  
5105  /*
5106   * delayed work -- periodically trim expired leases, renew caps with mds.  If
5107   * the @delay parameter is set to 0 or if it's more than 5 secs, the default
5108   * workqueue delay value of 5 secs will be used.
5109   */
5110  static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
5111  {
5112  	unsigned long max_delay = HZ * 5;
5113  
5114  	/* 5 secs default delay */
5115  	if (!delay || (delay > max_delay))
5116  		delay = max_delay;
5117  	schedule_delayed_work(&mdsc->delayed_work,
5118  			      round_jiffies_relative(delay));
5119  }
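
/*
 * Worked example: schedule_delayed(mdsc, 0) and
 * schedule_delayed(mdsc, 30 * HZ) both clamp to the 5 second default,
 * while schedule_delayed(mdsc, HZ) queues the work roughly one second
 * out; round_jiffies_relative() then rounds the expiry to a whole
 * second so nearby timers can be batched.
 */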
5120  
5121  static void delayed_work(struct work_struct *work)
5122  {
5123  	struct ceph_mds_client *mdsc =
5124  		container_of(work, struct ceph_mds_client, delayed_work.work);
5125  	unsigned long delay;
5126  	int renew_interval;
5127  	int renew_caps;
5128  	int i;
5129  
5130  	dout("mdsc delayed_work\n");
5131  
5132  	if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
5133  		return;
5134  
5135  	mutex_lock(&mdsc->mutex);
5136  	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
5137  	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
5138  				   mdsc->last_renew_caps);
5139  	if (renew_caps)
5140  		mdsc->last_renew_caps = jiffies;
5141  
5142  	for (i = 0; i < mdsc->max_sessions; i++) {
5143  		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
5144  		if (!s)
5145  			continue;
5146  
5147  		if (!check_session_state(s)) {
5148  			ceph_put_mds_session(s);
5149  			continue;
5150  		}
5151  		mutex_unlock(&mdsc->mutex);
5152  
5153  		mutex_lock(&s->s_mutex);
5154  		if (renew_caps)
5155  			send_renew_caps(mdsc, s);
5156  		else
5157  			ceph_con_keepalive(&s->s_con);
5158  		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
5159  		    s->s_state == CEPH_MDS_SESSION_HUNG)
5160  			ceph_send_cap_releases(mdsc, s);
5161  		mutex_unlock(&s->s_mutex);
5162  		ceph_put_mds_session(s);
5163  
5164  		mutex_lock(&mdsc->mutex);
5165  	}
5166  	mutex_unlock(&mdsc->mutex);
5167  
5168  	delay = ceph_check_delayed_caps(mdsc);
5169  
5170  	ceph_queue_cap_reclaim_work(mdsc);
5171  
5172  	ceph_trim_snapid_map(mdsc);
5173  
5174  	maybe_recover_session(mdsc);
5175  
5176  	schedule_delayed(mdsc, delay);
5177  }
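
/*
 * For illustration: the session walk above uses a common lock dance --
 * mdsc->mutex is dropped before s->s_mutex is taken, with the session
 * kept alive by the reference from __ceph_lookup_mds_session(), so the
 * two mutexes are never held at the same time:
 *
 *	s = __ceph_lookup_mds_session(mdsc, i);	// takes a ref
 *	mutex_unlock(&mdsc->mutex);
 *	mutex_lock(&s->s_mutex);
 *	...					// work on the session
 *	mutex_unlock(&s->s_mutex);
 *	ceph_put_mds_session(s);
 *	mutex_lock(&mdsc->mutex);		// resume the walk
 */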
5178  
5179  int ceph_mdsc_init(struct ceph_fs_client *fsc)
5181  {
5182  	struct ceph_mds_client *mdsc;
5183  	int err;
5184  
5185  	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
5186  	if (!mdsc)
5187  		return -ENOMEM;
5188  	mdsc->fsc = fsc;
5189  	mutex_init(&mdsc->mutex);
5190  	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
5191  	if (!mdsc->mdsmap) {
5192  		err = -ENOMEM;
5193  		goto err_mdsc;
5194  	}
5195  
5196  	init_completion(&mdsc->safe_umount_waiters);
5197  	spin_lock_init(&mdsc->stopping_lock);
5198  	atomic_set(&mdsc->stopping_blockers, 0);
5199  	init_completion(&mdsc->stopping_waiter);
5200  	init_waitqueue_head(&mdsc->session_close_wq);
5201  	INIT_LIST_HEAD(&mdsc->waiting_for_map);
5202  	mdsc->quotarealms_inodes = RB_ROOT;
5203  	mutex_init(&mdsc->quotarealms_inodes_mutex);
5204  	init_rwsem(&mdsc->snap_rwsem);
5205  	mdsc->snap_realms = RB_ROOT;
5206  	INIT_LIST_HEAD(&mdsc->snap_empty);
5207  	spin_lock_init(&mdsc->snap_empty_lock);
5208  	mdsc->request_tree = RB_ROOT;
5209  	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
5210  	mdsc->last_renew_caps = jiffies;
5211  	INIT_LIST_HEAD(&mdsc->cap_delay_list);
5212  	INIT_LIST_HEAD(&mdsc->cap_wait_list);
5213  	spin_lock_init(&mdsc->cap_delay_lock);
5214  	INIT_LIST_HEAD(&mdsc->snap_flush_list);
5215  	spin_lock_init(&mdsc->snap_flush_lock);
5216  	mdsc->last_cap_flush_tid = 1;
5217  	INIT_LIST_HEAD(&mdsc->cap_flush_list);
5218  	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
5219  	spin_lock_init(&mdsc->cap_dirty_lock);
5220  	init_waitqueue_head(&mdsc->cap_flushing_wq);
5221  	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
5222  	err = ceph_metric_init(&mdsc->metric);
5223  	if (err)
5224  		goto err_mdsmap;
5225  
5226  	spin_lock_init(&mdsc->dentry_list_lock);
5227  	INIT_LIST_HEAD(&mdsc->dentry_leases);
5228  	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
5229  
5230  	ceph_caps_init(mdsc);
5231  	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
5232  
5233  	spin_lock_init(&mdsc->snapid_map_lock);
5234  	mdsc->snapid_map_tree = RB_ROOT;
5235  	INIT_LIST_HEAD(&mdsc->snapid_map_lru);
5236  
5237  	init_rwsem(&mdsc->pool_perm_rwsem);
5238  	mdsc->pool_perm_tree = RB_ROOT;
5239  
5240  	strscpy(mdsc->nodename, utsname()->nodename,
5241  		sizeof(mdsc->nodename));
5242  
5243  	fsc->mdsc = mdsc;
5244  	return 0;
5245  
5246  err_mdsmap:
5247  	kfree(mdsc->mdsmap);
5248  err_mdsc:
5249  	kfree(mdsc);
5250  	return err;
5251  }
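
/*
 * For illustration: the error unwinding above is the usual kernel goto
 * ladder -- each label undoes only the allocations made before the
 * failure point, in reverse order.  A minimal sketch of the same shape,
 * with hypothetical names:
 *
 *	a = kzalloc(sizeof(*a), GFP_NOFS);
 *	if (!a)
 *		return -ENOMEM;
 *	a->b = kzalloc(sizeof(*a->b), GFP_NOFS);
 *	if (!a->b) {
 *		err = -ENOMEM;
 *		goto err_a;
 *	}
 *	return 0;
 * err_a:
 *	kfree(a);
 *	return err;
 */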
5252  
5253  /*
5254   * Wait for safe replies on open mds requests.  If we time out, drop
5255   * all requests from the tree to avoid dangling dentry refs.
5256   */
5257  static void wait_requests(struct ceph_mds_client *mdsc)
5258  {
5259  	struct ceph_options *opts = mdsc->fsc->client->options;
5260  	struct ceph_mds_request *req;
5261  
5262  	mutex_lock(&mdsc->mutex);
5263  	if (__get_oldest_req(mdsc)) {
5264  		mutex_unlock(&mdsc->mutex);
5265  
5266  		dout("wait_requests waiting for requests\n");
5267  		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
5268  				    ceph_timeout_jiffies(opts->mount_timeout));
5269  
5270  		/* tear down remaining requests */
5271  		mutex_lock(&mdsc->mutex);
5272  		while ((req = __get_oldest_req(mdsc))) {
5273  			dout("wait_requests timed out on tid %llu\n",
5274  			     req->r_tid);
5275  			list_del_init(&req->r_wait);
5276  			__unregister_request(mdsc, req);
5277  		}
5278  	}
5279  	mutex_unlock(&mdsc->mutex);
5280  	dout("wait_requests done\n");
5281  }
5282  
5283  void send_flush_mdlog(struct ceph_mds_session *s)
5284  {
5285  	struct ceph_msg *msg;
5286  
5287  	/*
5288  	 * Pre-luminous MDS crashes when it sees an unknown session request
5289  	 */
5290  	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
5291  		return;
5292  
5293  	mutex_lock(&s->s_mutex);
5294  	dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
5295  	     ceph_session_state_name(s->s_state), s->s_seq);
5296  	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
5297  				      s->s_seq);
5298  	if (!msg) {
5299  		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
5300  		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
5301  	} else {
5302  		ceph_con_send(&s->s_con, msg);
5303  	}
5304  	mutex_unlock(&s->s_mutex);
5305  }
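
/*
 * For illustration: CEPH_HAVE_FEATURE() reduces to a bitmask test
 * against the feature bits the peer advertised during the handshake,
 * roughly:
 *
 *	// ((peer_features) & CEPH_FEATUREMASK_SERVER_LUMINOUS) != 0
 *
 * so the early return above simply skips peers that predate luminous
 * and would crash on the unknown REQUEST_FLUSH_MDLOG request.
 */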
5306  
5307  /*
5308   * called before mount is ro, and before dentries are torn down.
5309   * (hmm, does this still race with new lookups?)
5310   */
5311  void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
5312  {
5313  	dout("pre_umount\n");
5314  	mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
5315  
5316  	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
5317  	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
5318  	ceph_flush_dirty_caps(mdsc);
5319  	wait_requests(mdsc);
5320  
5321  	/*
5322  	 * wait for reply handlers to drop their request refs and
5323  	 * their inode/dcache refs
5324  	 */
5325  	ceph_msgr_flush();
5326  
5327  	ceph_cleanup_quotarealms_inodes(mdsc);
5328  }
5329  
5330  /*
5331   * flush the mdlog and wait for all write mds requests to flush.
5332   */
5333  static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
5334  						 u64 want_tid)
5335  {
5336  	struct ceph_mds_request *req = NULL, *nextreq;
5337  	struct ceph_mds_session *last_session = NULL;
5338  	struct rb_node *n;
5339  
5340  	mutex_lock(&mdsc->mutex);
5341  	dout("%s want %lld\n", __func__, want_tid);
5342  restart:
5343  	req = __get_oldest_req(mdsc);
5344  	while (req && req->r_tid <= want_tid) {
5345  		/* find next request */
5346  		n = rb_next(&req->r_node);
5347  		if (n)
5348  			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
5349  		else
5350  			nextreq = NULL;
5351  		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
5352  		    (req->r_op & CEPH_MDS_OP_WRITE)) {
5353  			struct ceph_mds_session *s = req->r_session;
5354  
5355  			if (!s) {
5356  				req = nextreq;
5357  				continue;
5358  			}
5359  
5360  			/* write op */
5361  			ceph_mdsc_get_request(req);
5362  			if (nextreq)
5363  				ceph_mdsc_get_request(nextreq);
5364  			s = ceph_get_mds_session(s);
5365  			mutex_unlock(&mdsc->mutex);
5366  
5367  			/* send flush mdlog request to MDS */
5368  			if (last_session != s) {
5369  				send_flush_mdlog(s);
5370  				ceph_put_mds_session(last_session);
5371  				last_session = s;
5372  			} else {
5373  				ceph_put_mds_session(s);
5374  			}
5375  			dout("%s wait on %llu (want %llu)\n", __func__,
5376  			     req->r_tid, want_tid);
5377  			wait_for_completion(&req->r_safe_completion);
5378  
5379  			mutex_lock(&mdsc->mutex);
5380  			ceph_mdsc_put_request(req);
5381  			if (!nextreq)
5382  			break;  /* there was no next request, so we're done! */
5383  			if (RB_EMPTY_NODE(&nextreq->r_node)) {
5384  				/* next request was removed from tree */
5385  				ceph_mdsc_put_request(nextreq);
5386  				goto restart;
5387  			}
5388  			ceph_mdsc_put_request(nextreq);  /* won't go away */
5389  		}
5390  		req = nextreq;
5391  	}
5392  	mutex_unlock(&mdsc->mutex);
5393  	ceph_put_mds_session(last_session);
5394  	dout("%s done\n", __func__);
5395  }
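
/*
 * For illustration: the walk above uses the "pin the next node" idiom
 * for sleeping inside an rbtree traversal, condensed here (reference
 * drops omitted):
 *
 *	ceph_mdsc_get_request(req);		// pin current
 *	if (nextreq)
 *		ceph_mdsc_get_request(nextreq);	// pin next
 *	mutex_unlock(&mdsc->mutex);
 *	wait_for_completion(&req->r_safe_completion);
 *	mutex_lock(&mdsc->mutex);
 *	if (nextreq && RB_EMPTY_NODE(&nextreq->r_node))
 *		goto restart;			// next was unlinked
 *
 * Pinning both nodes means neither can be freed while we sleep, and
 * RB_EMPTY_NODE() catches the case where the pinned next request was
 * unregistered in the meantime, so the walk restarts instead of
 * following a stale pointer.
 */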
5396  
5397  void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
5398  {
5399  	u64 want_tid, want_flush;
5400  
5401  	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
5402  		return;
5403  
5404  	dout("sync\n");
5405  	mutex_lock(&mdsc->mutex);
5406  	want_tid = mdsc->last_tid;
5407  	mutex_unlock(&mdsc->mutex);
5408  
5409  	ceph_flush_dirty_caps(mdsc);
5410  	spin_lock(&mdsc->cap_dirty_lock);
5411  	want_flush = mdsc->last_cap_flush_tid;
5412  	if (!list_empty(&mdsc->cap_flush_list)) {
5413  		struct ceph_cap_flush *cf =
5414  			list_last_entry(&mdsc->cap_flush_list,
5415  					struct ceph_cap_flush, g_list);
5416  		cf->wake = true;
5417  	}
5418  	spin_unlock(&mdsc->cap_dirty_lock);
5419  
5420  	dout("sync want tid %lld flush_seq %lld\n",
5421  	     want_tid, want_flush);
5422  
5423  	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
5424  	wait_caps_flush(mdsc, want_flush);
5425  }
5426  
5427  /*
5428   * true if all sessions are closed, or we force unmount
5429   */
5430  static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
5431  {
5432  	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
5433  		return true;
5434  	return atomic_read(&mdsc->num_sessions) <= skipped;
5435  }
5436  
5437  /*
5438   * called after the sb is ro or when metadata is corrupted.
5439   */
5440  void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
5441  {
5442  	struct ceph_options *opts = mdsc->fsc->client->options;
5443  	struct ceph_mds_session *session;
5444  	int i;
5445  	int skipped = 0;
5446  
5447  	dout("close_sessions\n");
5448  
5449  	/* close sessions */
5450  	mutex_lock(&mdsc->mutex);
5451  	for (i = 0; i < mdsc->max_sessions; i++) {
5452  		session = __ceph_lookup_mds_session(mdsc, i);
5453  		if (!session)
5454  			continue;
5455  		mutex_unlock(&mdsc->mutex);
5456  		mutex_lock(&session->s_mutex);
5457  		if (__close_session(mdsc, session) <= 0)
5458  			skipped++;
5459  		mutex_unlock(&session->s_mutex);
5460  		ceph_put_mds_session(session);
5461  		mutex_lock(&mdsc->mutex);
5462  	}
5463  	mutex_unlock(&mdsc->mutex);
5464  
5465  	dout("waiting for sessions to close\n");
5466  	wait_event_timeout(mdsc->session_close_wq,
5467  			   done_closing_sessions(mdsc, skipped),
5468  			   ceph_timeout_jiffies(opts->mount_timeout));
5469  
5470  	/* tear down remaining sessions */
5471  	mutex_lock(&mdsc->mutex);
5472  	for (i = 0; i < mdsc->max_sessions; i++) {
5473  		if (mdsc->sessions[i]) {
5474  			session = ceph_get_mds_session(mdsc->sessions[i]);
5475  			__unregister_session(mdsc, session);
5476  			mutex_unlock(&mdsc->mutex);
5477  			mutex_lock(&session->s_mutex);
5478  			remove_session_caps(session);
5479  			mutex_unlock(&session->s_mutex);
5480  			ceph_put_mds_session(session);
5481  			mutex_lock(&mdsc->mutex);
5482  		}
5483  	}
5484  	WARN_ON(!list_empty(&mdsc->cap_delay_list));
5485  	mutex_unlock(&mdsc->mutex);
5486  
5487  	ceph_cleanup_snapid_map(mdsc);
5488  	ceph_cleanup_global_and_empty_realms(mdsc);
5489  
5490  	cancel_work_sync(&mdsc->cap_reclaim_work);
5491  	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
5492  
5493  	dout("stopped\n");
5494  }
5495  
5496  void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
5497  {
5498  	struct ceph_mds_session *session;
5499  	int mds;
5500  
5501  	dout("force umount\n");
5502  
5503  	mutex_lock(&mdsc->mutex);
5504  	for (mds = 0; mds < mdsc->max_sessions; mds++) {
5505  		session = __ceph_lookup_mds_session(mdsc, mds);
5506  		if (!session)
5507  			continue;
5508  
5509  		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
5510  			__unregister_session(mdsc, session);
5511  		__wake_requests(mdsc, &session->s_waiting);
5512  		mutex_unlock(&mdsc->mutex);
5513  
5514  		mutex_lock(&session->s_mutex);
5515  		__close_session(mdsc, session);
5516  		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
5517  			cleanup_session_requests(mdsc, session);
5518  			remove_session_caps(session);
5519  		}
5520  		mutex_unlock(&session->s_mutex);
5521  		ceph_put_mds_session(session);
5522  
5523  		mutex_lock(&mdsc->mutex);
5524  		kick_requests(mdsc, mds);
5525  	}
5526  	__wake_requests(mdsc, &mdsc->waiting_for_map);
5527  	mutex_unlock(&mdsc->mutex);
5528  }
5529  
5530  static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
5531  {
5532  	dout("stop\n");
5533  	/*
5534  	 * Make sure the delayed work has stopped before releasing
5535  	 * the resources.
5536  	 *
5537  	 * cancel_delayed_work_sync() would only guarantee that the
5538  	 * work finishes executing, but the delayed work re-arms
5539  	 * itself after that, so flush it instead.
5540  	 */
5541  	flush_delayed_work(&mdsc->delayed_work);
5542  
5543  	if (mdsc->mdsmap)
5544  		ceph_mdsmap_destroy(mdsc->mdsmap);
5545  	kfree(mdsc->sessions);
5546  	ceph_caps_finalize(mdsc);
5547  	ceph_pool_perm_destroy(mdsc);
5548  }
5549  
5550  void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
5551  {
5552  	struct ceph_mds_client *mdsc = fsc->mdsc;
5553  	dout("mdsc_destroy %p\n", mdsc);
5554  
5555  	if (!mdsc)
5556  		return;
5557  
5558  	/* flush out any connection work with references to us */
5559  	ceph_msgr_flush();
5560  
5561  	ceph_mdsc_stop(mdsc);
5562  
5563  	ceph_metric_destroy(&mdsc->metric);
5564  
5565  	fsc->mdsc = NULL;
5566  	kfree(mdsc);
5567  	dout("mdsc_destroy %p done\n", mdsc);
5568  }
5569  
5570  void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5571  {
5572  	struct ceph_fs_client *fsc = mdsc->fsc;
5573  	const char *mds_namespace = fsc->mount_options->mds_namespace;
5574  	void *p = msg->front.iov_base;
5575  	void *end = p + msg->front.iov_len;
5576  	u32 epoch;
5577  	u32 num_fs;
5578  	u32 mount_fscid = (u32)-1;
5579  	int err = -EINVAL;
5580  
5581  	ceph_decode_need(&p, end, sizeof(u32), bad);
5582  	epoch = ceph_decode_32(&p);
5583  
5584  	dout("handle_fsmap epoch %u\n", epoch);
5585  
5586  	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
5587  	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);
5588  
5589  	ceph_decode_32_safe(&p, end, num_fs, bad);
5590  	while (num_fs-- > 0) {
5591  		void *info_p, *info_end;
5592  		u32 info_len;
5593  		u32 fscid, namelen;
5594  
5595  		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
5596  		p += 2;		/* info_v, info_cv */
5597  		info_len = ceph_decode_32(&p);
5598  		ceph_decode_need(&p, end, info_len, bad);
5599  		info_p = p;
5600  		info_end = p + info_len;
5601  		p = info_end;
5602  
5603  		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
5604  		fscid = ceph_decode_32(&info_p);
5605  		namelen = ceph_decode_32(&info_p);
5606  		ceph_decode_need(&info_p, info_end, namelen, bad);
5607  
5608  		if (mds_namespace &&
5609  		    strlen(mds_namespace) == namelen &&
5610  		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
5611  			mount_fscid = fscid;
5612  			break;
5613  		}
5614  	}
5615  
5616  	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
5617  	if (mount_fscid != (u32)-1) {
5618  		fsc->client->monc.fs_cluster_id = mount_fscid;
5619  		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
5620  				   0, true);
5621  		ceph_monc_renew_subs(&fsc->client->monc);
5622  	} else {
5623  		err = -ENOENT;
5624  		goto err_out;
5625  	}
5626  	return;
5627  
5628  bad:
5629  	pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
5630  	ceph_umount_begin(mdsc->fsc->sb);
5631  	ceph_msg_dump(msg);
5632  err_out:
5633  	mutex_lock(&mdsc->mutex);
5634  	mdsc->mdsmap_err = err;
5635  	__wake_requests(mdsc, &mdsc->waiting_for_map);
5636  	mutex_unlock(&mdsc->mutex);
5637  }
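
/*
 * For illustration: the parsing above follows the bounds-checked decode
 * pattern from <linux/ceph/decode.h> -- verify the remaining length,
 * then consume, bailing to a label on truncation:
 *
 *	u32 val;
 *
 *	ceph_decode_32_safe(&p, end, val, bad);	// length check + read
 *	ceph_decode_need(&p, end, val, bad);	// length check only
 */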
5638  
5639  /*
5640   * handle mds map update.
5641   */
5642  void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5643  {
5644  	u32 epoch;
5645  	u32 maplen;
5646  	void *p = msg->front.iov_base;
5647  	void *end = p + msg->front.iov_len;
5648  	struct ceph_mdsmap *newmap, *oldmap;
5649  	struct ceph_fsid fsid;
5650  	int err = -EINVAL;
5651  
5652  	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5653  	ceph_decode_copy(&p, &fsid, sizeof(fsid));
5654  	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5655  		return;
5656  	epoch = ceph_decode_32(&p);
5657  	maplen = ceph_decode_32(&p);
5658  	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5659  
5660  	/* do we need it? */
5661  	mutex_lock(&mdsc->mutex);
5662  	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5663  		dout("handle_map epoch %u <= our %u\n",
5664  		     epoch, mdsc->mdsmap->m_epoch);
5665  		mutex_unlock(&mdsc->mutex);
5666  		return;
5667  	}
5668  
5669  	newmap = ceph_mdsmap_decode(mdsc, &p, end, ceph_msgr2(mdsc->fsc->client));
5670  	if (IS_ERR(newmap)) {
5671  		err = PTR_ERR(newmap);
5672  		goto bad_unlock;
5673  	}
5674  
5675  	/* swap into place */
5676  	if (mdsc->mdsmap) {
5677  		oldmap = mdsc->mdsmap;
5678  		mdsc->mdsmap = newmap;
5679  		check_new_map(mdsc, newmap, oldmap);
5680  		ceph_mdsmap_destroy(oldmap);
5681  	} else {
5682  		mdsc->mdsmap = newmap;  /* first mds map */
5683  	}
5684  	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5685  					MAX_LFS_FILESIZE);
5686  
5687  	__wake_requests(mdsc, &mdsc->waiting_for_map);
5688  	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5689  			  mdsc->mdsmap->m_epoch);
5690  
5691  	mutex_unlock(&mdsc->mutex);
5692  	schedule_delayed(mdsc, 0);
5693  	return;
5694  
5695  bad_unlock:
5696  	mutex_unlock(&mdsc->mutex);
5697  bad:
5698  	pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
5699  	ceph_umount_begin(mdsc->fsc->sb);
5700  	ceph_msg_dump(msg);
5701  	return;
5702  }
5703  
5704  static struct ceph_connection *mds_get_con(struct ceph_connection *con)
5705  {
5706  	struct ceph_mds_session *s = con->private;
5707  
5708  	if (ceph_get_mds_session(s))
5709  		return con;
5710  	return NULL;
5711  }
5712  
5713  static void mds_put_con(struct ceph_connection *con)
5714  {
5715  	struct ceph_mds_session *s = con->private;
5716  
5717  	ceph_put_mds_session(s);
5718  }
5719  
5720  /*
5721   * if the client is unresponsive for long enough, the mds will kill
5722   * the session entirely.
5723   */
5724  static void mds_peer_reset(struct ceph_connection *con)
5725  {
5726  	struct ceph_mds_session *s = con->private;
5727  	struct ceph_mds_client *mdsc = s->s_mdsc;
5728  
5729  	pr_warn("mds%d closed our session\n", s->s_mds);
5730  	if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO)
5731  		send_mds_reconnect(mdsc, s);
5732  }
5733  
5734  static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5735  {
5736  	struct ceph_mds_session *s = con->private;
5737  	struct ceph_mds_client *mdsc = s->s_mdsc;
5738  	int type = le16_to_cpu(msg->hdr.type);
5739  
5740  	mutex_lock(&mdsc->mutex);
5741  	if (__verify_registered_session(mdsc, s) < 0) {
5742  		mutex_unlock(&mdsc->mutex);
5743  		goto out;
5744  	}
5745  	mutex_unlock(&mdsc->mutex);
5746  
5747  	switch (type) {
5748  	case CEPH_MSG_MDS_MAP:
5749  		ceph_mdsc_handle_mdsmap(mdsc, msg);
5750  		break;
5751  	case CEPH_MSG_FS_MAP_USER:
5752  		ceph_mdsc_handle_fsmap(mdsc, msg);
5753  		break;
5754  	case CEPH_MSG_CLIENT_SESSION:
5755  		handle_session(s, msg);
5756  		break;
5757  	case CEPH_MSG_CLIENT_REPLY:
5758  		handle_reply(s, msg);
5759  		break;
5760  	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5761  		handle_forward(mdsc, s, msg);
5762  		break;
5763  	case CEPH_MSG_CLIENT_CAPS:
5764  		ceph_handle_caps(s, msg);
5765  		break;
5766  	case CEPH_MSG_CLIENT_SNAP:
5767  		ceph_handle_snap(mdsc, s, msg);
5768  		break;
5769  	case CEPH_MSG_CLIENT_LEASE:
5770  		handle_lease(mdsc, s, msg);
5771  		break;
5772  	case CEPH_MSG_CLIENT_QUOTA:
5773  		ceph_handle_quota(mdsc, s, msg);
5774  		break;
5775  
5776  	default:
5777  		pr_err("received unknown message type %d %s\n", type,
5778  		       ceph_msg_type_name(type));
5779  	}
5780  out:
5781  	ceph_msg_put(msg);
5782  }
5783  
5784  /*
5785   * authentication
5786   */
5787  
5788  /*
5789   * Note: returned pointer is the address of a structure that's
5790   * managed separately.  Caller must *not* attempt to free it.
5791   */
5792  static struct ceph_auth_handshake *
5793  mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new)
5794  {
5795  	struct ceph_mds_session *s = con->private;
5796  	struct ceph_mds_client *mdsc = s->s_mdsc;
5797  	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5798  	struct ceph_auth_handshake *auth = &s->s_auth;
5799  	int ret;
5800  
5801  	ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5802  					 force_new, proto, NULL, NULL);
5803  	if (ret)
5804  		return ERR_PTR(ret);
5805  
5806  	return auth;
5807  }
5808  
5809  static int mds_add_authorizer_challenge(struct ceph_connection *con,
5810  				    void *challenge_buf, int challenge_buf_len)
5811  {
5812  	struct ceph_mds_session *s = con->private;
5813  	struct ceph_mds_client *mdsc = s->s_mdsc;
5814  	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5815  
5816  	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5817  					    challenge_buf, challenge_buf_len);
5818  }
5819  
5820  static int mds_verify_authorizer_reply(struct ceph_connection *con)
5821  {
5822  	struct ceph_mds_session *s = con->private;
5823  	struct ceph_mds_client *mdsc = s->s_mdsc;
5824  	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5825  	struct ceph_auth_handshake *auth = &s->s_auth;
5826  
5827  	return ceph_auth_verify_authorizer_reply(ac, auth->authorizer,
5828  		auth->authorizer_reply_buf, auth->authorizer_reply_buf_len,
5829  		NULL, NULL, NULL, NULL);
5830  }
5831  
5832  static int mds_invalidate_authorizer(struct ceph_connection *con)
5833  {
5834  	struct ceph_mds_session *s = con->private;
5835  	struct ceph_mds_client *mdsc = s->s_mdsc;
5836  	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5837  
5838  	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5839  
5840  	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5841  }
5842  
5843  static int mds_get_auth_request(struct ceph_connection *con,
5844  				void *buf, int *buf_len,
5845  				void **authorizer, int *authorizer_len)
5846  {
5847  	struct ceph_mds_session *s = con->private;
5848  	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5849  	struct ceph_auth_handshake *auth = &s->s_auth;
5850  	int ret;
5851  
5852  	ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS,
5853  				       buf, buf_len);
5854  	if (ret)
5855  		return ret;
5856  
5857  	*authorizer = auth->authorizer_buf;
5858  	*authorizer_len = auth->authorizer_buf_len;
5859  	return 0;
5860  }
5861  
5862  static int mds_handle_auth_reply_more(struct ceph_connection *con,
5863  				      void *reply, int reply_len,
5864  				      void *buf, int *buf_len,
5865  				      void **authorizer, int *authorizer_len)
5866  {
5867  	struct ceph_mds_session *s = con->private;
5868  	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5869  	struct ceph_auth_handshake *auth = &s->s_auth;
5870  	int ret;
5871  
5872  	ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len,
5873  					      buf, buf_len);
5874  	if (ret)
5875  		return ret;
5876  
5877  	*authorizer = auth->authorizer_buf;
5878  	*authorizer_len = auth->authorizer_buf_len;
5879  	return 0;
5880  }
5881  
5882  static int mds_handle_auth_done(struct ceph_connection *con,
5883  				u64 global_id, void *reply, int reply_len,
5884  				u8 *session_key, int *session_key_len,
5885  				u8 *con_secret, int *con_secret_len)
5886  {
5887  	struct ceph_mds_session *s = con->private;
5888  	struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth;
5889  	struct ceph_auth_handshake *auth = &s->s_auth;
5890  
5891  	return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len,
5892  					       session_key, session_key_len,
5893  					       con_secret, con_secret_len);
5894  }
5895  
5896  static int mds_handle_auth_bad_method(struct ceph_connection *con,
5897  				      int used_proto, int result,
5898  				      const int *allowed_protos, int proto_cnt,
5899  				      const int *allowed_modes, int mode_cnt)
5900  {
5901  	struct ceph_mds_session *s = con->private;
5902  	struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc;
5903  	int ret;
5904  
5905  	if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS,
5906  					    used_proto, result,
5907  					    allowed_protos, proto_cnt,
5908  					    allowed_modes, mode_cnt)) {
5909  		ret = ceph_monc_validate_auth(monc);
5910  		if (ret)
5911  			return ret;
5912  	}
5913  
5914  	return -EACCES;
5915  }
5916  
5917  static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5918  				struct ceph_msg_header *hdr, int *skip)
5919  {
5920  	struct ceph_msg *msg;
5921  	int type = (int) le16_to_cpu(hdr->type);
5922  	int front_len = (int) le32_to_cpu(hdr->front_len);
5923  
5924  	if (con->in_msg)
5925  		return con->in_msg;
5926  
5927  	*skip = 0;
5928  	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5929  	if (!msg) {
5930  		pr_err("unable to allocate msg type %d len %d\n",
5931  		       type, front_len);
5932  		return NULL;
5933  	}
5934  
5935  	return msg;
5936  }
5937  
5938  static int mds_sign_message(struct ceph_msg *msg)
5939  {
5940  	struct ceph_mds_session *s = msg->con->private;
5941  	struct ceph_auth_handshake *auth = &s->s_auth;
5942  
5943  	return ceph_auth_sign_message(auth, msg);
5944  }
5945  
5946  static int mds_check_message_signature(struct ceph_msg *msg)
5947  {
5948  	struct ceph_mds_session *s = msg->con->private;
5949  	struct ceph_auth_handshake *auth = &s->s_auth;
5950  
5951  	return ceph_auth_check_message_signature(auth, msg);
5952  }
5953  
5954  static const struct ceph_connection_operations mds_con_ops = {
5955  	.get = mds_get_con,
5956  	.put = mds_put_con,
5957  	.alloc_msg = mds_alloc_msg,
5958  	.dispatch = mds_dispatch,
5959  	.peer_reset = mds_peer_reset,
5960  	.get_authorizer = mds_get_authorizer,
5961  	.add_authorizer_challenge = mds_add_authorizer_challenge,
5962  	.verify_authorizer_reply = mds_verify_authorizer_reply,
5963  	.invalidate_authorizer = mds_invalidate_authorizer,
5964  	.sign_message = mds_sign_message,
5965  	.check_message_signature = mds_check_message_signature,
5966  	.get_auth_request = mds_get_auth_request,
5967  	.handle_auth_reply_more = mds_handle_auth_reply_more,
5968  	.handle_auth_done = mds_handle_auth_done,
5969  	.handle_auth_bad_method = mds_handle_auth_bad_method,
5970  };
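
/*
 * For illustration: the messenger calls back through this ops table.
 * Session setup earlier in this file wires a connection up with
 * roughly
 *
 *	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 *
 * after which every incoming frame lands in mds_dispatch() with
 * con->private pointing back at the owning session.
 */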
5971  
5972  /* eof */
5973