xref: /openbmc/linux/net/ceph/mon_client.c (revision b9ccfda2)
1 #include <linux/ceph/ceph_debug.h>
2 
3 #include <linux/module.h>
4 #include <linux/types.h>
5 #include <linux/slab.h>
6 #include <linux/random.h>
7 #include <linux/sched.h>
8 
9 #include <linux/ceph/mon_client.h>
10 #include <linux/ceph/libceph.h>
11 #include <linux/ceph/debugfs.h>
12 #include <linux/ceph/decode.h>
13 #include <linux/ceph/auth.h>
14 
15 /*
16  * Interact with Ceph monitor cluster.  Handle requests for new map
17  * versions, and periodically resend as needed.  Also implement
18  * statfs() and umount().
19  *
20  * A small cluster of Ceph "monitors" are responsible for managing critical
21  * cluster configuration and state information.  An odd number (e.g., 3, 5)
22  * of cmon daemons use a modified version of the Paxos part-time parliament
23  * algorithm to manage the MDS map (mds cluster membership), OSD map, and
24  * list of clients who have mounted the file system.
25  *
26  * We maintain an open, active session with a monitor at all times in order to
27  * receive timely MDSMap updates.  We periodically send a keepalive byte on the
28  * TCP socket to ensure we detect a failure.  If the connection does break, we
29  * randomly hunt for a new monitor.  Once the connection is reestablished, we
30  * resend any outstanding requests.
31  */
32 
33 static const struct ceph_connection_operations mon_con_ops;
34 
35 static int __validate_auth(struct ceph_mon_client *monc);
36 
37 /*
38  * Decode a monmap blob (e.g., during mount).
39  */
40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
41 {
42 	struct ceph_monmap *m = NULL;
43 	int i, err = -EINVAL;
44 	struct ceph_fsid fsid;
45 	u32 epoch, num_mon;
46 	u16 version;
47 	u32 len;
48 
49 	ceph_decode_32_safe(&p, end, len, bad);
50 	ceph_decode_need(&p, end, len, bad);
51 
52 	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
53 
54 	ceph_decode_16_safe(&p, end, version, bad);
55 
56 	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
57 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
58 	epoch = ceph_decode_32(&p);
59 
60 	num_mon = ceph_decode_32(&p);
61 	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);
62 
63 	if (num_mon >= CEPH_MAX_MON)
64 		goto bad;
65 	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
66 	if (m == NULL)
67 		return ERR_PTR(-ENOMEM);
68 	m->fsid = fsid;
69 	m->epoch = epoch;
70 	m->num_mon = num_mon;
71 	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
72 	for (i = 0; i < num_mon; i++)
73 		ceph_decode_addr(&m->mon_inst[i].addr);
74 
75 	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
76 	     m->num_mon);
77 	for (i = 0; i < m->num_mon; i++)
78 		dout("monmap_decode  mon%d is %s\n", i,
79 		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
80 	return m;
81 
82 bad:
83 	dout("monmap_decode failed with %d\n", err);
84 	kfree(m);
85 	return ERR_PTR(err);
86 }
87 
88 /*
89  * return true if *addr is included in the monmap.
90  */
91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
92 {
93 	int i;
94 
95 	for (i = 0; i < m->num_mon; i++)
96 		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
97 			return 1;
98 	return 0;
99 }
100 
/*
 * Send an auth request.
 *
 * The request has already been encoded into monc->m_auth by the auth
 * layer; @len is the number of encoded bytes.  Any previously queued
 * copy of the message is revoked first so only the latest request is
 * on the wire.  Caller must hold monc->mutex (all callers do).
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_con_revoke(monc->con, monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(monc->con, monc->m_auth);
}
113 
/*
 * Close monitor session, if any.
 *
 * Revokes any in-flight auth request, drops the connection, and resets
 * auth state so a fresh handshake starts on the next open.  Caller
 * must hold monc->mutex.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_con_revoke(monc->con, monc->m_auth);
	ceph_con_close(monc->con);
	monc->cur_mon = -1;  /* no monitor currently selected */
	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}
126 
127 /*
128  * Open a session with a (new) monitor.
129  */
130 static int __open_session(struct ceph_mon_client *monc)
131 {
132 	char r;
133 	int ret;
134 
135 	if (monc->cur_mon < 0) {
136 		get_random_bytes(&r, 1);
137 		monc->cur_mon = r % monc->monmap->num_mon;
138 		dout("open_session num=%d r=%d -> mon%d\n",
139 		     monc->monmap->num_mon, r, monc->cur_mon);
140 		monc->sub_sent = 0;
141 		monc->sub_renew_after = jiffies;  /* i.e., expired */
142 		monc->want_next_osdmap = !!monc->want_next_osdmap;
143 
144 		dout("open_session mon%d opening\n", monc->cur_mon);
145 		monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
146 		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
147 		ceph_con_open(monc->con,
148 			      &monc->monmap->mon_inst[monc->cur_mon].addr);
149 
150 		/* initiatiate authentication handshake */
151 		ret = ceph_auth_build_hello(monc->auth,
152 					    monc->m_auth->front.iov_base,
153 					    monc->m_auth->front_max);
154 		__send_prepared_auth_request(monc, ret);
155 	} else {
156 		dout("open_session mon%d already open\n", monc->cur_mon);
157 	}
158 	return 0;
159 }
160 
/* Has our map-subscription lease run out (i.e., time to renew)? */
static bool __sub_expired(struct ceph_mon_client *monc)
{
	return time_after_eq(jiffies, monc->sub_renew_after);
}
165 
166 /*
167  * Reschedule delayed work timer.
168  */
169 static void __schedule_delayed(struct ceph_mon_client *monc)
170 {
171 	unsigned int delay;
172 
173 	if (monc->cur_mon < 0 || __sub_expired(monc))
174 		delay = 10 * HZ;
175 	else
176 		delay = 20 * HZ;
177 	dout("__schedule_delayed after %u\n", delay);
178 	schedule_delayed_work(&monc->delayed_work, delay);
179 }
180 
/*
 * Send subscribe request for mdsmap and/or osdmap.
 *
 * Encodes a MON_SUBSCRIBE message into the preallocated m_subscribe
 * buffer: a 32-bit count followed by (name, subscribe_item) pairs for
 * osdmap (one-shot, only while want_next_osdmap is set), mdsmap
 * (continuous), and monmap (always, continuous).  Skipped if a
 * subscribe is already in flight (sub_sent != 0) and has not expired.
 * Caller must hold monc->mutex.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned int)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		/* monmap is always wanted; the other two are conditional */
		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned int)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned int)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		/* revoke any stale queued copy before re-sending */
		ceph_con_revoke(monc->con, msg);
		ceph_con_send(monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}
235 
/*
 * Handle a MON_SUBSCRIBE_ACK: the monitor granted our subscription for
 * h->duration seconds.  Schedule renewal at roughly half that interval
 * (relative to when the subscribe was sent) and clear sub_sent.  An ack
 * also proves the session is up, so hunting mode ends here.
 */
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con->peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	/* renew halfway through the granted period */
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}
262 
/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	/* record the mdsmap epoch we hold; future subscribes start here */
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);
274 
int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	/*
	 * Record the osdmap epoch we hold and clear any pending one-shot
	 * osdmap request — it has now been satisfied.
	 */
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}
283 
284 /*
285  * Register interest in the next osdmap
286  */
287 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
288 {
289 	dout("request_next_osdmap have %u\n", monc->have_osdmap);
290 	mutex_lock(&monc->mutex);
291 	if (!monc->want_next_osdmap)
292 		monc->want_next_osdmap = 1;
293 	if (monc->want_next_osdmap < 2)
294 		__send_subscribe(monc);
295 	mutex_unlock(&monc->mutex);
296 }
297 
/*
 * Open a session with a monitor and arm the periodic keepalive/renew
 * timer.  Session setup itself completes asynchronously; returns 0.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
310 
/*
 * Handle an incoming MON_MAP message: decode the new monmap, verify its
 * fsid against ours, and swap it in for the old one.  On the first
 * valid map we also run debugfs client init (outside the mutex to
 * avoid a locking dependency).  Wakes auth_wq waiters in all cases.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	/*
	 * NOTE(review): old is captured before monc->mutex is taken —
	 * appears to rely on monmap only being replaced here; confirm
	 * there is no racing writer.
	 */
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	/* reject a map that belongs to a different cluster */
	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		mutex_unlock(&monc->mutex);
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(client);
		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}
358 
359 /*
360  * generic requests (e.g., statfs, poolop)
361  */
362 static struct ceph_mon_generic_request *__lookup_generic_req(
363 	struct ceph_mon_client *monc, u64 tid)
364 {
365 	struct ceph_mon_generic_request *req;
366 	struct rb_node *n = monc->generic_request_tree.rb_node;
367 
368 	while (n) {
369 		req = rb_entry(n, struct ceph_mon_generic_request, node);
370 		if (tid < req->tid)
371 			n = n->rb_left;
372 		else if (tid > req->tid)
373 			n = n->rb_right;
374 		else
375 			return req;
376 	}
377 	return NULL;
378 }
379 
/*
 * Insert a new request into the tid-keyed rbtree of outstanding
 * generic requests.  Tids are unique (BUG on duplicate).  Caller must
 * hold monc->mutex.
 */
static void __insert_generic_request(struct ceph_mon_client *monc,
			    struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}
401 
/* Final kref release: drop the message refs and free the request. */
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}
414 
/* Drop a reference; frees the request when the last ref goes away. */
static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}
419 
/* Take an additional reference on a generic request. */
static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}
424 
/*
 * Messenger callback: hand back the preallocated reply buffer for the
 * generic request matching hdr->tid, or set *skip if the request is no
 * longer registered (e.g., the waiter gave up and unregistered it).
 */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					 struct ceph_msg_header *hdr,
					 int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}
452 
/*
 * Register a generic request in the tid tree, send it, and block
 * (interruptibly) until the reply handler completes it.  Returns the
 * request's result, or the error from the interrupted wait.
 */
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	/* unregister; a reply arriving later simply finds no match */
	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}
478 
/*
 * statfs
 */
/*
 * Handle a STATFS_REPLY: copy the statistics into the waiter's buffer
 * and complete the matching generic request, if still registered.
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		/* hold a ref so req can't be freed once we drop the mutex */
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
511 
/*
 * Do a synchronous statfs().
 *
 * Allocates a generic request with preallocated request/reply messages,
 * fills in the statfs header, and blocks until the reply arrives (or
 * the wait is interrupted).  On success the result is in *buf.
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = sizeof(*buf);
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;

	err = do_generic_request(monc, req);

out:
	/* drops our ref; frees req (and its msgs) unless a reply holds it */
	kref_put(&req->kref, release_generic_request);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);
554 
555 /*
556  * pool ops
557  */
558 static int get_poolop_reply_buf(const char *src, size_t src_len,
559 				char *dst, size_t dst_len)
560 {
561 	u32 buf_len;
562 
563 	if (src_len != sizeof(u32) + dst_len)
564 		return -EINVAL;
565 
566 	buf_len = le32_to_cpu(*(u32 *)src);
567 	if (buf_len != dst_len)
568 		return -EINVAL;
569 
570 	memcpy(dst, src + sizeof(u32), dst_len);
571 	return 0;
572 }
573 
574 static void handle_poolop_reply(struct ceph_mon_client *monc,
575 				struct ceph_msg *msg)
576 {
577 	struct ceph_mon_generic_request *req;
578 	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
579 	u64 tid = le64_to_cpu(msg->hdr.tid);
580 
581 	if (msg->front.iov_len < sizeof(*reply))
582 		goto bad;
583 	dout("handle_poolop_reply %p tid %llu\n", msg, tid);
584 
585 	mutex_lock(&monc->mutex);
586 	req = __lookup_generic_req(monc, tid);
587 	if (req) {
588 		if (req->buf_len &&
589 		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
590 				     msg->front.iov_len - sizeof(*reply),
591 				     req->buf, req->buf_len) < 0) {
592 			mutex_unlock(&monc->mutex);
593 			goto bad;
594 		}
595 		req->result = le32_to_cpu(reply->reply_code);
596 		get_generic_request(req);
597 	}
598 	mutex_unlock(&monc->mutex);
599 	if (req) {
600 		complete(&req->completion);
601 		put_generic_request(req);
602 	}
603 	return;
604 
605 bad:
606 	pr_err("corrupt generic reply, tid %llu\n", tid);
607 	ceph_msg_dump(msg);
608 }
609 
/*
 * Do a synchronous pool op.
 *
 * Issues a v2 POOLOP request for @op on @pool/@snapid and blocks for
 * the reply; any reply payload is copied into @buf (@len bytes).
 */
int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
			u32 pool, u64 snapid,
			char *buf, int len)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_poolop *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	req->buf_len = len;
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	req->request->hdr.version = cpu_to_le16(2);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->pool = cpu_to_le32(pool);
	h->op = cpu_to_le32(op);
	h->auid = 0;
	h->snapid = cpu_to_le64(snapid);
	h->name_len = 0;

	err = do_generic_request(monc, req);

out:
	/* drops our ref; frees req (and its msgs) unless a reply holds it */
	kref_put(&req->kref, release_generic_request);
	return err;
}
659 
/* Allocate a new unmanaged snapshot id in @pool; result lands in *snapid. */
int ceph_monc_create_snapid(struct ceph_mon_client *monc,
			    u32 pool, u64 *snapid)
{
	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
				   pool, 0, (char *)snapid, sizeof(*snapid));

}
EXPORT_SYMBOL(ceph_monc_create_snapid);
668 
669 int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
670 			    u32 pool, u64 snapid)
671 {
672 	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
673 				   pool, snapid, 0, 0);
674 
675 }
676 
677 /*
678  * Resend pending generic requests.
679  */
680 static void __resend_generic_request(struct ceph_mon_client *monc)
681 {
682 	struct ceph_mon_generic_request *req;
683 	struct rb_node *p;
684 
685 	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
686 		req = rb_entry(p, struct ceph_mon_generic_request, node);
687 		ceph_con_revoke(monc->con, req->request);
688 		ceph_con_send(monc->con, ceph_msg_get(req->request));
689 	}
690 }
691 
/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		/* still hunting: drop this attempt, try another monitor */
		__close_session(monc);
		__open_session(monc);  /* continue hunting */
	} else {
		ceph_con_keepalive(monc->con);

		__validate_auth(monc);

		/*
		 * NOTE(review): handle_auth_reply() checks auth->ops for
		 * NULL before use, but this path does not — confirm ops
		 * is always set by the time hunting has ended.
		 */
		if (monc->auth->ops->is_authenticated(monc->auth))
			__send_subscribe(monc);
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}
718 
/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		/* clear the nonce on the copied address */
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;  /* fsid unknown until the first real map */
	return 0;
}
747 
748 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
749 {
750 	int err = 0;
751 
752 	dout("init\n");
753 	memset(monc, 0, sizeof(*monc));
754 	monc->client = cl;
755 	monc->monmap = NULL;
756 	mutex_init(&monc->mutex);
757 
758 	err = build_initial_monmap(monc);
759 	if (err)
760 		goto out;
761 
762 	/* connection */
763 	monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
764 	if (!monc->con)
765 		goto out_monmap;
766 	ceph_con_init(monc->client->msgr, monc->con);
767 	monc->con->private = monc;
768 	monc->con->ops = &mon_con_ops;
769 
770 	/* authentication */
771 	monc->auth = ceph_auth_init(cl->options->name,
772 				    cl->options->key);
773 	if (IS_ERR(monc->auth)) {
774 		err = PTR_ERR(monc->auth);
775 		goto out_con;
776 	}
777 	monc->auth->want_keys =
778 		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
779 		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
780 
781 	/* msgs */
782 	err = -ENOMEM;
783 	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
784 				     sizeof(struct ceph_mon_subscribe_ack),
785 				     GFP_NOFS, true);
786 	if (!monc->m_subscribe_ack)
787 		goto out_auth;
788 
789 	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
790 					 true);
791 	if (!monc->m_subscribe)
792 		goto out_subscribe_ack;
793 
794 	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
795 					  true);
796 	if (!monc->m_auth_reply)
797 		goto out_subscribe;
798 
799 	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
800 	monc->pending_auth = 0;
801 	if (!monc->m_auth)
802 		goto out_auth_reply;
803 
804 	monc->cur_mon = -1;
805 	monc->hunting = true;
806 	monc->sub_renew_after = jiffies;
807 	monc->sub_sent = 0;
808 
809 	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
810 	monc->generic_request_tree = RB_ROOT;
811 	monc->num_generic_requests = 0;
812 	monc->last_tid = 0;
813 
814 	monc->have_mdsmap = 0;
815 	monc->have_osdmap = 0;
816 	monc->want_next_osdmap = 1;
817 	return 0;
818 
819 out_auth_reply:
820 	ceph_msg_put(monc->m_auth_reply);
821 out_subscribe:
822 	ceph_msg_put(monc->m_subscribe);
823 out_subscribe_ack:
824 	ceph_msg_put(monc->m_subscribe_ack);
825 out_auth:
826 	ceph_auth_destroy(monc->auth);
827 out_con:
828 	monc->con->ops->put(monc->con);
829 out_monmap:
830 	kfree(monc->monmap);
831 out:
832 	return err;
833 }
834 EXPORT_SYMBOL(ceph_monc_init);
835 
/*
 * Tear down a mon client: stop the timer, close the session, drop the
 * connection, and release auth state, messages, and the monmap.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	/* detach so dispatch()/mon_fault() ignore anything still queued */
	monc->con->private = NULL;
	monc->con->ops->put(monc->con);
	monc->con = NULL;

	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);
868 
/*
 * Feed an AUTH_REPLY into the auth state machine.  Depending on the
 * result we either report an auth error to waiters, send the next
 * handshake message, or — on a fresh successful authentication —
 * adopt our global_id and kick off subscriptions/pending requests.
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		/* hard failure: let anyone blocked on auth see the error */
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		/* handshake continues; ret bytes were prepared in m_auth */
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr->inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}
900 
/*
 * Build and send a new auth request if the auth layer says one is
 * needed.  No-op while a previous auth request is still outstanding.
 * Caller must hold monc->mutex.
 */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}
915 
/* Locked wrapper around __validate_auth() for external callers. */
int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);
926 
/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	/* private is cleared in ceph_monc_stop(); drop late messages */
	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	/* dispatch consumes the caller's message reference */
	ceph_msg_put(msg);
}
974 
/*
 * Allocate memory for incoming message
 *
 * Preallocated messages are reused for subscribe acks and auth
 * replies; statfs/poolop replies are matched to their pending request;
 * map messages get a fresh allocation.  Unknown types are skipped.
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}
1012 
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	/* re-check under the mutex; stop() clears private during teardown */
	if (!con->private)
		goto out;

	if (!monc->hunting)
		pr_info("mon%d %s session lost, "
			"hunting for new mon\n", monc->cur_mon,
			ceph_pr_addr(&monc->con->peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting */
		monc->hunting = true;
		__open_session(monc);
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}
1046 
/* Monitor connection callbacks wired into the messenger. */
static const struct ceph_connection_operations mon_con_ops = {
	.get = ceph_con_get,
	.put = ceph_con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};
1054