1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 #include "dlm_internal.h" 14 #include "lockspace.h" 15 #include "member.h" 16 #include "recoverd.h" 17 #include "recover.h" 18 #include "rcom.h" 19 #include "config.h" 20 21 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 22 { 23 struct dlm_member *memb = NULL; 24 struct list_head *tmp; 25 struct list_head *newlist = &new->list; 26 struct list_head *head = &ls->ls_nodes; 27 28 list_for_each(tmp, head) { 29 memb = list_entry(tmp, struct dlm_member, list); 30 if (new->nodeid < memb->nodeid) 31 break; 32 } 33 34 if (!memb) 35 list_add_tail(newlist, head); 36 else { 37 /* FIXME: can use list macro here */ 38 newlist->prev = tmp->prev; 39 newlist->next = tmp; 40 tmp->prev->next = newlist; 41 tmp->prev = newlist; 42 } 43 } 44 45 static int dlm_add_member(struct dlm_ls *ls, int nodeid) 46 { 47 struct dlm_member *memb; 48 int w; 49 50 memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL); 51 if (!memb) 52 return -ENOMEM; 53 54 w = dlm_node_weight(ls->ls_name, nodeid); 55 if (w < 0) { 56 kfree(memb); 57 return w; 58 } 59 60 memb->nodeid = nodeid; 61 memb->weight = w; 62 add_ordered_member(ls, memb); 63 ls->ls_num_nodes++; 64 return 0; 65 } 66 67 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 68 { 69 list_move(&memb->list, &ls->ls_nodes_gone); 70 ls->ls_num_nodes--; 71 } 72 73 int dlm_is_member(struct dlm_ls *ls, int nodeid) 74 { 75 struct dlm_member *memb; 76 77 list_for_each_entry(memb, &ls->ls_nodes, list) { 78 if (memb->nodeid == nodeid) 79 return 1; 80 } 81 return 0; 82 } 83 84 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 85 { 86 struct dlm_member *memb; 87 88 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 89 if (memb->nodeid == nodeid) 90 return 1; 91 } 92 return 0; 93 } 94 95 static void clear_memb_list(struct list_head *head) 96 { 97 struct dlm_member *memb; 98 99 while (!list_empty(head)) { 100 memb = list_entry(head->next, struct dlm_member, list); 101 list_del(&memb->list); 102 kfree(memb); 103 } 104 } 105 106 void dlm_clear_members(struct dlm_ls *ls) 107 { 108 clear_memb_list(&ls->ls_nodes); 109 ls->ls_num_nodes = 0; 110 } 111 112 void dlm_clear_members_gone(struct dlm_ls *ls) 113 { 114 clear_memb_list(&ls->ls_nodes_gone); 115 } 116 117 static void make_member_array(struct dlm_ls *ls) 118 { 119 struct dlm_member *memb; 120 int i, w, x = 0, total = 0, all_zero = 0, *array; 121 122 kfree(ls->ls_node_array); 123 ls->ls_node_array = NULL; 124 125 list_for_each_entry(memb, &ls->ls_nodes, list) { 126 if (memb->weight) 127 total += memb->weight; 128 } 129 130 /* all nodes revert to weight of 1 if all have weight 0 */ 131 132 if (!total) { 133 total = ls->ls_num_nodes; 134 all_zero = 1; 135 } 136 137 ls->ls_total_weight = total; 138 139 array = kmalloc(sizeof(int) * total, GFP_KERNEL); 140 if (!array) 141 return; 142 143 list_for_each_entry(memb, &ls->ls_nodes, list) { 144 if (!all_zero && !memb->weight) 145 continue; 146 147 if (all_zero) 148 w = 1; 149 else 150 w = memb->weight; 151 152 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 153 154 for (i = 0; i < w; i++) 155 array[x++] = memb->nodeid; 156 } 157 158 ls->ls_node_array = array; 159 } 160 161 /* send a status request to all members just to establish comms connections */ 162 163 static int ping_members(struct dlm_ls *ls) 164 { 165 struct dlm_member *memb; 166 int error = 0; 167 168 list_for_each_entry(memb, &ls->ls_nodes, list) { 169 error = dlm_recovery_stopped(ls); 170 if (error) 171 break; 172 error = dlm_rcom_status(ls, memb->nodeid); 173 if (error) 174 break; 175 } 176 if (error) 177 log_debug(ls, "ping_members aborted %d last nodeid %d", 178 error, ls->ls_recover_nodeid); 179 return error; 180 } 181 182 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 183 { 184 struct dlm_member *memb, *safe; 185 int i, error, found, pos = 0, neg = 0, low = -1; 186 187 /* previously removed members that we've not finished removing need to 188 count as a negative change so the "neg" recovery steps will happen */ 189 190 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 191 log_debug(ls, "prev removed member %d", memb->nodeid); 192 neg++; 193 } 194 195 /* move departed members from ls_nodes to ls_nodes_gone */ 196 197 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 198 found = 0; 199 for (i = 0; i < rv->node_count; i++) { 200 if (memb->nodeid == rv->nodeids[i]) { 201 found = 1; 202 break; 203 } 204 } 205 206 if (!found) { 207 neg++; 208 dlm_remove_member(ls, memb); 209 log_debug(ls, "remove member %d", memb->nodeid); 210 } 211 } 212 213 /* add new members to ls_nodes */ 214 215 for (i = 0; i < rv->node_count; i++) { 216 if (dlm_is_member(ls, rv->nodeids[i])) 217 continue; 218 dlm_add_member(ls, rv->nodeids[i]); 219 pos++; 220 log_debug(ls, "add member %d", rv->nodeids[i]); 221 } 222 223 list_for_each_entry(memb, &ls->ls_nodes, list) { 224 if (low == -1 || memb->nodeid < low) 225 low = memb->nodeid; 226 } 227 ls->ls_low_nodeid = low; 228 229 make_member_array(ls); 230 dlm_set_recover_status(ls, DLM_RS_NODES); 231 *neg_out = neg; 232 233 error = ping_members(ls); 234 if (!error || error == -EPROTO) { 235 /* new_lockspace() may be waiting to know if the config 236 is good or bad */ 237 ls->ls_members_result = error; 238 complete(&ls->ls_members_done); 239 } 240 if (error) 241 goto out; 242 243 error = dlm_recover_members_wait(ls); 244 out: 245 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); 246 return error; 247 } 248 249 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before 250 dlm_ls_start() is called on any of them to start the new recovery. */ 251 252 int dlm_ls_stop(struct dlm_ls *ls) 253 { 254 int new; 255 256 /* 257 * Prevent dlm_recv from being in the middle of something when we do 258 * the stop. This includes ensuring dlm_recv isn't processing a 259 * recovery message (rcom), while dlm_recoverd is aborting and 260 * resetting things from an in-progress recovery. i.e. we want 261 * dlm_recoverd to abort its recovery without worrying about dlm_recv 262 * processing an rcom at the same time. Stopping dlm_recv also makes 263 * it easy for dlm_receive_message() to check locking stopped and add a 264 * message to the requestqueue without races. 265 */ 266 267 down_write(&ls->ls_recv_active); 268 269 /* 270 * Abort any recovery that's in progress (see RECOVERY_STOP, 271 * dlm_recovery_stopped()) and tell any other threads running in the 272 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()). 273 */ 274 275 spin_lock(&ls->ls_recover_lock); 276 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 277 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 278 ls->ls_recover_seq++; 279 spin_unlock(&ls->ls_recover_lock); 280 281 /* 282 * Let dlm_recv run again, now any normal messages will be saved on the 283 * requestqueue for later. 284 */ 285 286 up_write(&ls->ls_recv_active); 287 288 /* 289 * This in_recovery lock does two things: 290 * 1) Keeps this function from returning until all threads are out 291 * of locking routines and locking is truely stopped. 292 * 2) Keeps any new requests from being processed until it's unlocked 293 * when recovery is complete. 294 */ 295 296 if (new) 297 down_write(&ls->ls_in_recovery); 298 299 /* 300 * The recoverd suspend/resume makes sure that dlm_recoverd (if 301 * running) has noticed RECOVERY_STOP above and quit processing the 302 * previous recovery. 303 */ 304 305 dlm_recoverd_suspend(ls); 306 ls->ls_recover_status = 0; 307 dlm_recoverd_resume(ls); 308 309 if (!ls->ls_recover_begin) 310 ls->ls_recover_begin = jiffies; 311 return 0; 312 } 313 314 int dlm_ls_start(struct dlm_ls *ls) 315 { 316 struct dlm_recover *rv = NULL, *rv_old; 317 int *ids = NULL; 318 int error, count; 319 320 rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL); 321 if (!rv) 322 return -ENOMEM; 323 324 error = count = dlm_nodeid_list(ls->ls_name, &ids); 325 if (error <= 0) 326 goto fail; 327 328 spin_lock(&ls->ls_recover_lock); 329 330 /* the lockspace needs to be stopped before it can be started */ 331 332 if (!dlm_locking_stopped(ls)) { 333 spin_unlock(&ls->ls_recover_lock); 334 log_error(ls, "start ignored: lockspace running"); 335 error = -EINVAL; 336 goto fail; 337 } 338 339 rv->nodeids = ids; 340 rv->node_count = count; 341 rv->seq = ++ls->ls_recover_seq; 342 rv_old = ls->ls_recover_args; 343 ls->ls_recover_args = rv; 344 spin_unlock(&ls->ls_recover_lock); 345 346 if (rv_old) { 347 kfree(rv_old->nodeids); 348 kfree(rv_old); 349 } 350 351 dlm_recoverd_kick(ls); 352 return 0; 353 354 fail: 355 kfree(rv); 356 kfree(ids); 357 return error; 358 } 359 360