/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "dir.h"
#include "ast.h"
#include "recover.h"
#include "lowcomms.h"
#include "lock.h"
#include "requestqueue.h"
#include "recoverd.h"


/* If the start for which we're re-enabling locking (seq) has been superseded
   by a newer stop (ls_recover_seq), we need to leave locking disabled.

   We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
   locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
   enables locking and clears the requestqueue between a and b. */

static int enable_locking(struct dlm_ls *ls, uint64_t seq)
{
	int error = -EINTR;

	down_write(&ls->ls_recv_active);

	spin_lock(&ls->ls_recover_lock);
	if (ls->ls_recover_seq == seq) {
		set_bit(LSFL_RUNNING, &ls->ls_flags);
		/* unblocks processes waiting to enter the dlm */
		up_write(&ls->ls_in_recovery);
		error = 0;
	}
	spin_unlock(&ls->ls_recover_lock);

	up_write(&ls->ls_recv_active);
	return error;
}
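/* Run one recovery cycle for the lockspace: rebuild membership and our share
   of the resource directory, remaster rsb's and locks that were held on
   departed nodes, then re-enable locking and replay the requests that were
   queued while locking was stopped.  (Summary comment added for orientation;
   the step-by-step detail is in the comments within the function.) */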
static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
{
	unsigned long start;
	int error, neg = 0;

	log_debug(ls, "recover %llx", (unsigned long long)rv->seq);

	mutex_lock(&ls->ls_recoverd_active);

	/*
	 * Suspending and resuming dlm_astd ensures that no lkb's from this ls
	 * will be processed by dlm_astd during recovery.
	 */

	dlm_astd_suspend();
	dlm_astd_resume();

	/*
	 * This list of root rsb's will be the basis of most of the recovery
	 * routines.
	 */

	dlm_create_root_list(ls);

	/*
	 * Free all the tossed rsb's so we don't have to recover them.
	 */

	dlm_clear_toss_list(ls);

	/*
	 * Add or remove nodes from the lockspace's ls_nodes list.
	 * Also waits for all nodes to complete dlm_recover_members.
	 */

	error = dlm_recover_members(ls, rv, &neg);
	if (error) {
		log_debug(ls, "recover_members failed %d", error);
		goto fail;
	}
	start = jiffies;

	/*
	 * Rebuild our own share of the directory by collecting from all other
	 * nodes their master rsb names that hash to us.
	 */

	error = dlm_recover_directory(ls);
	if (error) {
		log_debug(ls, "recover_directory failed %d", error);
		goto fail;
	}

	/*
	 * Wait for all nodes to complete directory rebuild.
	 */

	error = dlm_recover_directory_wait(ls);
	if (error) {
		log_debug(ls, "recover_directory_wait failed %d", error);
		goto fail;
	}

	/*
	 * We may have outstanding operations that are waiting for a reply from
	 * a failed node.  Mark these to be resent after recovery.  Unlock and
	 * cancel ops can just be completed.
	 */

	dlm_recover_waiters_pre(ls);

	error = dlm_recovery_stopped(ls);
	if (error)
		goto fail;

	if (neg || dlm_no_directory(ls)) {
		/*
		 * Clear lkb's for departed nodes.
		 */

		dlm_purge_locks(ls);

		/*
		 * Get new master nodeid's for rsb's that were mastered on
		 * departed nodes.
		 */

		error = dlm_recover_masters(ls);
		if (error) {
			log_debug(ls, "recover_masters failed %d", error);
			goto fail;
		}

		/*
		 * Send our locks on remastered rsb's to the new masters.
		 */

		error = dlm_recover_locks(ls);
		if (error) {
			log_debug(ls, "recover_locks failed %d", error);
			goto fail;
		}

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}

		/*
		 * Finalize state in master rsb's now that all locks can be
		 * checked.  This includes conversion resolution and lvb
		 * settings.
		 */

		dlm_recover_rsbs(ls);
	} else {
		/*
		 * Other lockspace members may be going through the "neg" steps
		 * while also adding us to the lockspace, in which case they'll
		 * be doing the recover_locks (RS_LOCKS) barrier.
		 */
		dlm_set_recover_status(ls, DLM_RS_LOCKS);

		error = dlm_recover_locks_wait(ls);
		if (error) {
			log_debug(ls, "recover_locks_wait failed %d", error);
			goto fail;
		}
	}

	dlm_release_root_list(ls);

	/*
	 * Purge directory-related requests that are saved in requestqueue.
	 * All dir requests from before recovery are invalid now due to the dir
	 * rebuild and will be resent by the requesting nodes.
	 */

	dlm_purge_requestqueue(ls);

	dlm_set_recover_status(ls, DLM_RS_DONE);
	error = dlm_recover_done_wait(ls);
	if (error) {
		log_debug(ls, "recover_done_wait failed %d", error);
		goto fail;
	}

	dlm_clear_members_gone(ls);

	dlm_adjust_timeouts(ls);

	error = enable_locking(ls, rv->seq);
	if (error) {
		log_debug(ls, "enable_locking failed %d", error);
		goto fail;
	}

	error = dlm_process_requestqueue(ls);
	if (error) {
		log_debug(ls, "process_requestqueue failed %d", error);
		goto fail;
	}

	error = dlm_recover_waiters_post(ls);
	if (error) {
		log_debug(ls, "recover_waiters_post failed %d", error);
		goto fail;
	}

	dlm_grant_after_purge(ls);

	dlm_astd_wake();

	log_debug(ls, "recover %llx done: %u ms",
		  (unsigned long long)rv->seq,
		  jiffies_to_msecs(jiffies - start));
	mutex_unlock(&ls->ls_recoverd_active);

	return 0;

 fail:
	dlm_release_root_list(ls);
	log_debug(ls, "recover %llx error %d",
		  (unsigned long long)rv->seq, error);
	mutex_unlock(&ls->ls_recoverd_active);
	return error;
}

/* The dlm_ls_start() that created the rv we take here may already have been
   stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
   flag set. */

static void do_ls_recovery(struct dlm_ls *ls)
{
	struct dlm_recover *rv = NULL;

	spin_lock(&ls->ls_recover_lock);
	rv = ls->ls_recover_args;
	ls->ls_recover_args = NULL;
	if (rv && ls->ls_recover_seq == rv->seq)
		clear_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
	spin_unlock(&ls->ls_recover_lock);

	if (rv) {
		ls_recover(ls, rv);
		kfree(rv->nodeids);
		kfree(rv);
	}
}
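/* The per-lockspace recovery thread.  Note the ordering in the main loop:
   the task state is set to TASK_INTERRUPTIBLE before LSFL_WORK is tested, so
   a dlm_recoverd_kick() that sets the bit and calls wake_up_process() between
   the test and schedule() is not lost; the wakeup just makes the task
   runnable again and the loop retests the bit.  (Comment added to document
   this standard kthread wait pattern.) */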
static int dlm_recoverd(void *arg)
{
	struct dlm_ls *ls;

	ls = dlm_find_lockspace_local(arg);
	if (!ls) {
		log_print("dlm_recoverd: no lockspace %p", arg);
		return -1;
	}

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (!test_bit(LSFL_WORK, &ls->ls_flags))
			schedule();
		set_current_state(TASK_RUNNING);

		if (test_and_clear_bit(LSFL_WORK, &ls->ls_flags))
			do_ls_recovery(ls);
	}

	dlm_put_lockspace(ls);
	return 0;
}

void dlm_recoverd_kick(struct dlm_ls *ls)
{
	set_bit(LSFL_WORK, &ls->ls_flags);
	wake_up_process(ls->ls_recoverd_task);
}

int dlm_recoverd_start(struct dlm_ls *ls)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		ls->ls_recoverd_task = p;
	return error;
}

void dlm_recoverd_stop(struct dlm_ls *ls)
{
	kthread_stop(ls->ls_recoverd_task);
}

void dlm_recoverd_suspend(struct dlm_ls *ls)
{
	wake_up(&ls->ls_wait_general);
	mutex_lock(&ls->ls_recoverd_active);
}

void dlm_recoverd_resume(struct dlm_ls *ls)
{
	mutex_unlock(&ls->ls_recoverd_active);
}