1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "lowcomms.h" 18 #include "rcom.h" 19 #include "config.h" 20 #include "memory.h" 21 #include "recover.h" 22 #include "util.h" 23 #include "lock.h" 24 #include "dir.h" 25 26 27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) 28 { 29 spin_lock(&ls->ls_recover_list_lock); 30 list_add(&de->list, &ls->ls_recover_list); 31 spin_unlock(&ls->ls_recover_list_lock); 32 } 33 34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) 35 { 36 int found = 0; 37 struct dlm_direntry *de; 38 39 spin_lock(&ls->ls_recover_list_lock); 40 list_for_each_entry(de, &ls->ls_recover_list, list) { 41 if (de->length == len) { 42 list_del(&de->list); 43 de->master_nodeid = 0; 44 memset(de->name, 0, len); 45 found = 1; 46 break; 47 } 48 } 49 spin_unlock(&ls->ls_recover_list_lock); 50 51 if (!found) 52 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL); 53 return de; 54 } 55 56 void dlm_clear_free_entries(struct dlm_ls *ls) 57 { 58 struct dlm_direntry *de; 59 60 spin_lock(&ls->ls_recover_list_lock); 61 while (!list_empty(&ls->ls_recover_list)) { 62 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, 63 list); 64 list_del(&de->list); 65 kfree(de); 66 } 67 spin_unlock(&ls->ls_recover_list_lock); 68 } 69 70 /* 71 * We use the upper 16 bits of the hash value to select the directory node. 72 * Low bits are used for distribution of rsb's among hash buckets on each node. 73 * 74 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of 75 * num_nodes to the hash value. This value in the desired range is used as an 76 * offset into the sorted list of nodeid's to give the particular nodeid. 77 */ 78 79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 80 { 81 struct list_head *tmp; 82 struct dlm_member *memb = NULL; 83 uint32_t node, n = 0; 84 int nodeid; 85 86 if (ls->ls_num_nodes == 1) { 87 nodeid = dlm_our_nodeid(); 88 goto out; 89 } 90 91 if (ls->ls_node_array) { 92 node = (hash >> 16) % ls->ls_total_weight; 93 nodeid = ls->ls_node_array[node]; 94 goto out; 95 } 96 97 /* make_member_array() failed to kmalloc ls_node_array... */ 98 99 node = (hash >> 16) % ls->ls_num_nodes; 100 101 list_for_each(tmp, &ls->ls_nodes) { 102 if (n++ != node) 103 continue; 104 memb = list_entry(tmp, struct dlm_member, list); 105 break; 106 } 107 108 DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", 109 ls->ls_num_nodes, n, node);); 110 nodeid = memb->nodeid; 111 out: 112 return nodeid; 113 } 114 115 int dlm_dir_nodeid(struct dlm_rsb *r) 116 { 117 return dlm_hash2nodeid(r->res_ls, r->res_hash); 118 } 119 120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) 121 { 122 uint32_t val; 123 124 val = jhash(name, len, 0); 125 val &= (ls->ls_dirtbl_size - 1); 126 127 return val; 128 } 129 130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) 131 { 132 uint32_t bucket; 133 134 bucket = dir_hash(ls, de->name, de->length); 135 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); 136 } 137 138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, 139 int namelen, uint32_t bucket) 140 { 141 struct dlm_direntry *de; 142 143 list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { 144 if (de->length == namelen && !memcmp(name, de->name, namelen)) 145 goto out; 146 } 147 de = NULL; 148 out: 149 return de; 150 } 151 152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) 153 { 154 struct dlm_direntry *de; 155 uint32_t bucket; 156 157 bucket = dir_hash(ls, name, namelen); 158 159 spin_lock(&ls->ls_dirtbl[bucket].lock); 160 161 de = search_bucket(ls, name, namelen, bucket); 162 163 if (!de) { 164 log_error(ls, "remove fr %u none", nodeid); 165 goto out; 166 } 167 168 if (de->master_nodeid != nodeid) { 169 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); 170 goto out; 171 } 172 173 list_del(&de->list); 174 kfree(de); 175 out: 176 spin_unlock(&ls->ls_dirtbl[bucket].lock); 177 } 178 179 void dlm_dir_clear(struct dlm_ls *ls) 180 { 181 struct list_head *head; 182 struct dlm_direntry *de; 183 int i; 184 185 DLM_ASSERT(list_empty(&ls->ls_recover_list), ); 186 187 for (i = 0; i < ls->ls_dirtbl_size; i++) { 188 spin_lock(&ls->ls_dirtbl[i].lock); 189 head = &ls->ls_dirtbl[i].list; 190 while (!list_empty(head)) { 191 de = list_entry(head->next, struct dlm_direntry, list); 192 list_del(&de->list); 193 put_free_de(ls, de); 194 } 195 spin_unlock(&ls->ls_dirtbl[i].lock); 196 } 197 } 198 199 int dlm_recover_directory(struct dlm_ls *ls) 200 { 201 struct dlm_member *memb; 202 struct dlm_direntry *de; 203 char *b, *last_name = NULL; 204 int error = -ENOMEM, last_len, count = 0; 205 uint16_t namelen; 206 207 log_debug(ls, "dlm_recover_directory"); 208 209 if (dlm_no_directory(ls)) 210 goto out_status; 211 212 dlm_dir_clear(ls); 213 214 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL); 215 if (!last_name) 216 goto out; 217 218 list_for_each_entry(memb, &ls->ls_nodes, list) { 219 memset(last_name, 0, DLM_RESNAME_MAXLEN); 220 last_len = 0; 221 222 for (;;) { 223 int left; 224 error = dlm_recovery_stopped(ls); 225 if (error) 226 goto out_free; 227 228 error = dlm_rcom_names(ls, memb->nodeid, 229 last_name, last_len); 230 if (error) 231 goto out_free; 232 233 schedule(); 234 235 /* 236 * pick namelen/name pairs out of received buffer 237 */ 238 239 b = ls->ls_recover_buf->rc_buf; 240 left = ls->ls_recover_buf->rc_header.h_length; 241 left -= sizeof(struct dlm_rcom); 242 243 for (;;) { 244 __be16 v; 245 246 error = -EINVAL; 247 if (left < sizeof(__be16)) 248 goto out_free; 249 250 memcpy(&v, b, sizeof(__be16)); 251 namelen = be16_to_cpu(v); 252 b += sizeof(__be16); 253 left -= sizeof(__be16); 254 255 /* namelen of 0xFFFFF marks end of names for 256 this node; namelen of 0 marks end of the 257 buffer */ 258 259 if (namelen == 0xFFFF) 260 goto done; 261 if (!namelen) 262 break; 263 264 if (namelen > left) 265 goto out_free; 266 267 if (namelen > DLM_RESNAME_MAXLEN) 268 goto out_free; 269 270 error = -ENOMEM; 271 de = get_free_de(ls, namelen); 272 if (!de) 273 goto out_free; 274 275 de->master_nodeid = memb->nodeid; 276 de->length = namelen; 277 last_len = namelen; 278 memcpy(de->name, b, namelen); 279 memcpy(last_name, b, namelen); 280 b += namelen; 281 left -= namelen; 282 283 add_entry_to_hash(ls, de); 284 count++; 285 } 286 } 287 done: 288 ; 289 } 290 291 out_status: 292 error = 0; 293 dlm_set_recover_status(ls, DLM_RS_DIR); 294 log_debug(ls, "dlm_recover_directory %d entries", count); 295 out_free: 296 kfree(last_name); 297 out: 298 dlm_clear_free_entries(ls); 299 return error; 300 } 301 302 static int get_entry(struct dlm_ls *ls, int nodeid, char *name, 303 int namelen, int *r_nodeid) 304 { 305 struct dlm_direntry *de, *tmp; 306 uint32_t bucket; 307 308 bucket = dir_hash(ls, name, namelen); 309 310 spin_lock(&ls->ls_dirtbl[bucket].lock); 311 de = search_bucket(ls, name, namelen, bucket); 312 if (de) { 313 *r_nodeid = de->master_nodeid; 314 spin_unlock(&ls->ls_dirtbl[bucket].lock); 315 if (*r_nodeid == nodeid) 316 return -EEXIST; 317 return 0; 318 } 319 320 spin_unlock(&ls->ls_dirtbl[bucket].lock); 321 322 if (namelen > DLM_RESNAME_MAXLEN) 323 return -EINVAL; 324 325 de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL); 326 if (!de) 327 return -ENOMEM; 328 329 de->master_nodeid = nodeid; 330 de->length = namelen; 331 memcpy(de->name, name, namelen); 332 333 spin_lock(&ls->ls_dirtbl[bucket].lock); 334 tmp = search_bucket(ls, name, namelen, bucket); 335 if (tmp) { 336 kfree(de); 337 de = tmp; 338 } else { 339 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); 340 } 341 *r_nodeid = de->master_nodeid; 342 spin_unlock(&ls->ls_dirtbl[bucket].lock); 343 return 0; 344 } 345 346 int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, 347 int *r_nodeid) 348 { 349 return get_entry(ls, nodeid, name, namelen, r_nodeid); 350 } 351 352 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 353 { 354 struct dlm_rsb *r; 355 356 down_read(&ls->ls_root_sem); 357 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 358 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 359 up_read(&ls->ls_root_sem); 360 return r; 361 } 362 } 363 up_read(&ls->ls_root_sem); 364 return NULL; 365 } 366 367 /* Find the rsb where we left off (or start again), then send rsb names 368 for rsb's we're master of and whose directory node matches the requesting 369 node. inbuf is the rsb name last sent, inlen is the name's length */ 370 371 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 372 char *outbuf, int outlen, int nodeid) 373 { 374 struct list_head *list; 375 struct dlm_rsb *r; 376 int offset = 0, dir_nodeid; 377 __be16 be_namelen; 378 379 down_read(&ls->ls_root_sem); 380 381 if (inlen > 1) { 382 r = find_rsb_root(ls, inbuf, inlen); 383 if (!r) { 384 inbuf[inlen - 1] = '\0'; 385 log_error(ls, "copy_master_names from %d start %d %s", 386 nodeid, inlen, inbuf); 387 goto out; 388 } 389 list = r->res_root_list.next; 390 } else { 391 list = ls->ls_root_list.next; 392 } 393 394 for (offset = 0; list != &ls->ls_root_list; list = list->next) { 395 r = list_entry(list, struct dlm_rsb, res_root_list); 396 if (r->res_nodeid) 397 continue; 398 399 dir_nodeid = dlm_dir_nodeid(r); 400 if (dir_nodeid != nodeid) 401 continue; 402 403 /* 404 * The block ends when we can't fit the following in the 405 * remaining buffer space: 406 * namelen (uint16_t) + 407 * name (r->res_length) + 408 * end-of-block record 0x0000 (uint16_t) 409 */ 410 411 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { 412 /* Write end-of-block record */ 413 be_namelen = cpu_to_be16(0); 414 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 415 offset += sizeof(__be16); 416 goto out; 417 } 418 419 be_namelen = cpu_to_be16(r->res_length); 420 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 421 offset += sizeof(__be16); 422 memcpy(outbuf + offset, r->res_name, r->res_length); 423 offset += r->res_length; 424 } 425 426 /* 427 * If we've reached the end of the list (and there's room) write a 428 * terminating record. 429 */ 430 431 if ((list == &ls->ls_root_list) && 432 (offset + sizeof(uint16_t) <= outlen)) { 433 be_namelen = cpu_to_be16(0xFFFF); 434 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 435 offset += sizeof(__be16); 436 } 437 438 out: 439 up_read(&ls->ls_root_sem); 440 } 441 442