1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) 2005 Red Hat, Inc. All rights reserved. 5 ** 6 ** This copyrighted material is made available to anyone wishing to use, 7 ** modify, copy, or redistribute it subject to the terms and conditions 8 ** of the GNU General Public License v.2. 9 ** 10 ******************************************************************************* 11 ******************************************************************************/ 12 13 #include "dlm_internal.h" 14 #include "lockspace.h" 15 #include "member.h" 16 #include "recoverd.h" 17 #include "recover.h" 18 #include "rcom.h" 19 #include "config.h" 20 21 /* 22 * Following called by dlm_recoverd thread 23 */ 24 25 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new) 26 { 27 struct dlm_member *memb = NULL; 28 struct list_head *tmp; 29 struct list_head *newlist = &new->list; 30 struct list_head *head = &ls->ls_nodes; 31 32 list_for_each(tmp, head) { 33 memb = list_entry(tmp, struct dlm_member, list); 34 if (new->nodeid < memb->nodeid) 35 break; 36 } 37 38 if (!memb) 39 list_add_tail(newlist, head); 40 else { 41 /* FIXME: can use list macro here */ 42 newlist->prev = tmp->prev; 43 newlist->next = tmp; 44 tmp->prev->next = newlist; 45 tmp->prev = newlist; 46 } 47 } 48 49 static int dlm_add_member(struct dlm_ls *ls, int nodeid) 50 { 51 struct dlm_member *memb; 52 int w; 53 54 memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL); 55 if (!memb) 56 return -ENOMEM; 57 58 w = dlm_node_weight(ls->ls_name, nodeid); 59 if (w < 0) 60 return w; 61 62 memb->nodeid = nodeid; 63 memb->weight = w; 64 add_ordered_member(ls, memb); 65 ls->ls_num_nodes++; 66 return 0; 67 } 68 69 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb) 70 { 71 list_move(&memb->list, &ls->ls_nodes_gone); 72 ls->ls_num_nodes--; 73 } 74 75 static int dlm_is_member(struct dlm_ls *ls, int nodeid) 76 { 77 struct dlm_member *memb; 78 79 list_for_each_entry(memb, &ls->ls_nodes, list) { 80 if (memb->nodeid == nodeid) 81 return 1; 82 } 83 return 0; 84 } 85 86 int dlm_is_removed(struct dlm_ls *ls, int nodeid) 87 { 88 struct dlm_member *memb; 89 90 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 91 if (memb->nodeid == nodeid) 92 return 1; 93 } 94 return 0; 95 } 96 97 static void clear_memb_list(struct list_head *head) 98 { 99 struct dlm_member *memb; 100 101 while (!list_empty(head)) { 102 memb = list_entry(head->next, struct dlm_member, list); 103 list_del(&memb->list); 104 kfree(memb); 105 } 106 } 107 108 void dlm_clear_members(struct dlm_ls *ls) 109 { 110 clear_memb_list(&ls->ls_nodes); 111 ls->ls_num_nodes = 0; 112 } 113 114 void dlm_clear_members_gone(struct dlm_ls *ls) 115 { 116 clear_memb_list(&ls->ls_nodes_gone); 117 } 118 119 static void make_member_array(struct dlm_ls *ls) 120 { 121 struct dlm_member *memb; 122 int i, w, x = 0, total = 0, all_zero = 0, *array; 123 124 kfree(ls->ls_node_array); 125 ls->ls_node_array = NULL; 126 127 list_for_each_entry(memb, &ls->ls_nodes, list) { 128 if (memb->weight) 129 total += memb->weight; 130 } 131 132 /* all nodes revert to weight of 1 if all have weight 0 */ 133 134 if (!total) { 135 total = ls->ls_num_nodes; 136 all_zero = 1; 137 } 138 139 ls->ls_total_weight = total; 140 141 array = kmalloc(sizeof(int) * total, GFP_KERNEL); 142 if (!array) 143 return; 144 145 list_for_each_entry(memb, &ls->ls_nodes, list) { 146 if (!all_zero && !memb->weight) 147 continue; 148 149 if (all_zero) 150 w = 1; 151 else 152 w = memb->weight; 153 154 DLM_ASSERT(x < total, printk("total %d x %d\n", total, x);); 155 156 for (i = 0; i < w; i++) 157 array[x++] = memb->nodeid; 158 } 159 160 ls->ls_node_array = array; 161 } 162 163 /* send a status request to all members just to establish comms connections */ 164 165 static int ping_members(struct dlm_ls *ls) 166 { 167 struct dlm_member *memb; 168 int error = 0; 169 170 list_for_each_entry(memb, &ls->ls_nodes, list) { 171 error = dlm_recovery_stopped(ls); 172 if (error) 173 break; 174 error = dlm_rcom_status(ls, memb->nodeid); 175 if (error) 176 break; 177 } 178 if (error) 179 log_debug(ls, "ping_members aborted %d last nodeid %d", 180 error, ls->ls_recover_nodeid); 181 return error; 182 } 183 184 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out) 185 { 186 struct dlm_member *memb, *safe; 187 int i, error, found, pos = 0, neg = 0, low = -1; 188 189 /* previously removed members that we've not finished removing need to 190 count as a negative change so the "neg" recovery steps will happen */ 191 192 list_for_each_entry(memb, &ls->ls_nodes_gone, list) { 193 log_debug(ls, "prev removed member %d", memb->nodeid); 194 neg++; 195 } 196 197 /* move departed members from ls_nodes to ls_nodes_gone */ 198 199 list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) { 200 found = 0; 201 for (i = 0; i < rv->node_count; i++) { 202 if (memb->nodeid == rv->nodeids[i]) { 203 found = 1; 204 break; 205 } 206 } 207 208 if (!found) { 209 neg++; 210 dlm_remove_member(ls, memb); 211 log_debug(ls, "remove member %d", memb->nodeid); 212 } 213 } 214 215 /* add new members to ls_nodes */ 216 217 for (i = 0; i < rv->node_count; i++) { 218 if (dlm_is_member(ls, rv->nodeids[i])) 219 continue; 220 dlm_add_member(ls, rv->nodeids[i]); 221 pos++; 222 log_debug(ls, "add member %d", rv->nodeids[i]); 223 } 224 225 list_for_each_entry(memb, &ls->ls_nodes, list) { 226 if (low == -1 || memb->nodeid < low) 227 low = memb->nodeid; 228 } 229 ls->ls_low_nodeid = low; 230 231 make_member_array(ls); 232 dlm_set_recover_status(ls, DLM_RS_NODES); 233 *neg_out = neg; 234 235 error = ping_members(ls); 236 if (error) 237 goto out; 238 239 error = dlm_recover_members_wait(ls); 240 out: 241 log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error); 242 return error; 243 } 244 245 /* 246 * Following called from lockspace.c 247 */ 248 249 int dlm_ls_stop(struct dlm_ls *ls) 250 { 251 int new; 252 253 /* 254 * A stop cancels any recovery that's in progress (see RECOVERY_STOP, 255 * dlm_recovery_stopped()) and prevents any new locks from being 256 * processed (see RUNNING, dlm_locking_stopped()). 257 */ 258 259 spin_lock(&ls->ls_recover_lock); 260 set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); 261 new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); 262 ls->ls_recover_seq++; 263 spin_unlock(&ls->ls_recover_lock); 264 265 /* 266 * This in_recovery lock does two things: 267 * 268 * 1) Keeps this function from returning until all threads are out 269 * of locking routines and locking is truely stopped. 270 * 2) Keeps any new requests from being processed until it's unlocked 271 * when recovery is complete. 272 */ 273 274 if (new) 275 down_write(&ls->ls_in_recovery); 276 277 /* 278 * The recoverd suspend/resume makes sure that dlm_recoverd (if 279 * running) has noticed the clearing of RUNNING above and quit 280 * processing the previous recovery. This will be true for all nodes 281 * before any nodes start the new recovery. 282 */ 283 284 dlm_recoverd_suspend(ls); 285 ls->ls_recover_status = 0; 286 dlm_recoverd_resume(ls); 287 return 0; 288 } 289 290 int dlm_ls_start(struct dlm_ls *ls) 291 { 292 struct dlm_recover *rv = NULL, *rv_old; 293 int *ids = NULL; 294 int error, count; 295 296 rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL); 297 if (!rv) 298 return -ENOMEM; 299 300 error = count = dlm_nodeid_list(ls->ls_name, &ids); 301 if (error <= 0) 302 goto fail; 303 304 spin_lock(&ls->ls_recover_lock); 305 306 /* the lockspace needs to be stopped before it can be started */ 307 308 if (!dlm_locking_stopped(ls)) { 309 spin_unlock(&ls->ls_recover_lock); 310 log_error(ls, "start ignored: lockspace running"); 311 error = -EINVAL; 312 goto fail; 313 } 314 315 rv->nodeids = ids; 316 rv->node_count = count; 317 rv->seq = ++ls->ls_recover_seq; 318 rv_old = ls->ls_recover_args; 319 ls->ls_recover_args = rv; 320 spin_unlock(&ls->ls_recover_lock); 321 322 if (rv_old) { 323 kfree(rv_old->nodeids); 324 kfree(rv_old); 325 } 326 327 dlm_recoverd_kick(ls); 328 return 0; 329 330 fail: 331 kfree(rv); 332 kfree(ids); 333 return error; 334 } 335 336