xref: /openbmc/linux/fs/dlm/recover.c (revision b6bec26c)
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "dir.h"
#include "config.h"
#include "ast.h"
#include "memory.h"
#include "rcom.h"
#include "lock.h"
#include "lowcomms.h"
#include "member.h"
#include "recover.h"


/*
 * Recovery waiting routines: these functions wait for a particular reply from
 * a remote node, or for the remote node to report a certain status.  They need
 * to abort if the lockspace is stopped indicating a node has failed (perhaps
 * the one being waited for).
 */

/*
 * Wait until given function returns non-zero or lockspace is stopped
 * (LS_RECOVERY_STOP set due to failure of a node in ls_nodes).  When another
 * function thinks it could have completed the waited-on task, it should wake
 * up ls_wait_general to get an immediate response rather than waiting for the
 * timeout.  This uses a timeout so it can check periodically if the wait
 * should abort due to node failure (which doesn't cause a wake_up).
 * This should only be called by the dlm_recoverd thread.
 */

int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
{
	int error = 0;
	int rv;

	while (1) {
		rv = wait_event_timeout(ls->ls_wait_general,
					testfn(ls) || dlm_recovery_stopped(ls),
					dlm_config.ci_recover_timer * HZ);
		if (rv)
			break;
	}

	if (dlm_recovery_stopped(ls)) {
		log_debug(ls, "dlm_wait_function aborted");
		error = -EINTR;
	}
	return error;
}

/*
 * An efficient way for all nodes to wait for all others to have a certain
 * status.  The node with the lowest nodeid polls all the others for their
 * status (wait_status_all) and all the others poll the node with the low id
 * for its accumulated result (wait_status_low).  When all nodes have set
 * status flag X, then status flag X_ALL will be set on the low nodeid.
 */

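/* ls_recover_status is a bitmask of DLM_RS_* flags recording how far the
   current recovery sequence has progressed; it is protected by
   ls_recover_lock. */
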
uint32_t dlm_recover_status(struct dlm_ls *ls)
{
	uint32_t status;
	spin_lock(&ls->ls_recover_lock);
	status = ls->ls_recover_status;
	spin_unlock(&ls->ls_recover_lock);
	return status;
}

static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	ls->ls_recover_status |= status;
}

void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
{
	spin_lock(&ls->ls_recover_lock);
	_set_recover_status(ls, status);
	spin_unlock(&ls->ls_recover_lock);
}

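/*
 * Used by the low nodeid: poll every lockspace member with an rcom status
 * request until each one reports wait_status.  When save_slots is set, the
 * slot info carried in each status reply is saved for the slot assignment
 * done later in dlm_recover_members_wait().  The polling delay backs off
 * from 20ms up to 1s, restarting for each member.
 */
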
static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
			   int save_slots)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	struct dlm_member *memb;
	int error = 0, delay;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		delay = 0;
		for (;;) {
			if (dlm_recovery_stopped(ls)) {
				error = -EINTR;
				goto out;
			}

			error = dlm_rcom_status(ls, memb->nodeid, 0);
			if (error)
				goto out;

			if (save_slots)
				dlm_slot_save(ls, rc, memb);

			if (rc->rc_result & wait_status)
				break;
			if (delay < 1000)
				delay += 20;
			msleep(delay);
		}
	}
 out:
	return error;
}

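/*
 * Used by nodes other than the low nodeid: poll the low nodeid with an
 * rcom status request until it reports wait_status (normally the X_ALL
 * flag it sets once everyone has reported X).  status_flags are passed
 * along in the request, e.g. DLM_RSF_NEED_SLOTS so the reply can carry
 * the slot assignments.
 */
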
static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
			   uint32_t status_flags)
{
	struct dlm_rcom *rc = ls->ls_recover_buf;
	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;

	for (;;) {
		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			goto out;
		}

		error = dlm_rcom_status(ls, nodeid, status_flags);
		if (error)
			break;

		if (rc->rc_result & wait_status)
			break;
		if (delay < 1000)
			delay += 20;
		msleep(delay);
	}
 out:
	return error;
}

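/*
 * Generic form of the scheme described above: the low nodeid gathers
 * "status" from all members and then publishes status_all; everyone else
 * waits for status_all on the low nodeid.  This relies on each
 * DLM_RS_*_ALL flag being defined as the corresponding DLM_RS_* flag
 * shifted left by one (status_all = status << 1).
 */
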
static int wait_status(struct dlm_ls *ls, uint32_t status)
{
	uint32_t status_all = status << 1;
	int error;

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, status, 0);
		if (!error)
			dlm_set_recover_status(ls, status_all);
	} else
		error = wait_status_low(ls, status_all, 0);

	return error;
}

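/*
 * Wait for all members to report DLM_RS_NODES.  The low nodeid also
 * collects each member's slot info (save_slots), assigns slots and a new
 * generation, and then publishes DLM_RS_NODES_ALL; the other nodes wait
 * for NODES_ALL and copy in the slot assignments carried in the low
 * node's reply.
 */
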
int dlm_recover_members_wait(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_slot *slots;
	int num_slots, slots_size;
	int error, rv;
	uint32_t gen;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		memb->slot = -1;
		memb->generation = 0;
	}

	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
		error = wait_status_all(ls, DLM_RS_NODES, 1);
		if (error)
			goto out;

		/* slots array is sparse, slots_size may be > num_slots */

		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
		if (!rv) {
			spin_lock(&ls->ls_recover_lock);
			_set_recover_status(ls, DLM_RS_NODES_ALL);
			ls->ls_num_slots = num_slots;
			ls->ls_slots_size = slots_size;
			ls->ls_slots = slots;
			ls->ls_generation = gen;
			spin_unlock(&ls->ls_recover_lock);
		} else {
			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
		}
	} else {
		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
		if (error)
			goto out;

		dlm_slots_copy_in(ls);
	}
 out:
	return error;
}

int dlm_recover_directory_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DIR);
}

int dlm_recover_locks_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_LOCKS);
}

int dlm_recover_done_wait(struct dlm_ls *ls)
{
	return wait_status(ls, DLM_RS_DONE);
}

/*
 * The recover_list contains all the rsb's for which we've requested the new
 * master nodeid.  As replies are returned from the resource directories the
 * rsb's are removed from the list.  When the list is empty we're done.
 *
 * The recover_list is later similarly used for all rsb's for which we've sent
 * new lkb's and need to receive new corresponding lkid's.
 *
 * We use the address of the rsb struct as a simple local identifier for the
 * rsb so we can match an rcom reply with the rsb it was sent for.
 */

static int recover_list_empty(struct dlm_ls *ls)
{
	int empty;

	spin_lock(&ls->ls_recover_list_lock);
	empty = list_empty(&ls->ls_recover_list);
	spin_unlock(&ls->ls_recover_list_lock);

	return empty;
}

static void recover_list_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	if (list_empty(&r->res_recover_list)) {
		list_add_tail(&r->res_recover_list, &ls->ls_recover_list);
		ls->ls_recover_list_count++;
		dlm_hold_rsb(r);
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

static void recover_list_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_list_lock);
	list_del_init(&r->res_recover_list);
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_list_lock);

	dlm_put_rsb(r);
}

static void recover_list_clear(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *s;

	spin_lock(&ls->ls_recover_list_lock);
	list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
		list_del_init(&r->res_recover_list);
		r->res_recover_locks_count = 0;
		dlm_put_rsb(r);
		ls->ls_recover_list_count--;
	}

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_list_lock);
}

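/*
 * The recover_idr is used like the recover_list above, but for rsb's with
 * outstanding master lookups.  Each rsb is given a small integer id
 * (res_id) which is carried in the rcom lookup message, so the reply can
 * be matched back to the rsb in dlm_recover_master_reply().  Outstanding
 * entries are tracked with the same ls_recover_list_count.
 */
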
static int recover_idr_empty(struct dlm_ls *ls)
{
	int empty = 1;

	spin_lock(&ls->ls_recover_idr_lock);
	if (ls->ls_recover_list_count)
		empty = 0;
	spin_unlock(&ls->ls_recover_idr_lock);

	return empty;
}

static int recover_idr_add(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	int rv, id;

	rv = idr_pre_get(&ls->ls_recover_idr, GFP_NOFS);
	if (!rv)
		return -ENOMEM;

	spin_lock(&ls->ls_recover_idr_lock);
	if (r->res_id) {
		spin_unlock(&ls->ls_recover_idr_lock);
		return -1;
	}
	rv = idr_get_new_above(&ls->ls_recover_idr, r, 1, &id);
	if (rv) {
		spin_unlock(&ls->ls_recover_idr_lock);
		return rv;
	}
	r->res_id = id;
	ls->ls_recover_list_count++;
	dlm_hold_rsb(r);
	spin_unlock(&ls->ls_recover_idr_lock);
	return 0;
}

static void recover_idr_del(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;

	spin_lock(&ls->ls_recover_idr_lock);
	idr_remove(&ls->ls_recover_idr, r->res_id);
	r->res_id = 0;
	ls->ls_recover_list_count--;
	spin_unlock(&ls->ls_recover_idr_lock);

	dlm_put_rsb(r);
}

static struct dlm_rsb *recover_idr_find(struct dlm_ls *ls, uint64_t id)
{
	struct dlm_rsb *r;

	spin_lock(&ls->ls_recover_idr_lock);
	r = idr_find(&ls->ls_recover_idr, (int)id);
	spin_unlock(&ls->ls_recover_idr_lock);
	return r;
}

static int recover_idr_clear_rsb(int id, void *p, void *data)
{
	struct dlm_ls *ls = data;
	struct dlm_rsb *r = p;

	r->res_id = 0;
	r->res_recover_locks_count = 0;
	ls->ls_recover_list_count--;

	dlm_put_rsb(r);
	return 0;
}

static void recover_idr_clear(struct dlm_ls *ls)
{
	spin_lock(&ls->ls_recover_idr_lock);
	idr_for_each(&ls->ls_recover_idr, recover_idr_clear_rsb, ls);
	idr_remove_all(&ls->ls_recover_idr);

	if (ls->ls_recover_list_count != 0) {
		log_error(ls, "warning: recover_list_count %d",
			  ls->ls_recover_list_count);
		ls->ls_recover_list_count = 0;
	}
	spin_unlock(&ls->ls_recover_idr_lock);
}


/* Master recovery: find new master node for rsb's that were
   mastered on nodes that have been removed.

   dlm_recover_masters
   recover_master
   dlm_send_rcom_lookup            ->  receive_rcom_lookup
                                       dlm_dir_lookup
   receive_rcom_lookup_reply       <-
   dlm_recover_master_reply
   set_new_master
   set_master_lkbs
   set_lock_master
*/

/*
 * Set the lock master for all LKBs in a lock queue
 * If we are the new master of the rsb, we may have received new
 * MSTCPY locks from other nodes already which we need to ignore
 * when setting the new nodeid.
 */

static void set_lock_master(struct list_head *queue, int nodeid)
{
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, queue, lkb_statequeue) {
		if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
			lkb->lkb_nodeid = nodeid;
			lkb->lkb_remid = 0;
		}
	}
}

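/* Apply the rsb's new master nodeid to the lkb's on all three queues. */
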
static void set_master_lkbs(struct dlm_rsb *r)
{
	set_lock_master(&r->res_grantqueue, r->res_nodeid);
	set_lock_master(&r->res_convertqueue, r->res_nodeid);
	set_lock_master(&r->res_waitqueue, r->res_nodeid);
}

/*
 * Propagate the new master nodeid to locks
 * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
 * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
 * rsb's to consider.
 */

static void set_new_master(struct dlm_rsb *r)
{
	set_master_lkbs(r);
	rsb_set_flag(r, RSB_NEW_MASTER);
	rsb_set_flag(r, RSB_NEW_MASTER2);
}

/*
 * We do async lookups on rsb's that need new masters.  The rsb's
 * waiting for a lookup reply are kept in the recover_idr.
 *
 * Another node recovering the master may have sent us a rcom lookup,
 * and our dlm_master_lookup() set it as the new master, along with
 * NEW_MASTER so that we'll recover it here (this implies dir_nodeid
 * equals our_nodeid below).
 */

static int recover_master(struct dlm_rsb *r, unsigned int *count)
{
	struct dlm_ls *ls = r->res_ls;
	int our_nodeid, dir_nodeid;
	int is_removed = 0;
	int error;

	if (is_master(r))
		return 0;

	is_removed = dlm_is_removed(ls, r->res_nodeid);

	if (!is_removed && !rsb_flag(r, RSB_NEW_MASTER))
		return 0;

	our_nodeid = dlm_our_nodeid();
	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid == our_nodeid) {
		if (is_removed) {
			r->res_master_nodeid = our_nodeid;
			r->res_nodeid = 0;
		}

		/* set master of lkbs to ourself when is_removed, or to
		   another new master which we set along with NEW_MASTER
		   in dlm_master_lookup */
		set_new_master(r);
		error = 0;
	} else {
		recover_idr_add(r);
		error = dlm_send_rcom_lookup(r, dir_nodeid);
	}

	(*count)++;
	return error;
}

/*
 * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
 * This is necessary because recovery can be started, aborted and restarted,
 * causing the master nodeid to briefly change during the aborted recovery, and
 * change back to the original value in the second recovery.  The MSTCPY locks
 * may or may not have been purged during the aborted recovery.  Another node
 * with an outstanding request in waiters list and a request reply saved in the
 * requestqueue, cannot know whether it should ignore the reply and resend the
 * request, or accept the reply and complete the request.  It must do the
 * former if the remote node purged MSTCPY locks, and it must do the latter if
 * the remote node did not.  This is solved by always purging MSTCPY locks, in
 * which case, the request reply would always be ignored and the request
 * resent.
 */

static int recover_master_static(struct dlm_rsb *r, unsigned int *count)
{
	int dir_nodeid = dlm_dir_nodeid(r);
	int new_master = dir_nodeid;

	if (dir_nodeid == dlm_our_nodeid())
		new_master = 0;

	dlm_purge_mstcpy_locks(r);
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	(*count)++;
	return 0;
}

/*
 * Go through local root resources and for each rsb which has a master which
 * has departed, get the new master nodeid from the directory.  The dir will
 * assign mastery to the first node to look up the new master.  That means
 * we'll discover in this lookup if we're the new master of any rsb's.
 *
 * We fire off all the dir lookup requests individually and asynchronously to
 * the correct dir node.
 */

int dlm_recover_masters(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int total = 0;
	unsigned int count = 0;
	int nodir = dlm_no_directory(ls);
	int error;

	log_debug(ls, "dlm_recover_masters");

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (dlm_recovery_stopped(ls)) {
			up_read(&ls->ls_root_sem);
			error = -EINTR;
			goto out;
		}

		lock_rsb(r);
		if (nodir)
			error = recover_master_static(r, &count);
		else
			error = recover_master(r, &count);
		unlock_rsb(r);
		cond_resched();
		total++;

		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}
	}
	up_read(&ls->ls_root_sem);

	log_debug(ls, "dlm_recover_masters %u of %u", count, total);

	error = dlm_wait_function(ls, &recover_idr_empty);
 out:
	if (error)
		recover_idr_clear(ls);
	return error;
}

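/*
 * A directory lookup reply has arrived: rc_id identifies the rsb the
 * lookup was sent for, rc_result is the master nodeid the directory
 * reported.  Record the new master, remove the rsb from the recover idr,
 * and wake up dlm_recover_masters() if this was the last outstanding
 * lookup.
 */
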
int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct dlm_rsb *r;
	int ret_nodeid, new_master;

	r = recover_idr_find(ls, rc->rc_id);
	if (!r) {
		log_error(ls, "dlm_recover_master_reply no id %llx",
			  (unsigned long long)rc->rc_id);
		goto out;
	}

	ret_nodeid = rc->rc_result;

	if (ret_nodeid == dlm_our_nodeid())
		new_master = 0;
	else
		new_master = ret_nodeid;

	lock_rsb(r);
	r->res_master_nodeid = ret_nodeid;
	r->res_nodeid = new_master;
	set_new_master(r);
	unlock_rsb(r);
	recover_idr_del(r);

	if (recover_idr_empty(ls))
		wake_up(&ls->ls_wait_general);
 out:
	return 0;
}


/* Lock recovery: rebuild the process-copy locks we hold on a
   remastered rsb on the new rsb master.

   dlm_recover_locks
   recover_locks
   recover_locks_queue
   dlm_send_rcom_lock              ->  receive_rcom_lock
                                       dlm_recover_master_copy
   receive_rcom_lock_reply         <-
   dlm_recover_process_copy
*/


/*
 * keep a count of the number of lkb's we send to the new master; when we get
 * an equal number of replies then recovery for the rsb is done
 */

static int recover_locks_queue(struct dlm_rsb *r, struct list_head *head)
{
	struct dlm_lkb *lkb;
	int error = 0;

	list_for_each_entry(lkb, head, lkb_statequeue) {
		error = dlm_send_rcom_lock(r, lkb);
		if (error)
			break;
		r->res_recover_locks_count++;
	}

	return error;
}

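/*
 * Send an rcom lock for every lkb on this rsb's queues; if any were sent,
 * put the rsb on the recover_list to wait for the replies, otherwise the
 * rsb needs no lock recovery and NEW_MASTER can be cleared right away.
 */
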
static int recover_locks(struct dlm_rsb *r)
{
	int error = 0;

	lock_rsb(r);

	DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););

	error = recover_locks_queue(r, &r->res_grantqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_convertqueue);
	if (error)
		goto out;
	error = recover_locks_queue(r, &r->res_waitqueue);
	if (error)
		goto out;

	if (r->res_recover_locks_count)
		recover_list_add(r);
	else
		rsb_clear_flag(r, RSB_NEW_MASTER);
 out:
	unlock_rsb(r);
	return error;
}

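/*
 * For every rsb that has been remastered away from us, send our
 * process-copy lkb's to the new master, then wait until all the replies
 * have arrived (each reply is counted off in dlm_recovered_lock()).
 */
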
int dlm_recover_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int error, count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		if (is_master(r)) {
			rsb_clear_flag(r, RSB_NEW_MASTER);
			continue;
		}

		if (!rsb_flag(r, RSB_NEW_MASTER))
			continue;

		if (dlm_recovery_stopped(ls)) {
			error = -EINTR;
			up_read(&ls->ls_root_sem);
			goto out;
		}

		error = recover_locks(r);
		if (error) {
			up_read(&ls->ls_root_sem);
			goto out;
		}

		count += r->res_recover_locks_count;
	}
	up_read(&ls->ls_root_sem);

	log_debug(ls, "dlm_recover_locks %d out", count);

	error = dlm_wait_function(ls, &recover_list_empty);
 out:
	if (error)
		recover_list_clear(ls);
	return error;
}

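/*
 * Called as each rcom lock reply is processed (see the flow above).  When
 * the count for the rsb reaches zero, lock recovery for that rsb is done.
 */
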
void dlm_recovered_lock(struct dlm_rsb *r)
{
	DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););

	r->res_recover_locks_count--;
	if (!r->res_recover_locks_count) {
		rsb_clear_flag(r, RSB_NEW_MASTER);
		recover_list_del(r);
	}

	if (recover_list_empty(r->res_ls))
		wake_up(&r->res_ls->ls_wait_general);
}

/*
 * The lvb needs to be recovered on all master rsb's.  This includes setting
 * the VALNOTVALID flag if necessary, and determining the correct lvb contents
 * based on the lvb's of the locks held on the rsb.
 *
 * RSB_VALNOTVALID is set in two cases:
 *
 * 1. we are master, but not new, and we purged an EX/PW lock held by a
 * failed node (in dlm_recover_purge which set RSB_RECOVER_LVB_INVAL)
 *
 * 2. we are a new master, and there are only NL/CR locks left.
 * (We could probably improve this by only invalidating in this way when
 * the previous master left uncleanly.  VMS docs mention that.)
 *
 * The LVB contents are only considered for changing when this is a new master
 * of the rsb (NEW_MASTER2).  Then, the rsb's lvb is taken from any lkb with
 * mode > CR.  If no lkb's exist with mode above CR, the lvb contents are taken
 * from the lkb with the largest lvb sequence number.
 */

static void recover_lvb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *high_lkb = NULL;
	uint32_t high_seq = 0;
	int lock_lvb_exists = 0;
	int big_lock_exists = 0;
	int lvblen = r->res_ls->ls_lvblen;

	if (!rsb_flag(r, RSB_NEW_MASTER2) &&
	    rsb_flag(r, RSB_RECOVER_LVB_INVAL)) {
		/* case 1 above */
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!rsb_flag(r, RSB_NEW_MASTER2))
		return;

	/* we are the new master, so figure out if VALNOTVALID should
	   be set, and set the rsb lvb from the best lkb available. */

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			continue;

		lock_lvb_exists = 1;

		if (lkb->lkb_grmode > DLM_LOCK_CR) {
			big_lock_exists = 1;
			goto setflag;
		}

		if (((int)lkb->lkb_lvbseq - (int)high_seq) >= 0) {
			high_lkb = lkb;
			high_seq = lkb->lkb_lvbseq;
		}
	}

 setflag:
	if (!lock_lvb_exists)
		goto out;

	/* lvb is invalidated if only NL/CR locks remain */
	if (!big_lock_exists)
		rsb_set_flag(r, RSB_VALNOTVALID);

	if (!r->res_lvbptr) {
		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
		if (!r->res_lvbptr)
			goto out;
	}

	if (big_lock_exists) {
		r->res_lvbseq = lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, lvblen);
	} else if (high_lkb) {
		r->res_lvbseq = high_lkb->lkb_lvbseq;
		memcpy(r->res_lvbptr, high_lkb->lkb_lvbptr, lvblen);
	} else {
		r->res_lvbseq = 0;
		memset(r->res_lvbptr, 0, lvblen);
	}
 out:
	return;
}

/* All master rsb's flagged RECOVER_CONVERT need to be looked at.  The locks
   converting PR->CW or CW->PR need to have their lkb_grmode set. */

static void recover_conversion(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb;
	int grmode = -1;

	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_grmode == DLM_LOCK_PR ||
		    lkb->lkb_grmode == DLM_LOCK_CW) {
			grmode = lkb->lkb_grmode;
			break;
		}
	}

	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue) {
		if (lkb->lkb_grmode != DLM_LOCK_IV)
			continue;
		if (grmode == -1) {
			log_debug(ls, "recover_conversion %x set gr to rq %d",
				  lkb->lkb_id, lkb->lkb_rqmode);
			lkb->lkb_grmode = lkb->lkb_rqmode;
		} else {
			log_debug(ls, "recover_conversion %x set gr %d",
				  lkb->lkb_id, grmode);
			lkb->lkb_grmode = grmode;
		}
	}
}

/* We've become the new master for this rsb and waiting/converting locks may
   need to be granted in dlm_recover_grant() due to locks that may have
   existed from a removed node. */

static void recover_grant(struct dlm_rsb *r)
{
	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);
}

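/*
 * Final pass over the root list: on rsb's we master, repair conversion
 * grant modes, recover the lvb, and flag locks that may now be grantable;
 * then clear the per-recovery rsb flags everywhere.
 */
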
void dlm_recover_rsbs(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	unsigned int count = 0;

	down_read(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		lock_rsb(r);
		if (is_master(r)) {
			if (rsb_flag(r, RSB_RECOVER_CONVERT))
				recover_conversion(r);

			/* recover lvb before granting locks so the updated
			   lvb/VALNOTVALID is presented in the completion */
			recover_lvb(r);

			if (rsb_flag(r, RSB_NEW_MASTER2))
				recover_grant(r);
			count++;
		} else {
			rsb_clear_flag(r, RSB_VALNOTVALID);
		}
		rsb_clear_flag(r, RSB_RECOVER_CONVERT);
		rsb_clear_flag(r, RSB_RECOVER_LVB_INVAL);
		rsb_clear_flag(r, RSB_NEW_MASTER2);
		unlock_rsb(r);
	}
	up_read(&ls->ls_root_sem);

	if (count)
		log_debug(ls, "dlm_recover_rsbs %d done", count);
}

/* Create a single list of all root rsb's to be used during recovery */

int dlm_create_root_list(struct dlm_ls *ls)
{
	struct rb_node *n;
	struct dlm_rsb *r;
	int i, error = 0;

	down_write(&ls->ls_root_sem);
	if (!list_empty(&ls->ls_root_list)) {
		log_error(ls, "root list not empty");
		error = -EINVAL;
		goto out;
	}

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			list_add(&r->res_root_list, &ls->ls_root_list);
			dlm_hold_rsb(r);
		}

		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
			log_error(ls, "dlm_create_root_list toss not empty");
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}
 out:
	up_write(&ls->ls_root_sem);
	return error;
}

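/* Drop the references taken on each rsb by dlm_create_root_list(). */
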
void dlm_release_root_list(struct dlm_ls *ls)
{
	struct dlm_rsb *r, *safe;

	down_write(&ls->ls_root_sem);
	list_for_each_entry_safe(r, safe, &ls->ls_root_list, res_root_list) {
		list_del_init(&r->res_root_list);
		dlm_put_rsb(r);
	}
	up_write(&ls->ls_root_sem);
}

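/* Free every rsb sitting on the toss lists (rsb's kept cached with no
   active locks). */
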
void dlm_clear_toss(struct dlm_ls *ls)
{
	struct rb_node *n, *next;
	struct dlm_rsb *r;
	unsigned int count = 0;
	int i;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		spin_lock(&ls->ls_rsbtbl[i].lock);
		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
			next = rb_next(n);
			r = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(r);
			count++;
		}
		spin_unlock(&ls->ls_rsbtbl[i].lock);
	}

	if (count)
		log_debug(ls, "dlm_clear_toss %u done", count);
}