1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 #include "dlm_internal.h" 14 #include "lockspace.h" 15 #include "member.h" 16 #include "recoverd.h" 17 #include "recover.h" 18 #include "rcom.h" 19 #include "config.h" 20 21 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 22 { 23 struct dlm_member *memb = NULL; 24 struct list_head *tmp; 25 struct list_head *newlist = &new->list; 26 struct list_head *head = &ls->ls_nodes; 27 28 list_for_each(tmp, head) { 29 memb = list_entry(tmp, struct dlm_member, list); 30 if (new->nodeid < memb->nodeid) 31 break; 32 } 33 34 if (!memb) 35 list_add_tail(newlist, head); 36 else { 37 /* FIXME: can use list macro here */ 38 newlist->prev = tmp->prev; 39 newlist->next = tmp; 40 tmp->prev->next = newlist; 41 tmp->prev = newlist; 42 } 43 } 44 45 static int dlm_add_member(struct dlm_ls *ls, int nodeid) 46 { 47 struct dlm_member *memb; 48 int w; 49 50 memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL); 51 if (!memb) 52 return -ENOMEM; 53 54 w = dlm_node_weight(ls->ls_name, nodeid); 55 if (w < 0) { 56 kfree(memb); 57 return w; 58 } 59 60 memb->nodeid = nodeid; 61 memb->weight = w; 62 add_ordered_member(ls, memb); 63 ls->ls_num_nodes++; 64 return 0; 65 } 66 67 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 68 { 69 list_move(&memb->list, &ls->ls_nodes_gone); 70 ls->ls_num_nodes--; 71 } 72 73 int dlm_is_member(struct dlm_ls *ls, int nodeid) 74 { 75 struct dlm_member *memb; 76 77 list_for_each_entry(memb, &ls->ls_nodes, list) { 78 if (memb->nodeid == nodeid) 79 return 1; 80 } 81 return 0; 82 } 83 84 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 85 { 86 struct dlm_member *memb; 87 88 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 89 if (memb->nodeid == nodeid) 90 return 1; 91 } 92 return 0; 93 } 94 95 static void clear_memb_list(struct list_head *head) 96 { 97 struct dlm_member *memb; 98 99 while (!list_empty(head)) { 100 memb = list_entry(head->next, struct dlm_member, list); 101 list_del(&memb->list); 102 kfree(memb); 103 } 104 } 105 106 void dlm_clear_members(struct dlm_ls *ls) 107 { 108 clear_memb_list(&ls->ls_nodes); 109 ls->ls_num_nodes = 0; 110 } 111 112 void dlm_clear_members_gone(struct dlm_ls *ls) 113 { 114 clear_memb_list(&ls->ls_nodes_gone); 115 } 116 117 static void make_member_array(struct dlm_ls *ls) 118 { 119 struct dlm_member *memb; 120 int i, w, x = 0, total = 0, all_zero = 0, *array; 121 122 kfree(ls->ls_node_array); 123 ls->ls_node_array = NULL; 124 125 list_for_each_entry(memb, &ls->ls_nodes, list) { 126 if (memb->weight) 127 total += memb->weight; 128 } 129 130 /* all nodes revert to weight of 1 if all have weight 0 */ 131 132 if (!total) { 133 total = ls->ls_num_nodes; 134 all_zero = 1; 135 } 136 137 ls->ls_total_weight = total; 138 139 array = kmalloc(sizeof(int) * total, GFP_KERNEL); 140 if (!array) 141 return; 142 143 list_for_each_entry(memb, &ls->ls_nodes, list) { 144 if (!all_zero && !memb->weight) 145 continue; 146 147 if (all_zero) 148 w = 1; 149 else 150 w = memb->weight; 151 152 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 153 154 for (i = 0; i < w; i++) 155 array[x++] = memb->nodeid; 156 } 157 158 ls->ls_node_array = array; 159 } 160 161 /* send a status request to all members just to establish comms connections */ 162 163 static int ping_members(struct dlm_ls *ls) 164 { 165 struct dlm_member *memb; 166 int error = 0; 167 168 list_for_each_entry(memb, &ls->ls_nodes, list) { 169 error = dlm_recovery_stopped(ls); 170 if (error) 171 break; 172 error = dlm_rcom_status(ls, memb->nodeid); 173 if (error) 174 break; 175 } 176 if (error) 177 log_debug(ls, "ping_members aborted %d last nodeid %d", 178 error, ls->ls_recover_nodeid); 179 return error; 180 } 181 182 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 183 { 184 struct dlm_member *memb, *safe; 185 int i, error, found, pos = 0, neg = 0, low = -1; 186 187 /* previously removed members that we've not finished removing need to 188 count as a negative change so the "neg" recovery steps will happen */ 189 190 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 191 log_debug(ls, "prev removed member %d", memb->nodeid); 192 neg++; 193 } 194 195 /* move departed members from ls_nodes to ls_nodes_gone */ 196 197 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 198 found = 0; 199 for (i = 0; i < rv->node_count; i++) { 200 if (memb->nodeid == rv->nodeids[i]) { 201 found = 1; 202 break; 203 } 204 } 205 206 if (!found) { 207 neg++; 208 dlm_remove_member(ls, memb); 209 log_debug(ls, "remove member %d", memb->nodeid); 210 } 211 } 212 213 /* Add an entry to ls_nodes_gone for members that were removed and 214 then added again, so that previous state for these nodes will be 215 cleared during recovery. */ 216 217 for (i = 0; i < rv->new_count; i++) { 218 if (!dlm_is_member(ls, rv->new[i])) 219 continue; 220 log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]); 221 222 memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL); 223 if (!memb) 224 return -ENOMEM; 225 memb->nodeid = rv->new[i]; 226 list_add_tail(&memb->list, &ls->ls_nodes_gone); 227 neg++; 228 } 229 230 /* add new members to ls_nodes */ 231 232 for (i = 0; i < rv->node_count; i++) { 233 if (dlm_is_member(ls, rv->nodeids[i])) 234 continue; 235 dlm_add_member(ls, rv->nodeids[i]); 236 pos++; 237 log_debug(ls, "add member %d", rv->nodeids[i]); 238 } 239 240 list_for_each_entry(memb, &ls->ls_nodes, list) { 241 if (low == -1 || memb->nodeid < low) 242 low = memb->nodeid; 243 } 244 ls->ls_low_nodeid = low; 245 246 make_member_array(ls); 247 dlm_set_recover_status(ls, DLM_RS_NODES); 248 *neg_out = neg; 249 250 error = ping_members(ls); 251 if (!error || error == -EPROTO) { 252 /* new_lockspace() may be waiting to know if the config 253 is good or bad */ 254 ls->ls_members_result = error; 255 complete(&ls->ls_members_done); 256 } 257 if (error) 258 goto out; 259 260 error = dlm_recover_members_wait(ls); 261 out: 262 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); 263 return error; 264 } 265 266 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before 267 dlm_ls_start() is called on any of them to start the new recovery. */ 268 269 int dlm_ls_stop(struct dlm_ls *ls) 270 { 271 int new; 272 273 /* 274 * Prevent dlm_recv from being in the middle of something when we do 275 * the stop. This includes ensuring dlm_recv isn't processing a 276 * recovery message (rcom), while dlm_recoverd is aborting and 277 * resetting things from an in-progress recovery. i.e. we want 278 * dlm_recoverd to abort its recovery without worrying about dlm_recv 279 * processing an rcom at the same time. Stopping dlm_recv also makes 280 * it easy for dlm_receive_message() to check locking stopped and add a 281 * message to the requestqueue without races. 282 */ 283 284 down_write(&ls->ls_recv_active); 285 286 /* 287 * Abort any recovery that's in progress (see RECOVERY_STOP, 288 * dlm_recovery_stopped()) and tell any other threads running in the 289 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 290 */ 291 292 spin_lock(&ls->ls_recover_lock); 293 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 294 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 295 ls->ls_recover_seq++; 296 spin_unlock(&ls->ls_recover_lock); 297 298 /* 299 * Let dlm_recv run again, now any normal messages will be saved on the 300 * requestqueue for later. 301 */ 302 303 up_write(&ls->ls_recv_active); 304 305 /* 306 * This in_recovery lock does two things: 307 * 1) Keeps this function from returning until all threads are out 308 * of locking routines and locking is truely stopped. 309 * 2) Keeps any new requests from being processed until it's unlocked 310 * when recovery is complete. 311 */ 312 313 if (new) 314 down_write(&ls->ls_in_recovery); 315 316 /* 317 * The recoverd suspend/resume makes sure that dlm_recoverd (if 318 * running) has noticed RECOVERY_STOP above and quit processing the 319 * previous recovery. 320 */ 321 322 dlm_recoverd_suspend(ls); 323 ls->ls_recover_status = 0; 324 dlm_recoverd_resume(ls); 325 326 if (!ls->ls_recover_begin) 327 ls->ls_recover_begin = jiffies; 328 return 0; 329 } 330 331 int dlm_ls_start(struct dlm_ls *ls) 332 { 333 struct dlm_recover *rv = NULL, *rv_old; 334 int *ids = NULL, *new = NULL; 335 int error, ids_count = 0, new_count = 0; 336 337 rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL); 338 if (!rv) 339 return -ENOMEM; 340 341 error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count, 342 &new, &new_count); 343 if (error < 0) 344 goto fail; 345 346 spin_lock(&ls->ls_recover_lock); 347 348 /* the lockspace needs to be stopped before it can be started */ 349 350 if (!dlm_locking_stopped(ls)) { 351 spin_unlock(&ls->ls_recover_lock); 352 log_error(ls, "start ignored: lockspace running"); 353 error = -EINVAL; 354 goto fail; 355 } 356 357 rv->nodeids = ids; 358 rv->node_count = ids_count; 359 rv->new = new; 360 rv->new_count = new_count; 361 rv->seq = ++ls->ls_recover_seq; 362 rv_old = ls->ls_recover_args; 363 ls->ls_recover_args = rv; 364 spin_unlock(&ls->ls_recover_lock); 365 366 if (rv_old) { 367 log_error(ls, "unused recovery %llx %d", 368 (unsigned long long)rv_old->seq, rv_old->node_count); 369 kfree(rv_old->nodeids); 370 kfree(rv_old->new); 371 kfree(rv_old); 372 } 373 374 dlm_recoverd_kick(ls); 375 return 0; 376 377 fail: 378 kfree(rv); 379 kfree(ids); 380 kfree(new); 381 return error; 382 } 383 384