// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright 2004-2011 Red Hat, Inc.
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/fs.h>
#include <linux/dlm.h>
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/delay.h>
#include <linux/gfs2_ondisk.h>
#include <linux/sched/signal.h>

#include "incore.h"
#include "glock.h"
#include "glops.h"
#include "recovery.h"
#include "util.h"
#include "sys.h"
#include "trace_gfs2.h"

/**
 * gfs2_update_stats - Update time based stats
 * @s: The stats to update (local or global)
 * @index: The index inside @s
 * @sample: New data to include
 */
static inline void gfs2_update_stats(struct gfs2_lkstats *s, unsigned index,
				     s64 sample)
{
	/*
	 * @delta is the difference between the current rtt sample and the
	 * running average srtt. We add 1/8 of that to the srtt in order to
	 * update the current srtt estimate. The variance estimate is a bit
	 * more complicated. We subtract the current variance estimate from
	 * the abs value of the @delta and add 1/4 of that to the running
	 * total.  That's equivalent to 3/4 of the current variance
	 * estimate plus 1/4 of the abs of @delta.
	 *
	 * Note that the index points at the array entry containing the
	 * smoothed mean value, and the variance is always in the following
	 * entry.
	 *
	 * Reference: TCP/IP Illustrated, vol 2, p. 831,832
	 * All times are in units of integer nanoseconds. Unlike the TCP/IP
	 * case, they are not scaled fixed point.
	 */

	s64 delta = sample - s->stats[index];
	s->stats[index] += (delta >> 3);
	index++;
	s->stats[index] += (s64)(abs(delta) - s->stats[index]) >> 2;
}

/**
 * gfs2_update_reply_times - Update locking statistics
 * @gl: The glock to update
 *
 * This assumes that gl->gl_dstamp has been set earlier.
 *
 * The rtt (lock round trip time) is an estimate of the time
 * taken to perform a dlm lock request. We update it on each
 * reply from the dlm.
 *
 * The blocking flag is set on the glock for all dlm requests
 * which may potentially block due to lock requests from other nodes.
 * DLM requests where the current lock state is exclusive, where the
 * requested state is null (or unlocked), or where the TRY or
 * TRY_1CB flags are set are classified as non-blocking. All
 * other DLM requests are counted as (potentially) blocking.
 */
static inline void gfs2_update_reply_times(struct gfs2_glock *gl)
{
	struct gfs2_pcpu_lkstats *lks;
	const unsigned gltype = gl->gl_name.ln_type;
	unsigned index = test_bit(GLF_BLOCKING, &gl->gl_flags) ?
			 GFS2_LKS_SRTTB : GFS2_LKS_SRTT;
	s64 rtt;

	preempt_disable();
	rtt = ktime_to_ns(ktime_sub(ktime_get_real(), gl->gl_dstamp));
	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
	gfs2_update_stats(&gl->gl_stats, index, rtt);		/* Local */
	gfs2_update_stats(&lks->lkstats[gltype], index, rtt);	/* Global */
	preempt_enable();

	trace_gfs2_glock_lock_time(gl, rtt);
}

/**
 * gfs2_update_request_times - Update locking statistics
 * @gl: The glock to update
 *
 * The irt (lock inter-request times) measures the average time
 * between requests to the dlm. It is updated immediately before
 * each dlm call.
 */

static inline void gfs2_update_request_times(struct gfs2_glock *gl)
{
	struct gfs2_pcpu_lkstats *lks;
	const unsigned gltype = gl->gl_name.ln_type;
	ktime_t dstamp;
	s64 irt;

	preempt_disable();
	dstamp = gl->gl_dstamp;
	gl->gl_dstamp = ktime_get_real();
	irt = ktime_to_ns(ktime_sub(gl->gl_dstamp, dstamp));
	lks = this_cpu_ptr(gl->gl_name.ln_sbd->sd_lkstats);
	gfs2_update_stats(&gl->gl_stats, GFS2_LKS_SIRT, irt);		/* Local */
	gfs2_update_stats(&lks->lkstats[gltype], GFS2_LKS_SIRT, irt);	/* Global */
	preempt_enable();
}

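/* Completion callback (AST) from the dlm for dlm_lock()/dlm_unlock() requests
   made on behalf of a glock. */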
static void gdlm_ast(void *arg)
{
	struct gfs2_glock *gl = arg;
	unsigned ret = gl->gl_state;

	/* If the glock is dead, we only react to a dlm_unlock() reply. */
	if (__lockref_is_dead(&gl->gl_lockref) &&
	    gl->gl_lksb.sb_status != -DLM_EUNLOCK)
		return;

	gfs2_update_reply_times(gl);
	BUG_ON(gl->gl_lksb.sb_flags & DLM_SBF_DEMOTED);

	if ((gl->gl_lksb.sb_flags & DLM_SBF_VALNOTVALID) && gl->gl_lksb.sb_lvbptr)
		memset(gl->gl_lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);

	switch (gl->gl_lksb.sb_status) {
	case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
		if (gl->gl_ops->go_free)
			gl->gl_ops->go_free(gl);
		gfs2_glock_free(gl);
		return;
	case -DLM_ECANCEL: /* Cancel while getting lock */
		ret |= LM_OUT_CANCELED;
		goto out;
	case -EAGAIN: /* Try lock fails */
	case -EDEADLK: /* Deadlock detected */
		goto out;
	case -ETIMEDOUT: /* Canceled due to timeout */
		ret |= LM_OUT_ERROR;
		goto out;
	case 0: /* Success */
		break;
	default: /* Something unexpected */
		BUG();
	}

	ret = gl->gl_req;
	if (gl->gl_lksb.sb_flags & DLM_SBF_ALTMODE) {
		if (gl->gl_req == LM_ST_SHARED)
			ret = LM_ST_DEFERRED;
		else if (gl->gl_req == LM_ST_DEFERRED)
			ret = LM_ST_SHARED;
		else
			BUG();
	}

	set_bit(GLF_INITIAL, &gl->gl_flags);
	gfs2_glock_complete(gl, ret);
	return;
out:
	if (!test_bit(GLF_INITIAL, &gl->gl_flags))
		gl->gl_lksb.sb_lkid = 0;
	gfs2_glock_complete(gl, ret);
}

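/* Blocking callback (BAST) from the dlm: another node wants a lock that
   conflicts with the mode we hold, so ask glock code to demote. */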
static void gdlm_bast(void *arg, int mode)
{
	struct gfs2_glock *gl = arg;

	if (__lockref_is_dead(&gl->gl_lockref))
		return;

	switch (mode) {
	case DLM_LOCK_EX:
		gfs2_glock_cb(gl, LM_ST_UNLOCKED);
		break;
	case DLM_LOCK_CW:
		gfs2_glock_cb(gl, LM_ST_DEFERRED);
		break;
	case DLM_LOCK_PR:
		gfs2_glock_cb(gl, LM_ST_SHARED);
		break;
	default:
		fs_err(gl->gl_name.ln_sbd, "unknown bast mode %d\n", mode);
		BUG();
	}
}

/* convert gfs lock-state to dlm lock-mode */

static int make_mode(struct gfs2_sbd *sdp, const unsigned int lmstate)
{
	switch (lmstate) {
	case LM_ST_UNLOCKED:
		return DLM_LOCK_NL;
	case LM_ST_EXCLUSIVE:
		return DLM_LOCK_EX;
	case LM_ST_DEFERRED:
		return DLM_LOCK_CW;
	case LM_ST_SHARED:
		return DLM_LOCK_PR;
	}
	fs_err(sdp, "unknown LM state %d\n", lmstate);
	BUG();
	return -1;
}

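/* convert gfs lock flags (LM_FLAG_*) to dlm lock flags (DLM_LKF_*) for the
   requested dlm mode */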
static u32 make_flags(struct gfs2_glock *gl, const unsigned int gfs_flags,
		      const int req)
{
	u32 lkf = 0;

	if (gl->gl_lksb.sb_lvbptr)
		lkf |= DLM_LKF_VALBLK;

	if (gfs_flags & LM_FLAG_TRY)
		lkf |= DLM_LKF_NOQUEUE;

	if (gfs_flags & LM_FLAG_TRY_1CB) {
		lkf |= DLM_LKF_NOQUEUE;
		lkf |= DLM_LKF_NOQUEUEBAST;
	}

	if (gfs_flags & LM_FLAG_ANY) {
		if (req == DLM_LOCK_PR)
			lkf |= DLM_LKF_ALTCW;
		else if (req == DLM_LOCK_CW)
			lkf |= DLM_LKF_ALTPR;
		else
			BUG();
	}

	if (gl->gl_lksb.sb_lkid != 0) {
		lkf |= DLM_LKF_CONVERT;
		if (test_bit(GLF_BLOCKING, &gl->gl_flags))
			lkf |= DLM_LKF_QUECVT;
	}

	return lkf;
}

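/* write @value as hex digits ending at @c, filling backwards towards the
   start of the lock name buffer */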
static void gfs2_reverse_hex(char *c, u64 value)
{
	*c = '0';
	while (value) {
		*c-- = hex_asc[value & 0x0f];
		value >>= 4;
	}
}

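/* lm_lock operation: translate the glock request into a dlm mode and flags,
   then submit it to the dlm */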
static int gdlm_lock(struct gfs2_glock *gl, unsigned int req_state,
		     unsigned int flags)
{
	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
	int req;
	u32 lkf;
	char strname[GDLM_STRNAME_BYTES] = "";
	int error;

	req = make_mode(gl->gl_name.ln_sbd, req_state);
	lkf = make_flags(gl, flags, req);
	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
	if (gl->gl_lksb.sb_lkid) {
		gfs2_update_request_times(gl);
	} else {
		memset(strname, ' ', GDLM_STRNAME_BYTES - 1);
		strname[GDLM_STRNAME_BYTES - 1] = '\0';
		gfs2_reverse_hex(strname + 7, gl->gl_name.ln_type);
		gfs2_reverse_hex(strname + 23, gl->gl_name.ln_number);
		gl->gl_dstamp = ktime_get_real();
	}
	/*
	 * Submit the actual lock request.
	 */

again:
	error = dlm_lock(ls->ls_dlm, req, &gl->gl_lksb, lkf, strname,
			GDLM_STRNAME_BYTES - 1, 0, gdlm_ast, gl, gdlm_bast);
	if (error == -EBUSY) {
		msleep(20);
		goto again;
	}
	return error;
}

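/* lm_put_lock operation: release the dlm lock held for a dying glock and
   free the glock */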
static void gdlm_put_lock(struct gfs2_glock *gl)
{
	struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	BUG_ON(!__lockref_is_dead(&gl->gl_lockref));

	if (gl->gl_lksb.sb_lkid == 0) {
		gfs2_glock_free(gl);
		return;
	}

	clear_bit(GLF_BLOCKING, &gl->gl_flags);
	gfs2_glstats_inc(gl, GFS2_LKS_DCOUNT);
	gfs2_sbstats_inc(gl, GFS2_LKS_DCOUNT);
	gfs2_update_request_times(gl);

	/* don't want to call dlm if we've unmounted the lock protocol */
	if (test_bit(DFL_UNMOUNT, &ls->ls_recover_flags)) {
		gfs2_glock_free(gl);
		return;
	}

	/* don't want to skip dlm_unlock writing the lvb when lock has one */
	if (test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags) &&
	    !gl->gl_lksb.sb_lvbptr) {
		gfs2_glock_free_later(gl);
		return;
	}

again:
	error = dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_VALBLK,
			   NULL, gl);
	if (error == -EBUSY) {
		msleep(20);
		goto again;
	}

	if (error) {
		fs_err(sdp, "gdlm_unlock %x,%llx err=%d\n",
		       gl->gl_name.ln_type,
		       (unsigned long long)gl->gl_name.ln_number, error);
	}
}

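/* lm_cancel operation: ask the dlm to cancel an in-progress lock request */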
static void gdlm_cancel(struct gfs2_glock *gl)
{
	struct lm_lockstruct *ls = &gl->gl_name.ln_sbd->sd_lockstruct;
	dlm_unlock(ls->ls_dlm, gl->gl_lksb.sb_lkid, DLM_LKF_CANCEL, NULL, gl);
}

/*
 * dlm/gfs2 recovery coordination using dlm_recover callbacks
 *
 *  0. gfs2 checks for another cluster node withdraw, needing journal replay
 *  1. dlm_controld sees lockspace members change
 *  2. dlm_controld blocks dlm-kernel locking activity
 *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
 *  4. dlm_controld starts and finishes its own user level recovery
 *  5. dlm_controld starts dlm-kernel dlm_recoverd to do kernel recovery
 *  6. dlm_recoverd notifies gfs2 of failed nodes (recover_slot)
 *  7. dlm_recoverd does its own lock recovery
 *  8. dlm_recoverd unblocks dlm-kernel locking activity
 *  9. dlm_recoverd notifies gfs2 when done (recover_done with new generation)
 * 10. gfs2_control updates control_lock lvb with new generation and jid bits
 * 11. gfs2_control enqueues journals for gfs2_recover to recover (maybe none)
 * 12. gfs2_recover dequeues and recovers journals of failed nodes
 * 13. gfs2_recover provides recovery results to gfs2_control (recovery_result)
 * 14. gfs2_control updates control_lock lvb jid bits for recovered journals
 * 15. gfs2_control unblocks normal locking when all journals are recovered
 *
 * - failures during recovery
 *
 * recover_prep() may set BLOCK_LOCKS (step 3) again before gfs2_control
 * clears BLOCK_LOCKS (step 15), e.g. another node fails while still
 * recovering for a prior failure.  gfs2_control needs a way to detect
 * this so it can leave BLOCK_LOCKS set in step 15.  This is managed using
 * the recover_block and recover_start values.
 *
 * recover_done() provides a new lockspace generation number each time it
 * is called (step 9).  This generation number is saved as recover_start.
 * When recover_prep() is called, it sets BLOCK_LOCKS and sets
 * recover_block = recover_start.  So, while recover_block is equal to
 * recover_start, BLOCK_LOCKS should remain set.  (recover_spin must
 * be held around the BLOCK_LOCKS/recover_block/recover_start logic.)
 *
 * - more specific gfs2 steps in sequence above
 *
 *  3. recover_prep sets BLOCK_LOCKS and sets recover_block = recover_start
 *  6. recover_slot records any failed jids (maybe none)
 *  9. recover_done sets recover_start = new generation number
 * 10. gfs2_control sets control_lock lvb = new gen + bits for failed jids
 * 12. gfs2_recover does journal recoveries for failed jids identified above
 * 14. gfs2_control clears control_lock lvb bits for recovered jids
 * 15. gfs2_control checks if recover_block == recover_start (step 3 occurred
 *     again); if so it does nothing, otherwise if recover_start > recover_block
 *     it clears BLOCK_LOCKS.
 *
 * - parallel recovery steps across all nodes
 *
 * All nodes attempt to update the control_lock lvb with the new generation
 * number and jid bits, but only the first to get the control_lock EX will
 * do so; others will see that it's already done (lvb already contains new
 * generation number.)
 *
 * . All nodes get the same recover_prep/recover_slot/recover_done callbacks
 * . All nodes attempt to set control_lock lvb gen + bits for the new gen
 * . One node gets control_lock first and writes the lvb, others see it's done
 * . All nodes attempt to recover jids for which they see control_lock bits set
 * . One node succeeds for a jid, and that one clears the jid bit in the lvb
 * . All nodes will eventually see all lvb bits clear and unblock locks
 *
 * - is there a problem with clearing an lvb bit that should be set
 *   and missing a journal recovery?
 *
 * 1. jid fails
 * 2. lvb bit set for step 1
 * 3. jid recovered for step 1
 * 4. jid taken again (new mount)
 * 5. jid fails (for step 4)
 * 6. lvb bit set for step 5 (will already be set)
 * 7. lvb bit cleared for step 3
 *
 * This is not a problem because the failure in step 5 does not
 * require recovery, because the mount in step 4 could not have
 * progressed far enough to unblock locks and access the fs.  The
 * control_mount() function waits for all recoveries to be complete
 * for the latest lockspace generation before ever unblocking locks
 * and returning.  The mount in step 4 waits until the recovery in
 * step 1 is done.
 *
 * - special case of first mounter: first node to mount the fs
 *
 * The first node to mount a gfs2 fs needs to check all the journals
 * and recover any that need recovery before other nodes are allowed
 * to mount the fs.  (Others may begin mounting, but they must wait
 * for the first mounter to be done before taking locks on the fs
 * or accessing the fs.)  This has two parts:
 *
 * 1. The mounted_lock tells a node it's the first to mount the fs.
 * Each node holds the mounted_lock in PR while it's mounted.
 * Each node tries to acquire the mounted_lock in EX when it mounts.
 * If a node is granted the mounted_lock EX it means there are no
 * other mounted nodes (no PR locks exist), and it is the first mounter.
 * The mounted_lock is demoted to PR when first recovery is done, so
 * others will fail to get an EX lock, but will get a PR lock.
 *
 * 2. The control_lock blocks others in control_mount() while the first
 * mounter is doing first mount recovery of all journals.
 * A mounting node needs to acquire control_lock in EX mode before
 * it can proceed.  The first mounter holds control_lock in EX while doing
 * the first mount recovery, blocking mounts from other nodes, then demotes
 * control_lock to NL when it's done (others_may_mount/first_done),
 * allowing other nodes to continue mounting.
 *
 * first mounter:
 * control_lock EX/NOQUEUE success
 * mounted_lock EX/NOQUEUE success (no other PR, so no other mounters)
 * set first=1
 * do first mounter recovery
 * mounted_lock EX->PR
 * control_lock EX->NL, write lvb generation
 *
 * other mounter:
 * control_lock EX/NOQUEUE success (if fail -EAGAIN, retry)
 * mounted_lock EX/NOQUEUE fail -EAGAIN (expected due to other mounters PR)
 * mounted_lock PR/NOQUEUE success
 * read lvb generation
 * control_lock EX->NL
 * set first=0
 *
 * - mount during recovery
 *
 * If a node mounts while others are doing recovery (not first mounter),
 * the mounting node will get its initial recover_done() callback without
 * having seen any previous failures/callbacks.
 *
 * It must wait for all recoveries preceding its mount to be finished
 * before it unblocks locks.  It does this by repeating the "other mounter"
 * steps above until the lvb generation number is >= its mount generation
 * number (from initial recover_done) and all lvb bits are clear.
 *
 * - control_lock lvb format
 *
 * 4 bytes generation number: the latest dlm lockspace generation number
 * from recover_done callback.  Indicates the jid bitmap has been updated
 * to reflect all slot failures through that generation.
 * 4 bytes unused.
 * GDLM_LVB_SIZE-8 bytes of jid bit map. If bit N is set, it indicates
 * that jid N needs recovery.
 */

#define JID_BITMAP_OFFSET 8 /* 4 byte generation number + 4 byte unused */

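/* Helpers to copy the control_lock lvb to/from a local buffer, converting
   the little-endian generation number stored in its first four bytes. */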
static void control_lvb_read(struct lm_lockstruct *ls, uint32_t *lvb_gen,
			     char *lvb_bits)
{
	__le32 gen;
	memcpy(lvb_bits, ls->ls_control_lvb, GDLM_LVB_SIZE);
	memcpy(&gen, lvb_bits, sizeof(__le32));
	*lvb_gen = le32_to_cpu(gen);
}

static void control_lvb_write(struct lm_lockstruct *ls, uint32_t lvb_gen,
			      char *lvb_bits)
{
	__le32 gen;
	memcpy(ls->ls_control_lvb, lvb_bits, GDLM_LVB_SIZE);
	gen = cpu_to_le32(lvb_gen);
	memcpy(ls->ls_control_lvb, &gen, sizeof(__le32));
}

static int all_jid_bits_clear(char *lvb)
{
	return !memchr_inv(lvb + JID_BITMAP_OFFSET, 0,
			GDLM_LVB_SIZE - JID_BITMAP_OFFSET);
}

static void sync_wait_cb(void *arg)
{
	struct lm_lockstruct *ls = arg;
	complete(&ls->ls_sync_wait);
}

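/* Synchronous wrappers around dlm_unlock()/dlm_lock() used for the
   mounted_lock and control_lock; they wait on sync_wait_cb() completion. */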
static int sync_unlock(struct gfs2_sbd *sdp, struct dlm_lksb *lksb, char *name)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	error = dlm_unlock(ls->ls_dlm, lksb->sb_lkid, 0, lksb, ls);
	if (error) {
		fs_err(sdp, "%s lkid %x error %d\n",
		       name, lksb->sb_lkid, error);
		return error;
	}

	wait_for_completion(&ls->ls_sync_wait);

	if (lksb->sb_status != -DLM_EUNLOCK) {
		fs_err(sdp, "%s lkid %x status %d\n",
		       name, lksb->sb_lkid, lksb->sb_status);
		return -1;
	}
	return 0;
}

static int sync_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags,
		     unsigned int num, struct dlm_lksb *lksb, char *name)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char strname[GDLM_STRNAME_BYTES];
	int error, status;

	memset(strname, 0, GDLM_STRNAME_BYTES);
	snprintf(strname, GDLM_STRNAME_BYTES, "%8x%16x", LM_TYPE_NONDISK, num);

	error = dlm_lock(ls->ls_dlm, mode, lksb, flags,
			 strname, GDLM_STRNAME_BYTES - 1,
			 0, sync_wait_cb, ls, NULL);
	if (error) {
		fs_err(sdp, "%s lkid %x flags %x mode %d error %d\n",
		       name, lksb->sb_lkid, flags, mode, error);
		return error;
	}

	wait_for_completion(&ls->ls_sync_wait);

	status = lksb->sb_status;

	if (status && status != -EAGAIN) {
		fs_err(sdp, "%s lkid %x flags %x mode %d status %d\n",
		       name, lksb->sb_lkid, flags, mode, status);
	}

	return status;
}

static int mounted_unlock(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_unlock(sdp, &ls->ls_mounted_lksb, "mounted_lock");
}

static int mounted_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_lock(sdp, mode, flags, GFS2_MOUNTED_LOCK,
			 &ls->ls_mounted_lksb, "mounted_lock");
}

static int control_unlock(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_unlock(sdp, &ls->ls_control_lksb, "control_lock");
}

static int control_lock(struct gfs2_sbd *sdp, int mode, uint32_t flags)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	return sync_lock(sdp, mode, flags, GFS2_CONTROL_LOCK,
			 &ls->ls_control_lksb, "control_lock");
}

/**
 * remote_withdraw - react to a node withdrawing from the file system
 * @sdp: The superblock
 */
static void remote_withdraw(struct gfs2_sbd *sdp)
{
	struct gfs2_jdesc *jd;
	int ret = 0, count = 0;

	list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
		if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
			continue;
		ret = gfs2_recover_journal(jd, true);
		if (ret)
			break;
		count++;
	}

	/* Report how many journals were checked and the final result. */
	fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
}

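/* Work function run from gfs2_control_wq: propagate journal recovery state
   between the recover_submit/recover_result arrays and the control_lock lvb,
   kick off journal recovery, and unblock locks once all journals are done. */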
static void gfs2_control_func(struct work_struct *work)
{
	struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	uint32_t block_gen, start_gen, lvb_gen, flags;
	int recover_set = 0;
	int write_lvb = 0;
	int recover_size;
	int i, error;

	/* First check for other nodes that may have done a withdraw. */
	if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
		remote_withdraw(sdp);
		clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
		return;
	}

	spin_lock(&ls->ls_recover_spin);
	/*
	 * No MOUNT_DONE means we're still mounting; control_mount()
	 * will set this flag, after which this thread will take over
	 * all further clearing of BLOCK_LOCKS.
	 *
	 * FIRST_MOUNT means this node is doing first mounter recovery,
	 * for which recovery control is handled by
	 * control_mount()/control_first_done(), not this thread.
	 */
	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	block_gen = ls->ls_recover_block;
	start_gen = ls->ls_recover_start;
	spin_unlock(&ls->ls_recover_spin);

	/*
	 * Equal block_gen and start_gen implies we are between
	 * recover_prep and recover_done callbacks, which means
	 * dlm recovery is in progress and dlm locking is blocked.
	 * There's no point trying to do any work until recover_done.
	 */

	if (block_gen == start_gen)
		return;

	/*
	 * Propagate recover_submit[] and recover_result[] to lvb:
	 * dlm_recoverd adds to recover_submit[] jids needing recovery
	 * gfs2_recover adds to recover_result[] journal recovery results
	 *
	 * set lvb bit for jids in recover_submit[] if the lvb has not
	 * yet been updated for the generation of the failure
	 *
	 * clear lvb bit for jids in recover_result[] if the result of
	 * the journal recovery is SUCCESS
	 */

	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
	if (error) {
		fs_err(sdp, "control lock EX error %d\n", error);
		return;
	}

	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);

	spin_lock(&ls->ls_recover_spin);
	if (block_gen != ls->ls_recover_block ||
	    start_gen != ls->ls_recover_start) {
		fs_info(sdp, "recover generation %u block1 %u %u\n",
			start_gen, block_gen, ls->ls_recover_block);
		spin_unlock(&ls->ls_recover_spin);
		control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
		return;
	}

	recover_size = ls->ls_recover_size;

	if (lvb_gen <= start_gen) {
		/*
		 * Clear lvb bits for jids we've successfully recovered.
		 * Because all nodes attempt to recover failed journals,
		 * a journal can be recovered multiple times successfully
		 * in succession.  Only the first will really do recovery,
		 * the others find it clean, but still report a successful
		 * recovery.  So, another node may have already recovered
		 * the jid and cleared the lvb bit for it.
		 */
		for (i = 0; i < recover_size; i++) {
			if (ls->ls_recover_result[i] != LM_RD_SUCCESS)
				continue;

			ls->ls_recover_result[i] = 0;

			if (!test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET))
				continue;

			__clear_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
			write_lvb = 1;
		}
	}

	if (lvb_gen == start_gen) {
		/*
		 * Failed slots before start_gen are already set in lvb.
		 */
		for (i = 0; i < recover_size; i++) {
			if (!ls->ls_recover_submit[i])
				continue;
			if (ls->ls_recover_submit[i] < lvb_gen)
				ls->ls_recover_submit[i] = 0;
		}
	} else if (lvb_gen < start_gen) {
		/*
		 * Failed slots before start_gen are not yet set in lvb.
		 */
		for (i = 0; i < recover_size; i++) {
			if (!ls->ls_recover_submit[i])
				continue;
			if (ls->ls_recover_submit[i] < start_gen) {
				ls->ls_recover_submit[i] = 0;
				__set_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET);
			}
		}
		/* even if there are no bits to set, we need to write the
		   latest generation to the lvb */
		write_lvb = 1;
	} else {
		/*
		 * we should be getting a recover_done() for lvb_gen soon
		 */
	}
	spin_unlock(&ls->ls_recover_spin);

	if (write_lvb) {
		control_lvb_write(ls, start_gen, ls->ls_lvb_bits);
		flags = DLM_LKF_CONVERT | DLM_LKF_VALBLK;
	} else {
		flags = DLM_LKF_CONVERT;
	}

	error = control_lock(sdp, DLM_LOCK_NL, flags);
	if (error) {
		fs_err(sdp, "control lock NL error %d\n", error);
		return;
	}

	/*
	 * Everyone will see jid bits set in the lvb, run gfs2_recover_set(),
	 * and clear a jid bit in the lvb if the recovery is a success.
	 * Eventually all journals will be recovered, all jid bits will
	 * be cleared in the lvb, and everyone will clear BLOCK_LOCKS.
	 */

	for (i = 0; i < recover_size; i++) {
		if (test_bit_le(i, ls->ls_lvb_bits + JID_BITMAP_OFFSET)) {
			fs_info(sdp, "recover generation %u jid %d\n",
				start_gen, i);
			gfs2_recover_set(sdp, i);
			recover_set++;
		}
	}
	if (recover_set)
		return;

	/*
	 * No more jid bits set in lvb, all recovery is done, unblock locks
	 * (unless a new recover_prep callback has occurred blocking locks
	 * again while working above)
	 */

	spin_lock(&ls->ls_recover_spin);
	if (ls->ls_recover_block == block_gen &&
	    ls->ls_recover_start == start_gen) {
		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "recover generation %u done\n", start_gen);
		gfs2_glock_thaw(sdp);
	} else {
		fs_info(sdp, "recover generation %u block2 %u %u\n",
			start_gen, block_gen, ls->ls_recover_block);
		spin_unlock(&ls->ls_recover_spin);
	}
}

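/* Take the control_lock and mounted_lock during mount to determine whether
   this node is the first mounter, and otherwise wait for pending journal
   recoveries to complete before unblocking locks. */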
static int control_mount(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	uint32_t start_gen, block_gen, mount_gen, lvb_gen;
	int mounted_mode;
	int retries = 0;
	int error;

	memset(&ls->ls_mounted_lksb, 0, sizeof(struct dlm_lksb));
	memset(&ls->ls_control_lksb, 0, sizeof(struct dlm_lksb));
	memset(&ls->ls_control_lvb, 0, GDLM_LVB_SIZE);
	ls->ls_control_lksb.sb_lvbptr = ls->ls_control_lvb;
	init_completion(&ls->ls_sync_wait);

	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_VALBLK);
	if (error) {
		fs_err(sdp, "control_mount control_lock NL error %d\n", error);
		return error;
	}

	error = mounted_lock(sdp, DLM_LOCK_NL, 0);
	if (error) {
		fs_err(sdp, "control_mount mounted_lock NL error %d\n", error);
		control_unlock(sdp);
		return error;
	}
	mounted_mode = DLM_LOCK_NL;

restart:
	if (retries++ && signal_pending(current)) {
		error = -EINTR;
		goto fail;
	}

	/*
	 * We always start with both locks in NL. control_lock is
	 * demoted to NL below so we don't need to do it here.
	 */

	if (mounted_mode != DLM_LOCK_NL) {
		error = mounted_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
		if (error)
			goto fail;
		mounted_mode = DLM_LOCK_NL;
	}

	/*
	 * Other nodes need to do some work in dlm recovery and gfs2_control
	 * before the recover_done and control_lock will be ready for us below.
	 * A delay here is not required but often avoids having to retry.
	 */

	msleep_interruptible(500);

	/*
	 * Acquire control_lock in EX and mounted_lock in either EX or PR.
	 * control_lock lvb keeps track of any pending journal recoveries.
	 * mounted_lock indicates if any other nodes have the fs mounted.
	 */

	error = control_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE|DLM_LKF_VALBLK);
	if (error == -EAGAIN) {
		goto restart;
	} else if (error) {
		fs_err(sdp, "control_mount control_lock EX error %d\n", error);
		goto fail;
	}

	/*
	 * If we're a spectator, we don't want to take the lock in EX because
	 * we cannot do the first-mount responsibility it implies: recovery.
	 */
	if (sdp->sd_args.ar_spectator)
		goto locks_done;

	error = mounted_lock(sdp, DLM_LOCK_EX, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
	if (!error) {
		mounted_mode = DLM_LOCK_EX;
		goto locks_done;
	} else if (error != -EAGAIN) {
		fs_err(sdp, "control_mount mounted_lock EX error %d\n", error);
		goto fail;
	}

	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_NOQUEUE);
	if (!error) {
		mounted_mode = DLM_LOCK_PR;
		goto locks_done;
	} else {
		/* not even -EAGAIN should happen here */
		fs_err(sdp, "control_mount mounted_lock PR error %d\n", error);
		goto fail;
	}

locks_done:
	/*
	 * If we got both locks above in EX, then we're the first mounter.
	 * If not, then we need to wait for the control_lock lvb to be
	 * updated by other mounted nodes to reflect our mount generation.
	 *
	 * In simple first mounter cases, first mounter will see zero lvb_gen,
	 * but in cases where all existing nodes leave/fail before mounting
	 * nodes finish control_mount, then all nodes will be mounting and
	 * lvb_gen will be non-zero.
	 */

	control_lvb_read(ls, &lvb_gen, ls->ls_lvb_bits);

	if (lvb_gen == 0xFFFFFFFF) {
		/* special value to force mount attempts to fail */
		fs_err(sdp, "control_mount control_lock disabled\n");
		error = -EINVAL;
		goto fail;
	}

	if (mounted_mode == DLM_LOCK_EX) {
		/* first mounter, keep both EX while doing first recovery */
		spin_lock(&ls->ls_recover_spin);
		clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
		set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
		set_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "first mounter control generation %u\n", lvb_gen);
		return 0;
	}

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT);
	if (error)
		goto fail;

	/*
	 * We are not first mounter, now we need to wait for the control_lock
	 * lvb generation to be >= the generation from our first recover_done
	 * and all lvb bits to be clear (no pending journal recoveries.)
	 */

	if (!all_jid_bits_clear(ls->ls_lvb_bits)) {
		/* journals need recovery, wait until all are clear */
		fs_info(sdp, "control_mount wait for journal recovery\n");
		goto restart;
	}

	spin_lock(&ls->ls_recover_spin);
	block_gen = ls->ls_recover_block;
	start_gen = ls->ls_recover_start;
	mount_gen = ls->ls_recover_mount;

	if (lvb_gen < mount_gen) {
		/* wait for mounted nodes to update control_lock lvb to our
		   generation, which might include new recovery bits set */
		if (sdp->sd_args.ar_spectator) {
			fs_info(sdp, "Recovery is required. Waiting for a "
				"non-spectator to mount.\n");
			msleep_interruptible(1000);
		} else {
			fs_info(sdp, "control_mount wait1 block %u start %u "
				"mount %u lvb %u flags %lx\n", block_gen,
				start_gen, mount_gen, lvb_gen,
				ls->ls_recover_flags);
		}
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	if (lvb_gen != start_gen) {
		/* wait for mounted nodes to update control_lock lvb to the
		   latest recovery generation */
		fs_info(sdp, "control_mount wait2 block %u start %u mount %u "
			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
			lvb_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	if (block_gen == start_gen) {
		/* dlm recovery in progress, wait for it to finish */
		fs_info(sdp, "control_mount wait3 block %u start %u mount %u "
			"lvb %u flags %lx\n", block_gen, start_gen, mount_gen,
			lvb_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		goto restart;
	}

	clear_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
	set_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags);
	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
	spin_unlock(&ls->ls_recover_spin);
	return 0;

fail:
	mounted_unlock(sdp);
	control_unlock(sdp);
	return error;
}

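/* Called once the first mounter has finished recovering all journals:
   demote mounted_lock to PR and write the current generation into the
   control_lock lvb so other mounters may proceed. */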
static int control_first_done(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	uint32_t start_gen, block_gen;
	int error;

restart:
	spin_lock(&ls->ls_recover_spin);
	start_gen = ls->ls_recover_start;
	block_gen = ls->ls_recover_block;

	if (test_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags) ||
	    !test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	    !test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		/* sanity check, should not happen */
		fs_err(sdp, "control_first_done start %u block %u flags %lx\n",
		       start_gen, block_gen, ls->ls_recover_flags);
		spin_unlock(&ls->ls_recover_spin);
		control_unlock(sdp);
		return -1;
	}

	if (start_gen == block_gen) {
		/*
		 * Wait for the end of a dlm recovery cycle to switch from
		 * first mounter recovery.  We can ignore any recover_slot
		 * callbacks between the recover_prep and next recover_done
		 * because we are still the first mounter and any failed nodes
		 * have not fully mounted, so they don't need recovery.
		 */
		spin_unlock(&ls->ls_recover_spin);
		fs_info(sdp, "control_first_done wait gen %u\n", start_gen);

		wait_on_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY,
			    TASK_UNINTERRUPTIBLE);
		goto restart;
	}

	clear_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
	set_bit(DFL_FIRST_MOUNT_DONE, &ls->ls_recover_flags);
	memset(ls->ls_recover_submit, 0, ls->ls_recover_size*sizeof(uint32_t));
	memset(ls->ls_recover_result, 0, ls->ls_recover_size*sizeof(uint32_t));
	spin_unlock(&ls->ls_recover_spin);

	memset(ls->ls_lvb_bits, 0, GDLM_LVB_SIZE);
	control_lvb_write(ls, start_gen, ls->ls_lvb_bits);

	error = mounted_lock(sdp, DLM_LOCK_PR, DLM_LKF_CONVERT);
	if (error)
		fs_err(sdp, "control_first_done mounted PR error %d\n", error);

	error = control_lock(sdp, DLM_LOCK_NL, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
	if (error)
		fs_err(sdp, "control_first_done control NL error %d\n", error);

	return error;
}

/*
 * Expand static jid arrays if necessary (by increments of RECOVER_SIZE_INC)
 * to accommodate the largest slot number.  (NB dlm slot numbers start at 1,
 * gfs2 jids start at 0, so jid = slot - 1)
 */

#define RECOVER_SIZE_INC 16

static int set_recover_size(struct gfs2_sbd *sdp, struct dlm_slot *slots,
			    int num_slots)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	uint32_t *submit = NULL;
	uint32_t *result = NULL;
	uint32_t old_size, new_size;
	int i, max_jid;

	if (!ls->ls_lvb_bits) {
		ls->ls_lvb_bits = kzalloc(GDLM_LVB_SIZE, GFP_NOFS);
		if (!ls->ls_lvb_bits)
			return -ENOMEM;
	}

	max_jid = 0;
	for (i = 0; i < num_slots; i++) {
		if (max_jid < slots[i].slot - 1)
			max_jid = slots[i].slot - 1;
	}

	old_size = ls->ls_recover_size;
	new_size = old_size;
	while (new_size < max_jid + 1)
		new_size += RECOVER_SIZE_INC;
	if (new_size == old_size)
		return 0;

	submit = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
	result = kcalloc(new_size, sizeof(uint32_t), GFP_NOFS);
	if (!submit || !result) {
		kfree(submit);
		kfree(result);
		return -ENOMEM;
	}

	spin_lock(&ls->ls_recover_spin);
	memcpy(submit, ls->ls_recover_submit, old_size * sizeof(uint32_t));
	memcpy(result, ls->ls_recover_result, old_size * sizeof(uint32_t));
	kfree(ls->ls_recover_submit);
	kfree(ls->ls_recover_result);
	ls->ls_recover_submit = submit;
	ls->ls_recover_result = result;
	ls->ls_recover_size = new_size;
	spin_unlock(&ls->ls_recover_spin);
	return 0;
}

static void free_recover_size(struct lm_lockstruct *ls)
{
	kfree(ls->ls_lvb_bits);
	kfree(ls->ls_recover_submit);
	kfree(ls->ls_recover_result);
	ls->ls_recover_submit = NULL;
	ls->ls_recover_result = NULL;
	ls->ls_recover_size = 0;
	ls->ls_lvb_bits = NULL;
}

/* dlm calls before it does lock recovery */

static void gdlm_recover_prep(void *arg)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (gfs2_withdrawing_or_withdrawn(sdp)) {
		fs_err(sdp, "recover_prep ignored due to withdraw.\n");
		return;
	}
	spin_lock(&ls->ls_recover_spin);
	ls->ls_recover_block = ls->ls_recover_start;
	set_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);

	if (!test_bit(DFL_MOUNT_DONE, &ls->ls_recover_flags) ||
	     test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	set_bit(DFL_BLOCK_LOCKS, &ls->ls_recover_flags);
	spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_prep has been completed on all lockspace members;
   identifies slot/jid of failed member */

static void gdlm_recover_slot(void *arg, struct dlm_slot *slot)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int jid = slot->slot - 1;

	if (gfs2_withdrawing_or_withdrawn(sdp)) {
		fs_err(sdp, "recover_slot jid %d ignored due to withdraw.\n",
		       jid);
		return;
	}
	spin_lock(&ls->ls_recover_spin);
	if (ls->ls_recover_size < jid + 1) {
		fs_err(sdp, "recover_slot jid %d gen %u short size %d\n",
		       jid, ls->ls_recover_block, ls->ls_recover_size);
		spin_unlock(&ls->ls_recover_spin);
		return;
	}

	if (ls->ls_recover_submit[jid]) {
		fs_info(sdp, "recover_slot jid %d gen %u prev %u\n",
			jid, ls->ls_recover_block, ls->ls_recover_submit[jid]);
	}
	ls->ls_recover_submit[jid] = ls->ls_recover_block;
	spin_unlock(&ls->ls_recover_spin);
}

/* dlm calls after recover_slot and after it completes lock recovery */

static void gdlm_recover_done(void *arg, struct dlm_slot *slots, int num_slots,
			      int our_slot, uint32_t generation)
{
	struct gfs2_sbd *sdp = arg;
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (gfs2_withdrawing_or_withdrawn(sdp)) {
		fs_err(sdp, "recover_done ignored due to withdraw.\n");
		return;
	}
	/* ensure the ls jid arrays are large enough */
	set_recover_size(sdp, slots, num_slots);

	spin_lock(&ls->ls_recover_spin);
	ls->ls_recover_start = generation;

	if (!ls->ls_recover_mount) {
		ls->ls_recover_mount = generation;
		ls->ls_jid = our_slot - 1;
	}

	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);

	clear_bit(DFL_DLM_RECOVERY, &ls->ls_recover_flags);
	smp_mb__after_atomic();
	wake_up_bit(&ls->ls_recover_flags, DFL_DLM_RECOVERY);
	spin_unlock(&ls->ls_recover_spin);
}

/* gfs2_recover thread has a journal recovery result */

static void gdlm_recovery_result(struct gfs2_sbd *sdp, unsigned int jid,
				 unsigned int result)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (gfs2_withdrawing_or_withdrawn(sdp)) {
		fs_err(sdp, "recovery_result jid %d ignored due to withdraw.\n",
		       jid);
		return;
	}
	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		return;

	/* don't care about the recovery of our own journal during mount */
	if (jid == ls->ls_jid)
		return;

	spin_lock(&ls->ls_recover_spin);
	if (test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags)) {
		spin_unlock(&ls->ls_recover_spin);
		return;
	}
	if (ls->ls_recover_size < jid + 1) {
		fs_err(sdp, "recovery_result jid %d short size %d\n",
		       jid, ls->ls_recover_size);
		spin_unlock(&ls->ls_recover_spin);
		return;
	}

	fs_info(sdp, "recover jid %d result %s\n", jid,
		result == LM_RD_GAVEUP ? "busy" : "success");

	ls->ls_recover_result[jid] = result;

	/* GAVEUP means another node is recovering the journal; delay our
	   next attempt to recover it, to give the other node a chance to
	   finish before trying again */

	if (!test_bit(DFL_UNMOUNT, &ls->ls_recover_flags))
		queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work,
				   result == LM_RD_GAVEUP ? HZ : 0);
	spin_unlock(&ls->ls_recover_spin);
}

static const struct dlm_lockspace_ops gdlm_lockspace_ops = {
	.recover_prep = gdlm_recover_prep,
	.recover_slot = gdlm_recover_slot,
	.recover_done = gdlm_recover_done,
};

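/* lm_mount operation: join the dlm lockspace named after the fs table entry
   and run control_mount() to coordinate first mounter recovery. */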
static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	char cluster[GFS2_LOCKNAME_LEN];
	const char *fsname;
	uint32_t flags;
	int error, ops_result;

	/*
	 * initialize everything
	 */

	INIT_DELAYED_WORK(&sdp->sd_control_work, gfs2_control_func);
	spin_lock_init(&ls->ls_recover_spin);
	ls->ls_recover_flags = 0;
	ls->ls_recover_mount = 0;
	ls->ls_recover_start = 0;
	ls->ls_recover_block = 0;
	ls->ls_recover_size = 0;
	ls->ls_recover_submit = NULL;
	ls->ls_recover_result = NULL;
	ls->ls_lvb_bits = NULL;

	error = set_recover_size(sdp, NULL, 0);
	if (error)
		goto fail;

	/*
	 * prepare dlm_new_lockspace args
	 */

	fsname = strchr(table, ':');
	if (!fsname) {
		fs_info(sdp, "no fsname found\n");
		error = -EINVAL;
		goto fail_free;
	}
	memset(cluster, 0, sizeof(cluster));
	memcpy(cluster, table, strlen(table) - strlen(fsname));
	fsname++;

	flags = DLM_LSFL_NEWEXCL;

	/*
	 * create/join lockspace
	 */

	error = dlm_new_lockspace(fsname, cluster, flags, GDLM_LVB_SIZE,
				  &gdlm_lockspace_ops, sdp, &ops_result,
				  &ls->ls_dlm);
	if (error) {
		fs_err(sdp, "dlm_new_lockspace error %d\n", error);
		goto fail_free;
	}

	if (ops_result < 0) {
		/*
		 * dlm does not support ops callbacks,
		 * old dlm_controld/gfs_controld are used, try without ops.
		 */
		fs_info(sdp, "dlm lockspace ops not used\n");
		free_recover_size(ls);
		set_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags);
		return 0;
	}

	if (!test_bit(SDF_NOJOURNALID, &sdp->sd_flags)) {
		fs_err(sdp, "dlm lockspace ops disallow jid preset\n");
		error = -EINVAL;
		goto fail_release;
	}

	/*
	 * control_mount() uses control_lock to determine first mounter,
	 * and for later mounts, waits for any recoveries to be cleared.
	 */

	error = control_mount(sdp);
	if (error) {
		fs_err(sdp, "mount control error %d\n", error);
		goto fail_release;
	}

	ls->ls_first = !!test_bit(DFL_FIRST_MOUNT, &ls->ls_recover_flags);
	clear_bit(SDF_NOJOURNALID, &sdp->sd_flags);
	smp_mb__after_atomic();
	wake_up_bit(&sdp->sd_flags, SDF_NOJOURNALID);
	return 0;

fail_release:
	dlm_release_lockspace(ls->ls_dlm, 2);
fail_free:
	free_recover_size(ls);
fail:
	return error;
}

static void gdlm_first_done(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;
	int error;

	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		return;

	error = control_first_done(sdp);
	if (error)
		fs_err(sdp, "mount first_done error %d\n", error);
}

static void gdlm_unmount(struct gfs2_sbd *sdp)
{
	struct lm_lockstruct *ls = &sdp->sd_lockstruct;

	if (test_bit(DFL_NO_DLM_OPS, &ls->ls_recover_flags))
		goto release;

	/* wait for gfs2_control_wq to be done with this mount */

	spin_lock(&ls->ls_recover_spin);
	set_bit(DFL_UNMOUNT, &ls->ls_recover_flags);
	spin_unlock(&ls->ls_recover_spin);
	flush_delayed_work(&sdp->sd_control_work);

	/* mounted_lock and control_lock will be purged in dlm recovery */
release:
	if (ls->ls_dlm) {
		dlm_release_lockspace(ls->ls_dlm, 2);
		ls->ls_dlm = NULL;
	}

	free_recover_size(ls);
}

static const match_table_t dlm_tokens = {
	{ Opt_jid, "jid=%d"},
	{ Opt_id, "id=%d"},
	{ Opt_first, "first=%d"},
	{ Opt_nodir, "nodir=%d"},
	{ Opt_err, NULL },
};

const struct lm_lockops gfs2_dlm_ops = {
	.lm_proto_name = "lock_dlm",
	.lm_mount = gdlm_mount,
	.lm_first_done = gdlm_first_done,
	.lm_recovery_result = gdlm_recovery_result,
	.lm_unmount = gdlm_unmount,
	.lm_put_lock = gdlm_put_lock,
	.lm_lock = gdlm_lock,
	.lm_cancel = gdlm_cancel,
	.lm_tokens = &dlm_tokens,
};