/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
	int error = -EINTR;

	down_write(&ls->ls_recv_active);

	spin_lock(&ls->ls_recover_lock);
	if (ls->ls_recover_seq == seq) {
		set_bit(LSFL_RUNNING, &ls->ls_flags);
		/* unblocks processes waiting to enter the dlm */
		up_write(&ls->ls_in_recovery);
		error = 0;
	}
	spin_unlock(&ls->ls_recover_lock);

	up_write(&ls->ls_recv_active);
	return error;
}
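
/* An illustrative timeline of the race described above (a sketch, not taken
 * from a trace):
 *
 *   dlm_recv                            dlm_recoverd
 *   --------                            ------------
 *   a) sees locking stopped
 *                                       sets LSFL_RUNNING
 *                                       clears the requestqueue
 *   b) adds message to requestqueue        <- message is never processed
 *
 * Holding ls_recv_active for write across the seq check and the LSFL_RUNNING
 * flip means dlm_recv cannot be between steps a and b while we run.
 */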

static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
	unsigned long start;
	int error, neg = 0;

	log_debug(ls, "recover %llx", (unsigned long long)rv->seq);

	mutex_lock(&ls->ls_recoverd_active);

	/*
	 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
	 * will be processed by dlm_astd during recovery.
	 */

	dlm_astd_suspend();
	dlm_astd_resume();

	/*
	 * This list of root rsb's will be the basis of most of the recovery
	 * routines.
	 */

	dlm_create_root_list(ls);

	/*
	 * Free all the tossed rsb's so we don't have to recover them.
	 */

	dlm_clear_toss_list(ls);

	/*
	 * Add or remove nodes from the lockspace's ls_nodes list.
	 * Also waits for all nodes to complete dlm_recover_members.
	 */

	error = dlm_recover_members(ls, rv, &neg);
	if (error) {
		log_debug(ls, "recover_members failed %d", error);
		goto fail;
	}
	start = jiffies;
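	/* start feeds the elapsed-time figure in the final
	   "recover ... done" log message */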

	/*
	 * Rebuild our own share of the directory by collecting from all other
	 * nodes their master rsb names that hash to us.
	 */

	error = dlm_recover_directory(ls);
	if (error) {
		log_debug(ls, "recover_directory failed %d", error);
		goto fail;
	}

	/*
	 * Wait for all nodes to complete directory rebuild.
	 */

	error = dlm_recover_directory_wait(ls);
	if (error) {
		log_debug(ls, "recover_directory_wait failed %d", error);
		goto fail;
	}

	/*
	 * We may have outstanding operations that are waiting for a reply from
	 * a failed node.  Mark these to be resent after recovery.  Unlock and
	 * cancel ops can just be completed.
	 */

	dlm_recover_waiters_pre(ls);

	error = dlm_recovery_stopped(ls);
	if (error)
		goto fail;

	if (neg || dlm_no_directory(ls)) {
		/*
		 * Clear lkb's for departed nodes.
		 */

		dlm_purge_locks(ls);

		/*
		 * Get new master nodeid's for rsb's that were mastered on
		 * departed nodes.
		 */

		error = dlm_recover_masters(ls);
		if (error) {
			log_debug(ls, "recover_masters failed %d", error);
			goto fail;
		}

		/*
		 * Send our locks on remastered rsb's to the new masters.
		 */

		error = dlm_recover_locks(ls);
		if (error) {
			log_debug(ls, "recover_locks failed %d", error);
			goto fail;
		}

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}

		/*
		 * Finalize state in master rsb's now that all locks can be
		 * checked.  This includes conversion resolution and lvb
		 * settings.
		 */

		dlm_recover_rsbs(ls);
	} else {
		/*
		 * Other lockspace members may be going through the "neg" steps
		 * while also adding us to the lockspace, in which case they'll
		 * be doing the recover_locks (RS_LOCKS) barrier.
		 */
		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}
	}

	dlm_release_root_list(ls);

	/*
	 * Purge directory-related requests that are saved in requestqueue.
	 * All dir requests from before recovery are invalid now due to the dir
	 * rebuild and will be resent by the requesting nodes.
	 */

	dlm_purge_requestqueue(ls);

	dlm_set_recover_status(ls, DLM_RS_DONE);
	error = dlm_recover_done_wait(ls);
	if (error) {
		log_debug(ls, "recover_done_wait failed %d", error);
		goto fail;
	}

	dlm_clear_members_gone(ls);

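	/*
	 * dlm_adjust_timeouts() compensates lock timeouts for the time spent
	 * in recovery (see lock.c), so timeout locks don't expire purely
	 * because recovery ran long.
	 */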
	dlm_adjust_timeouts(ls);

	error = enable_locking(ls, rv->seq);
	if (error) {
		log_debug(ls, "enable_locking failed %d", error);
		goto fail;
	}

	error = dlm_process_requestqueue(ls);
	if (error) {
		log_debug(ls, "process_requestqueue failed %d", error);
		goto fail;
	}

	error = dlm_recover_waiters_post(ls);
	if (error) {
		log_debug(ls, "recover_waiters_post failed %d", error);
		goto fail;
	}

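	/*
	 * Locks blocked by entries purged above may now be grantable; rescan
	 * and grant them before normal processing resumes.
	 */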
	dlm_grant_after_purge(ls);

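	/* let dlm_astd resume delivering queued callbacks */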
	dlm_astd_wake();

	log_debug(ls, "recover %llx done: %u ms",
		  (unsigned long long)rv->seq,
		  jiffies_to_msecs(jiffies - start));
	mutex_unlock(&ls->ls_recoverd_active);

	return 0;

 fail:
	dlm_release_root_list(ls);
	log_debug(ls, "recover %llx error %d",
		  (unsigned long long)rv->seq, error);
	mutex_unlock(&ls->ls_recoverd_active);
	return error;
}
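
/* All error paths in ls_recover() funnel through "fail": the root list is
 * released and we return with locking still disabled; recovery is retried
 * when the next dlm_recover event is queued and dlm_recoverd_kick() runs.
 */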

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL;

	spin_lock(&ls->ls_recover_lock);
	rv = ls->ls_recover_args;
	ls->ls_recover_args = NULL;
	if (rv && ls->ls_recover_seq == rv->seq)
		clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
	spin_unlock(&ls->ls_recover_lock);

	if (rv) {
		ls_recover(ls, rv);
		kfree(rv->nodeids);
		kfree(rv);
	}
}

static int dlm_recoverd(void *arg)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(arg);
	if (!ls) {
		log_print("dlm_recoverd: no lockspace %p", arg);
		return -1;
	}

	while (!kthread_should_stop()) {
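		/* classic kthread wait loop: set TASK_INTERRUPTIBLE before
		   testing LSFL_WORK so a dlm_recoverd_kick() landing between
		   the test and schedule() still leaves us runnable */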
		set_current_state(TASK_INTERRUPTIBLE);
		if (!test_bit(LSFL_WORK, &ls->ls_flags))
			schedule();
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
			do_ls_recovery(ls);
	}

	dlm_put_lockspace(ls);
	return 0;
}

void dlm_recoverd_kick(struct dlm_ls *ls)
{
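	/* set the work bit before waking; paired with the test in
	   dlm_recoverd() so a kick cannot be lost */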
	set_bit(LSFL_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
}

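/* Start/stop of the per-lockspace recovery thread; in this tree these are
   driven from the lockspace create/release paths (see lockspace.c). */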
int dlm_recoverd_start(struct dlm_ls *ls)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		ls->ls_recoverd_task = p;
	return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
	kthread_stop(ls->ls_recoverd_task);
}

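/* Suspend blocks until any in-flight ls_recover() drops ls_recoverd_active;
   waking ls_wait_general first knocks recovery out of a wait-for-status
   sleep so it can observe the stop and finish. */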
void dlm_recoverd_suspend(struct dlm_ls *ls)
{
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}
