1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 #include "dlm_internal.h" 14 #include "lockspace.h" 15 #include "member.h" 16 #include "recoverd.h" 17 #include "recover.h" 18 #include "rcom.h" 19 #include "config.h" 20 21 /* 22 * Following called by dlm_recoverd thread 23 */ 24 25 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 26 { 27 struct dlm_member *memb = NULL; 28 struct list_head *tmp; 29 struct list_head *newlist = &new->list; 30 struct list_head *head = &ls->ls_nodes; 31 32 list_for_each(tmp, head) { 33 memb = list_entry(tmp, struct dlm_member, list); 34 if (new->nodeid < memb->nodeid) 35 break; 36 } 37 38 if (!memb) 39 list_add_tail(newlist, head); 40 else { 41 /* FIXME: can use list macro here */ 42 newlist->prev = tmp->prev; 43 newlist->next = tmp; 44 tmp->prev->next = newlist; 45 tmp->prev = newlist; 46 } 47 } 48 49 static int dlm_add_member(struct dlm_ls *ls, int nodeid) 50 { 51 struct dlm_member *memb; 52 int w; 53 54 memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL); 55 if (!memb) 56 return -ENOMEM; 57 58 w = dlm_node_weight(ls->ls_name, nodeid); 59 if (w < 0) 60 return w; 61 62 memb->nodeid = nodeid; 63 memb->weight = w; 64 add_ordered_member(ls, memb); 65 ls->ls_num_nodes++; 66 return 0; 67 } 68 69 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 70 { 71 list_move(&memb->list, &ls->ls_nodes_gone); 72 ls->ls_num_nodes--; 73 } 74 75 static int dlm_is_member(struct dlm_ls *ls, int nodeid) 76 { 77 struct dlm_member *memb; 78 79 list_for_each_entry(memb, &ls->ls_nodes, list) { 80 if (memb->nodeid == nodeid) 81 return 1; 82 } 83 return 0; 84 } 85 86 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 87 { 88 struct dlm_member *memb; 89 90 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 91 if (memb->nodeid == nodeid) 92 return 1; 93 } 94 return 0; 95 } 96 97 static void clear_memb_list(struct list_head *head) 98 { 99 struct dlm_member *memb; 100 101 while (!list_empty(head)) { 102 memb = list_entry(head->next, struct dlm_member, list); 103 list_del(&memb->list); 104 kfree(memb); 105 } 106 } 107 108 void dlm_clear_members(struct dlm_ls *ls) 109 { 110 clear_memb_list(&ls->ls_nodes); 111 ls->ls_num_nodes = 0; 112 } 113 114 void dlm_clear_members_gone(struct dlm_ls *ls) 115 { 116 clear_memb_list(&ls->ls_nodes_gone); 117 } 118 119 static void make_member_array(struct dlm_ls *ls) 120 { 121 struct dlm_member *memb; 122 int i, w, x = 0, total = 0, all_zero = 0, *array; 123 124 kfree(ls->ls_node_array); 125 ls->ls_node_array = NULL; 126 127 list_for_each_entry(memb, &ls->ls_nodes, list) { 128 if (memb->weight) 129 total += memb->weight; 130 } 131 132 /* all nodes revert to weight of 1 if all have weight 0 */ 133 134 if (!total) { 135 total = ls->ls_num_nodes; 136 all_zero = 1; 137 } 138 139 ls->ls_total_weight = total; 140 141 array = kmalloc(sizeof(int) * total, GFP_KERNEL); 142 if (!array) 143 return; 144 145 list_for_each_entry(memb, &ls->ls_nodes, list) { 146 if (!all_zero && !memb->weight) 147 continue; 148 149 if (all_zero) 150 w = 1; 151 else 152 w = memb->weight; 153 154 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 155 156 for (i = 0; i < w; i++) 157 array[x++] = memb->nodeid; 158 } 159 160 ls->ls_node_array = array; 161 } 162 163 /* send a status request to all members just to establish comms connections */ 164 165 static int ping_members(struct dlm_ls *ls) 166 { 167 struct dlm_member *memb; 168 int error = 0; 169 170 list_for_each_entry(memb, &ls->ls_nodes, list) { 171 error = dlm_recovery_stopped(ls); 172 if (error) 173 break; 174 error = dlm_rcom_status(ls, memb->nodeid); 175 if (error) 176 break; 177 } 178 if (error) 179 log_debug(ls, "ping_members aborted %d last nodeid %d", 180 error, ls->ls_recover_nodeid); 181 return error; 182 } 183 184 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 185 { 186 struct dlm_member *memb, *safe; 187 int i, error, found, pos = 0, neg = 0, low = -1; 188 189 /* previously removed members that we've not finished removing need to 190 count as a negative change so the "neg" recovery steps will happen */ 191 192 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 193 log_debug(ls, "prev removed member %d", memb->nodeid); 194 neg++; 195 } 196 197 /* move departed members from ls_nodes to ls_nodes_gone */ 198 199 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 200 found = 0; 201 for (i = 0; i < rv->node_count; i++) { 202 if (memb->nodeid == rv->nodeids[i]) { 203 found = 1; 204 break; 205 } 206 } 207 208 if (!found) { 209 neg++; 210 dlm_remove_member(ls, memb); 211 log_debug(ls, "remove member %d", memb->nodeid); 212 } 213 } 214 215 /* add new members to ls_nodes */ 216 217 for (i = 0; i < rv->node_count; i++) { 218 if (dlm_is_member(ls, rv->nodeids[i])) 219 continue; 220 dlm_add_member(ls, rv->nodeids[i]); 221 pos++; 222 log_debug(ls, "add member %d", rv->nodeids[i]); 223 } 224 225 list_for_each_entry(memb, &ls->ls_nodes, list) { 226 if (low == -1 || memb->nodeid < low) 227 low = memb->nodeid; 228 } 229 ls->ls_low_nodeid = low; 230 231 make_member_array(ls); 232 dlm_set_recover_status(ls, DLM_RS_NODES); 233 *neg_out = neg; 234 235 error = ping_members(ls); 236 if (!error || error == -EPROTO) { 237 /* new_lockspace() may be waiting to know if the config 238 is good or bad */ 239 ls->ls_members_result = error; 240 complete(&ls->ls_members_done); 241 } 242 if (error) 243 goto out; 244 245 error = dlm_recover_members_wait(ls); 246 out: 247 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); 248 return error; 249 } 250 251 /* 252 * Following called from lockspace.c 253 */ 254 255 int dlm_ls_stop(struct dlm_ls *ls) 256 { 257 int new; 258 259 /* 260 * A stop cancels any recovery that's in progress (see RECOVERY_STOP, 261 * dlm_recovery_stopped()) and prevents any new locks from being 262 * processed (see RUNNING, dlm_locking_stopped()). 263 */ 264 265 spin_lock(&ls->ls_recover_lock); 266 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 267 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 268 ls->ls_recover_seq++; 269 spin_unlock(&ls->ls_recover_lock); 270 271 /* 272 * This in_recovery lock does two things: 273 * 274 * 1) Keeps this function from returning until all threads are out 275 * of locking routines and locking is truely stopped. 276 * 2) Keeps any new requests from being processed until it's unlocked 277 * when recovery is complete. 278 */ 279 280 if (new) 281 down_write(&ls->ls_in_recovery); 282 283 /* 284 * The recoverd suspend/resume makes sure that dlm_recoverd (if 285 * running) has noticed the clearing of RUNNING above and quit 286 * processing the previous recovery. This will be true for all nodes 287 * before any nodes start the new recovery. 288 */ 289 290 dlm_recoverd_suspend(ls); 291 ls->ls_recover_status = 0; 292 dlm_recoverd_resume(ls); 293 294 if (!ls->ls_recover_begin) 295 ls->ls_recover_begin = jiffies; 296 return 0; 297 } 298 299 int dlm_ls_start(struct dlm_ls *ls) 300 { 301 struct dlm_recover *rv = NULL, *rv_old; 302 int *ids = NULL; 303 int error, count; 304 305 rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL); 306 if (!rv) 307 return -ENOMEM; 308 309 error = count = dlm_nodeid_list(ls->ls_name, &ids); 310 if (error <= 0) 311 goto fail; 312 313 spin_lock(&ls->ls_recover_lock); 314 315 /* the lockspace needs to be stopped before it can be started */ 316 317 if (!dlm_locking_stopped(ls)) { 318 spin_unlock(&ls->ls_recover_lock); 319 log_error(ls, "start ignored: lockspace running"); 320 error = -EINVAL; 321 goto fail; 322 } 323 324 rv->nodeids = ids; 325 rv->node_count = count; 326 rv->seq = ++ls->ls_recover_seq; 327 rv_old = ls->ls_recover_args; 328 ls->ls_recover_args = rv; 329 spin_unlock(&ls->ls_recover_lock); 330 331 if (rv_old) { 332 kfree(rv_old->nodeids); 333 kfree(rv_old); 334 } 335 336 dlm_recoverd_kick(ls); 337 return 0; 338 339 fail: 340 kfree(rv); 341 kfree(ids); 342 return error; 343 } 344 345