xref: /openbmc/linux/fs/dlm/rcom.c (revision 79e790ff)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 #include "dlm_internal.h"
13 #include "lockspace.h"
14 #include "member.h"
15 #include "lowcomms.h"
16 #include "midcomms.h"
17 #include "rcom.h"
18 #include "recover.h"
19 #include "dir.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "util.h"
24 
25 static int rcom_response(struct dlm_ls *ls)
26 {
27 	return test_bit(LSFL_RCOM_READY, &ls->ls_flags);
28 }
29 
30 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
31 		       struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret)
32 {
33 	struct dlm_rcom *rc;
34 	struct dlm_mhandle *mh;
35 	char *mb;
36 	int mb_len = sizeof(struct dlm_rcom) + len;
37 
38 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
39 	if (!mh) {
40 		log_print("create_rcom to %d type %d len %d ENOBUFS",
41 			  to_nodeid, type, len);
42 		return -ENOBUFS;
43 	}
44 
45 	rc = (struct dlm_rcom *) mb;
46 
47 	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
48 	rc->rc_header.h_lockspace = ls->ls_global_id;
49 	rc->rc_header.h_nodeid = dlm_our_nodeid();
50 	rc->rc_header.h_length = mb_len;
51 	rc->rc_header.h_cmd = DLM_RCOM;
52 
53 	rc->rc_type = type;
54 
55 	spin_lock(&ls->ls_recover_lock);
56 	rc->rc_seq = ls->ls_recover_seq;
57 	spin_unlock(&ls->ls_recover_lock);
58 
59 	*mh_ret = mh;
60 	*rc_ret = rc;
61 	return 0;
62 }
63 
64 static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh,
65 		      struct dlm_rcom *rc)
66 {
67 	dlm_rcom_out(rc);
68 	dlm_lowcomms_commit_buffer(mh);
69 }
70 
71 static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
72 			    uint32_t flags)
73 {
74 	rs->rs_flags = cpu_to_le32(flags);
75 }
76 
77 /* When replying to a status request, a node also sends back its
78    configuration values.  The requesting node then checks that the remote
79    node is configured the same way as itself. */
80 
81 static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
82 			    uint32_t num_slots)
83 {
84 	rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
85 	rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
86 
87 	rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
88 	rf->rf_num_slots = cpu_to_le16(num_slots);
89 	rf->rf_generation =  cpu_to_le32(ls->ls_generation);
90 }
91 
92 static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
93 {
94 	struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
95 
96 	if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
97 		log_error(ls, "version mismatch: %x nodeid %d: %x",
98 			  DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
99 			  rc->rc_header.h_version);
100 		return -EPROTO;
101 	}
102 
103 	if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
104 	    le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
105 		log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
106 			  ls->ls_lvblen, ls->ls_exflags, nodeid,
107 			  le32_to_cpu(rf->rf_lvblen),
108 			  le32_to_cpu(rf->rf_lsflags));
109 		return -EPROTO;
110 	}
111 	return 0;
112 }
113 
114 static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq)
115 {
116 	spin_lock(&ls->ls_rcom_spin);
117 	*new_seq = ++ls->ls_rcom_seq;
118 	set_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
119 	spin_unlock(&ls->ls_rcom_spin);
120 }
121 
122 static void disallow_sync_reply(struct dlm_ls *ls)
123 {
124 	spin_lock(&ls->ls_rcom_spin);
125 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
126 	clear_bit(LSFL_RCOM_READY, &ls->ls_flags);
127 	spin_unlock(&ls->ls_rcom_spin);
128 }
129 
130 /*
131  * low nodeid gathers one slot value at a time from each node.
132  * it sets need_slots=0, and saves rf_our_slot returned from each
133  * rcom_config.
134  *
135  * other nodes gather all slot values at once from the low nodeid.
136  * they set need_slots=1, and ignore the rf_our_slot returned from each
137  * rcom_config.  they use the rf_num_slots returned from the low
138  * node's rcom_config.
139  */
140 
141 int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
142 {
143 	struct dlm_rcom *rc;
144 	struct dlm_mhandle *mh;
145 	int error = 0;
146 
147 	ls->ls_recover_nodeid = nodeid;
148 
149 	if (nodeid == dlm_our_nodeid()) {
150 		rc = ls->ls_recover_buf;
151 		rc->rc_result = dlm_recover_status(ls);
152 		goto out;
153 	}
154 
155 retry:
156 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
157 			    sizeof(struct rcom_status), &rc, &mh);
158 	if (error)
159 		goto out;
160 
161 	set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
162 
163 	allow_sync_reply(ls, &rc->rc_id);
164 	memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
165 
166 	send_rcom(ls, mh, rc);
167 
168 	error = dlm_wait_function(ls, &rcom_response);
169 	disallow_sync_reply(ls);
170 	if (error == -ETIMEDOUT)
171 		goto retry;
172 	if (error)
173 		goto out;
174 
175 	rc = ls->ls_recover_buf;
176 
177 	if (rc->rc_result == -ESRCH) {
178 		/* we pretend the remote lockspace exists with 0 status */
179 		log_debug(ls, "remote node %d not ready", nodeid);
180 		rc->rc_result = 0;
181 		error = 0;
182 	} else {
183 		error = check_rcom_config(ls, rc, nodeid);
184 	}
185 
186 	/* the caller looks at rc_result for the remote recovery status */
187  out:
188 	return error;
189 }
190 
191 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
192 {
193 	struct dlm_rcom *rc;
194 	struct dlm_mhandle *mh;
195 	struct rcom_status *rs;
196 	uint32_t status;
197 	int nodeid = rc_in->rc_header.h_nodeid;
198 	int len = sizeof(struct rcom_config);
199 	int num_slots = 0;
200 	int error;
201 
202 	if (!dlm_slots_version(&rc_in->rc_header)) {
203 		status = dlm_recover_status(ls);
204 		goto do_create;
205 	}
206 
207 	rs = (struct rcom_status *)rc_in->rc_buf;
208 
209 	if (!(le32_to_cpu(rs->rs_flags) & DLM_RSF_NEED_SLOTS)) {
210 		status = dlm_recover_status(ls);
211 		goto do_create;
212 	}
213 
214 	spin_lock(&ls->ls_recover_lock);
215 	status = ls->ls_recover_status;
216 	num_slots = ls->ls_num_slots;
217 	spin_unlock(&ls->ls_recover_lock);
218 	len += num_slots * sizeof(struct rcom_slot);
219 
220  do_create:
221 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
222 			    len, &rc, &mh);
223 	if (error)
224 		return;
225 
226 	rc->rc_id = rc_in->rc_id;
227 	rc->rc_seq_reply = rc_in->rc_seq;
228 	rc->rc_result = status;
229 
230 	set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
231 
232 	if (!num_slots)
233 		goto do_send;
234 
235 	spin_lock(&ls->ls_recover_lock);
236 	if (ls->ls_num_slots != num_slots) {
237 		spin_unlock(&ls->ls_recover_lock);
238 		log_debug(ls, "receive_rcom_status num_slots %d to %d",
239 			  num_slots, ls->ls_num_slots);
240 		rc->rc_result = 0;
241 		set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
242 		goto do_send;
243 	}
244 
245 	dlm_slots_copy_out(ls, rc);
246 	spin_unlock(&ls->ls_recover_lock);
247 
248  do_send:
249 	send_rcom(ls, mh, rc);
250 }
251 
252 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
253 {
254 	spin_lock(&ls->ls_rcom_spin);
255 	if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) ||
256 	    rc_in->rc_id != ls->ls_rcom_seq) {
257 		log_debug(ls, "reject reply %d from %d seq %llx expect %llx",
258 			  rc_in->rc_type, rc_in->rc_header.h_nodeid,
259 			  (unsigned long long)rc_in->rc_id,
260 			  (unsigned long long)ls->ls_rcom_seq);
261 		goto out;
262 	}
263 	memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length);
264 	set_bit(LSFL_RCOM_READY, &ls->ls_flags);
265 	clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags);
266 	wake_up(&ls->ls_wait_general);
267  out:
268 	spin_unlock(&ls->ls_rcom_spin);
269 }
270 
271 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
272 {
273 	struct dlm_rcom *rc;
274 	struct dlm_mhandle *mh;
275 	int error = 0;
276 
277 	ls->ls_recover_nodeid = nodeid;
278 
279 retry:
280 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
281 	if (error)
282 		goto out;
283 	memcpy(rc->rc_buf, last_name, last_len);
284 
285 	allow_sync_reply(ls, &rc->rc_id);
286 	memset(ls->ls_recover_buf, 0, LOWCOMMS_MAX_TX_BUFFER_LEN);
287 
288 	send_rcom(ls, mh, rc);
289 
290 	error = dlm_wait_function(ls, &rcom_response);
291 	disallow_sync_reply(ls);
292 	if (error == -ETIMEDOUT)
293 		goto retry;
294  out:
295 	return error;
296 }
297 
298 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
299 {
300 	struct dlm_rcom *rc;
301 	struct dlm_mhandle *mh;
302 	int error, inlen, outlen, nodeid;
303 
304 	nodeid = rc_in->rc_header.h_nodeid;
305 	inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
306 	outlen = LOWCOMMS_MAX_TX_BUFFER_LEN - sizeof(struct dlm_rcom);
307 
308 	error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
309 	if (error)
310 		return;
311 	rc->rc_id = rc_in->rc_id;
312 	rc->rc_seq_reply = rc_in->rc_seq;
313 
314 	dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
315 			      nodeid);
316 	send_rcom(ls, mh, rc);
317 }
318 
319 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid)
320 {
321 	struct dlm_rcom *rc;
322 	struct dlm_mhandle *mh;
323 	struct dlm_ls *ls = r->res_ls;
324 	int error;
325 
326 	error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length,
327 			    &rc, &mh);
328 	if (error)
329 		goto out;
330 	memcpy(rc->rc_buf, r->res_name, r->res_length);
331 	rc->rc_id = (unsigned long) r->res_id;
332 
333 	send_rcom(ls, mh, rc);
334  out:
335 	return error;
336 }
337 
338 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
339 {
340 	struct dlm_rcom *rc;
341 	struct dlm_mhandle *mh;
342 	int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid;
343 	int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
344 
345 	error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh);
346 	if (error)
347 		return;
348 
349 	/* Old code would send this special id to trigger a debug dump. */
350 	if (rc_in->rc_id == 0xFFFFFFFF) {
351 		log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
352 		dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
353 		return;
354 	}
355 
356 	error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
357 				  DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
358 	if (error)
359 		ret_nodeid = error;
360 	rc->rc_result = ret_nodeid;
361 	rc->rc_id = rc_in->rc_id;
362 	rc->rc_seq_reply = rc_in->rc_seq;
363 
364 	send_rcom(ls, mh, rc);
365 }
366 
367 static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
368 {
369 	dlm_recover_master_reply(ls, rc_in);
370 }
371 
372 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb,
373 			   struct rcom_lock *rl)
374 {
375 	memset(rl, 0, sizeof(*rl));
376 
377 	rl->rl_ownpid = cpu_to_le32(lkb->lkb_ownpid);
378 	rl->rl_lkid = cpu_to_le32(lkb->lkb_id);
379 	rl->rl_exflags = cpu_to_le32(lkb->lkb_exflags);
380 	rl->rl_flags = cpu_to_le32(lkb->lkb_flags);
381 	rl->rl_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
382 	rl->rl_rqmode = lkb->lkb_rqmode;
383 	rl->rl_grmode = lkb->lkb_grmode;
384 	rl->rl_status = lkb->lkb_status;
385 	rl->rl_wait_type = cpu_to_le16(lkb->lkb_wait_type);
386 
387 	if (lkb->lkb_bastfn)
388 		rl->rl_asts |= DLM_CB_BAST;
389 	if (lkb->lkb_astfn)
390 		rl->rl_asts |= DLM_CB_CAST;
391 
392 	rl->rl_namelen = cpu_to_le16(r->res_length);
393 	memcpy(rl->rl_name, r->res_name, r->res_length);
394 
395 	/* FIXME: might we have an lvb without DLM_LKF_VALBLK set ?
396 	   If so, receive_rcom_lock_args() won't take this copy. */
397 
398 	if (lkb->lkb_lvbptr)
399 		memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
400 }
401 
402 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
403 {
404 	struct dlm_ls *ls = r->res_ls;
405 	struct dlm_rcom *rc;
406 	struct dlm_mhandle *mh;
407 	struct rcom_lock *rl;
408 	int error, len = sizeof(struct rcom_lock);
409 
410 	if (lkb->lkb_lvbptr)
411 		len += ls->ls_lvblen;
412 
413 	error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh);
414 	if (error)
415 		goto out;
416 
417 	rl = (struct rcom_lock *) rc->rc_buf;
418 	pack_rcom_lock(r, lkb, rl);
419 	rc->rc_id = (unsigned long) r;
420 
421 	send_rcom(ls, mh, rc);
422  out:
423 	return error;
424 }
425 
426 /* needs at least dlm_rcom + rcom_lock */
427 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
428 {
429 	struct dlm_rcom *rc;
430 	struct dlm_mhandle *mh;
431 	int error, nodeid = rc_in->rc_header.h_nodeid;
432 
433 	dlm_recover_master_copy(ls, rc_in);
434 
435 	error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY,
436 			    sizeof(struct rcom_lock), &rc, &mh);
437 	if (error)
438 		return;
439 
440 	/* We send back the same rcom_lock struct we received, but
441 	   dlm_recover_master_copy() has filled in rl_remid and rl_result */
442 
443 	memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
444 	rc->rc_id = rc_in->rc_id;
445 	rc->rc_seq_reply = rc_in->rc_seq;
446 
447 	send_rcom(ls, mh, rc);
448 }
449 
450 /* If the lockspace doesn't exist then still send a status message
451    back; it's possible that it just doesn't have its global_id yet. */
452 
453 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
454 {
455 	struct dlm_rcom *rc;
456 	struct rcom_config *rf;
457 	struct dlm_mhandle *mh;
458 	char *mb;
459 	int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
460 
461 	mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb);
462 	if (!mh)
463 		return -ENOBUFS;
464 
465 	rc = (struct dlm_rcom *) mb;
466 
467 	rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
468 	rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace;
469 	rc->rc_header.h_nodeid = dlm_our_nodeid();
470 	rc->rc_header.h_length = mb_len;
471 	rc->rc_header.h_cmd = DLM_RCOM;
472 
473 	rc->rc_type = DLM_RCOM_STATUS_REPLY;
474 	rc->rc_id = rc_in->rc_id;
475 	rc->rc_seq_reply = rc_in->rc_seq;
476 	rc->rc_result = -ESRCH;
477 
478 	rf = (struct rcom_config *) rc->rc_buf;
479 	rf->rf_lvblen = cpu_to_le32(~0U);
480 
481 	dlm_rcom_out(rc);
482 	dlm_lowcomms_commit_buffer(mh);
483 
484 	return 0;
485 }
486 
487 /*
488  * Ignore messages for stage Y before we set
489  * recover_status bit for stage X:
490  *
491  * recover_status = 0
492  *
493  * dlm_recover_members()
494  * - send nothing
495  * - recv nothing
496  * - ignore NAMES, NAMES_REPLY
497  * - ignore LOOKUP, LOOKUP_REPLY
498  * - ignore LOCK, LOCK_REPLY
499  *
500  * recover_status |= NODES
501  *
502  * dlm_recover_members_wait()
503  *
504  * dlm_recover_directory()
505  * - send NAMES
506  * - recv NAMES_REPLY
507  * - ignore LOOKUP, LOOKUP_REPLY
508  * - ignore LOCK, LOCK_REPLY
509  *
510  * recover_status |= DIR
511  *
512  * dlm_recover_directory_wait()
513  *
514  * dlm_recover_masters()
515  * - send LOOKUP
516  * - recv LOOKUP_REPLY
517  *
518  * dlm_recover_locks()
519  * - send LOCKS
520  * - recv LOCKS_REPLY
521  *
522  * recover_status |= LOCKS
523  *
524  * dlm_recover_locks_wait()
525  *
526  * recover_status |= DONE
527  */
528 
529 /* Called by dlm_recv; corresponds to dlm_receive_message() but special
530    recovery-only comms are sent through here. */
531 
532 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
533 {
534 	int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
535 	int stop, reply = 0, names = 0, lookup = 0, lock = 0;
536 	uint32_t status;
537 	uint64_t seq;
538 
539 	switch (rc->rc_type) {
540 	case DLM_RCOM_STATUS_REPLY:
541 		reply = 1;
542 		break;
543 	case DLM_RCOM_NAMES:
544 		names = 1;
545 		break;
546 	case DLM_RCOM_NAMES_REPLY:
547 		names = 1;
548 		reply = 1;
549 		break;
550 	case DLM_RCOM_LOOKUP:
551 		lookup = 1;
552 		break;
553 	case DLM_RCOM_LOOKUP_REPLY:
554 		lookup = 1;
555 		reply = 1;
556 		break;
557 	case DLM_RCOM_LOCK:
558 		lock = 1;
559 		break;
560 	case DLM_RCOM_LOCK_REPLY:
561 		lock = 1;
562 		reply = 1;
563 		break;
564 	}
565 
566 	spin_lock(&ls->ls_recover_lock);
567 	status = ls->ls_recover_status;
568 	stop = test_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
569 	seq = ls->ls_recover_seq;
570 	spin_unlock(&ls->ls_recover_lock);
571 
572 	if (stop && (rc->rc_type != DLM_RCOM_STATUS))
573 		goto ignore;
574 
575 	if (reply && (rc->rc_seq_reply != seq))
576 		goto ignore;
577 
578 	if (!(status & DLM_RS_NODES) && (names || lookup || lock))
579 		goto ignore;
580 
581 	if (!(status & DLM_RS_DIR) && (lookup || lock))
582 		goto ignore;
583 
584 	switch (rc->rc_type) {
585 	case DLM_RCOM_STATUS:
586 		receive_rcom_status(ls, rc);
587 		break;
588 
589 	case DLM_RCOM_NAMES:
590 		receive_rcom_names(ls, rc);
591 		break;
592 
593 	case DLM_RCOM_LOOKUP:
594 		receive_rcom_lookup(ls, rc);
595 		break;
596 
597 	case DLM_RCOM_LOCK:
598 		if (rc->rc_header.h_length < lock_size)
599 			goto Eshort;
600 		receive_rcom_lock(ls, rc);
601 		break;
602 
603 	case DLM_RCOM_STATUS_REPLY:
604 		receive_sync_reply(ls, rc);
605 		break;
606 
607 	case DLM_RCOM_NAMES_REPLY:
608 		receive_sync_reply(ls, rc);
609 		break;
610 
611 	case DLM_RCOM_LOOKUP_REPLY:
612 		receive_rcom_lookup_reply(ls, rc);
613 		break;
614 
615 	case DLM_RCOM_LOCK_REPLY:
616 		if (rc->rc_header.h_length < lock_size)
617 			goto Eshort;
618 		dlm_recover_process_copy(ls, rc);
619 		break;
620 
621 	default:
622 		log_error(ls, "receive_rcom bad type %d", rc->rc_type);
623 	}
624 	return;
625 
626 ignore:
627 	log_limit(ls, "dlm_receive_rcom ignore msg %d "
628 		  "from %d %llu %llu recover seq %llu sts %x gen %u",
629 		   rc->rc_type,
630 		   nodeid,
631 		   (unsigned long long)rc->rc_seq,
632 		   (unsigned long long)rc->rc_seq_reply,
633 		   (unsigned long long)seq,
634 		   status, ls->ls_generation);
635 	return;
636 Eshort:
637 	log_error(ls, "recovery message %d from %d is too short",
638 		  rc->rc_type, nodeid);
639 }
640 
641