1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include "dlm_internal.h" 13 #include "lockspace.h" 14 #include "member.h" 15 #include "lowcomms.h" 16 #include "rcom.h" 17 #include "config.h" 18 #include "memory.h" 19 #include "recover.h" 20 #include "util.h" 21 #include "lock.h" 22 #include "dir.h" 23 24 /* 25 * We use the upper 16 bits of the hash value to select the directory node. 26 * Low bits are used for distribution of rsb's among hash buckets on each node. 27 * 28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of 29 * num_nodes to the hash value. This value in the desired range is used as an 30 * offset into the sorted list of nodeid's to give the particular nodeid. 31 */ 32 33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 34 { 35 uint32_t node; 36 37 if (ls->ls_num_nodes == 1) 38 return dlm_our_nodeid(); 39 else { 40 node = (hash >> 16) % ls->ls_total_weight; 41 return ls->ls_node_array[node]; 42 } 43 } 44 45 int dlm_dir_nodeid(struct dlm_rsb *r) 46 { 47 return r->res_dir_nodeid; 48 } 49 50 void dlm_recover_dir_nodeid(struct dlm_ls *ls) 51 { 52 struct dlm_rsb *r; 53 54 down_read(&ls->ls_root_sem); 55 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 56 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); 57 } 58 up_read(&ls->ls_root_sem); 59 } 60 61 int dlm_recover_directory(struct dlm_ls *ls) 62 { 63 struct dlm_member *memb; 64 char *b, *last_name = NULL; 65 int error = -ENOMEM, last_len, nodeid, result; 66 uint16_t namelen; 67 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; 68 69 log_rinfo(ls, "dlm_recover_directory"); 70 71 if (dlm_no_directory(ls)) 72 goto out_status; 73 74 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 75 if (!last_name) 76 goto out; 77 78 list_for_each_entry(memb, &ls->ls_nodes, list) { 79 if (memb->nodeid == dlm_our_nodeid()) 80 continue; 81 82 memset(last_name, 0, DLM_RESNAME_MAXLEN); 83 last_len = 0; 84 85 for (;;) { 86 int left; 87 error = dlm_recovery_stopped(ls); 88 if (error) 89 goto out_free; 90 91 error = dlm_rcom_names(ls, memb->nodeid, 92 last_name, last_len); 93 if (error) 94 goto out_free; 95 96 cond_resched(); 97 98 /* 99 * pick namelen/name pairs out of received buffer 100 */ 101 102 b = ls->ls_recover_buf->rc_buf; 103 left = ls->ls_recover_buf->rc_header.h_length; 104 left -= sizeof(struct dlm_rcom); 105 106 for (;;) { 107 __be16 v; 108 109 error = -EINVAL; 110 if (left < sizeof(__be16)) 111 goto out_free; 112 113 memcpy(&v, b, sizeof(__be16)); 114 namelen = be16_to_cpu(v); 115 b += sizeof(__be16); 116 left -= sizeof(__be16); 117 118 /* namelen of 0xFFFFF marks end of names for 119 this node; namelen of 0 marks end of the 120 buffer */ 121 122 if (namelen == 0xFFFF) 123 goto done; 124 if (!namelen) 125 break; 126 127 if (namelen > left) 128 goto out_free; 129 130 if (namelen > DLM_RESNAME_MAXLEN) 131 goto out_free; 132 133 error = dlm_master_lookup(ls, memb->nodeid, 134 b, namelen, 135 DLM_LU_RECOVER_DIR, 136 &nodeid, &result); 137 if (error) { 138 log_error(ls, "recover_dir lookup %d", 139 error); 140 goto out_free; 141 } 142 143 /* The name was found in rsbtbl, but the 144 * master nodeid is different from 145 * memb->nodeid which says it is the master. 146 * This should not happen. */ 147 148 if (result == DLM_LU_MATCH && 149 nodeid != memb->nodeid) { 150 count_bad++; 151 log_error(ls, "recover_dir lookup %d " 152 "nodeid %d memb %d bad %u", 153 result, nodeid, memb->nodeid, 154 count_bad); 155 print_hex_dump_bytes("dlm_recover_dir ", 156 DUMP_PREFIX_NONE, 157 b, namelen); 158 } 159 160 /* The name was found in rsbtbl, and the 161 * master nodeid matches memb->nodeid. */ 162 163 if (result == DLM_LU_MATCH && 164 nodeid == memb->nodeid) { 165 count_match++; 166 } 167 168 /* The name was not found in rsbtbl and was 169 * added with memb->nodeid as the master. */ 170 171 if (result == DLM_LU_ADD) { 172 count_add++; 173 } 174 175 last_len = namelen; 176 memcpy(last_name, b, namelen); 177 b += namelen; 178 left -= namelen; 179 count++; 180 } 181 } 182 done: 183 ; 184 } 185 186 out_status: 187 error = 0; 188 dlm_set_recover_status(ls, DLM_RS_DIR); 189 190 log_rinfo(ls, "dlm_recover_directory %u in %u new", 191 count, count_add); 192 out_free: 193 kfree(last_name); 194 out: 195 return error; 196 } 197 198 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) 199 { 200 struct dlm_rsb *r; 201 uint32_t hash, bucket; 202 int rv; 203 204 hash = jhash(name, len, 0); 205 bucket = hash & (ls->ls_rsbtbl_size - 1); 206 207 spin_lock(&ls->ls_rsbtbl[bucket].lock); 208 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); 209 if (rv) 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 211 name, len, &r); 212 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 213 214 if (!rv) 215 return r; 216 217 down_read(&ls->ls_root_sem); 218 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 219 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 220 up_read(&ls->ls_root_sem); 221 log_debug(ls, "find_rsb_root revert to root_list %s", 222 r->res_name); 223 return r; 224 } 225 } 226 up_read(&ls->ls_root_sem); 227 return NULL; 228 } 229 230 /* Find the rsb where we left off (or start again), then send rsb names 231 for rsb's we're master of and whose directory node matches the requesting 232 node. inbuf is the rsb name last sent, inlen is the name's length */ 233 234 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, 235 char *outbuf, int outlen, int nodeid) 236 { 237 struct list_head *list; 238 struct dlm_rsb *r; 239 int offset = 0, dir_nodeid; 240 __be16 be_namelen; 241 242 down_read(&ls->ls_root_sem); 243 244 if (inlen > 1) { 245 r = find_rsb_root(ls, inbuf, inlen); 246 if (!r) { 247 inbuf[inlen - 1] = '\0'; 248 log_error(ls, "copy_master_names from %d start %d %s", 249 nodeid, inlen, inbuf); 250 goto out; 251 } 252 list = r->res_root_list.next; 253 } else { 254 list = ls->ls_root_list.next; 255 } 256 257 for (offset = 0; list != &ls->ls_root_list; list = list->next) { 258 r = list_entry(list, struct dlm_rsb, res_root_list); 259 if (r->res_nodeid) 260 continue; 261 262 dir_nodeid = dlm_dir_nodeid(r); 263 if (dir_nodeid != nodeid) 264 continue; 265 266 /* 267 * The block ends when we can't fit the following in the 268 * remaining buffer space: 269 * namelen (uint16_t) + 270 * name (r->res_length) + 271 * end-of-block record 0x0000 (uint16_t) 272 */ 273 274 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { 275 /* Write end-of-block record */ 276 be_namelen = cpu_to_be16(0); 277 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 278 offset += sizeof(__be16); 279 ls->ls_recover_dir_sent_msg++; 280 goto out; 281 } 282 283 be_namelen = cpu_to_be16(r->res_length); 284 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 285 offset += sizeof(__be16); 286 memcpy(outbuf + offset, r->res_name, r->res_length); 287 offset += r->res_length; 288 ls->ls_recover_dir_sent_res++; 289 } 290 291 /* 292 * If we've reached the end of the list (and there's room) write a 293 * terminating record. 294 */ 295 296 if ((list == &ls->ls_root_list) && 297 (offset + sizeof(uint16_t) <= outlen)) { 298 be_namelen = cpu_to_be16(0xFFFF); 299 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 300 offset += sizeof(__be16); 301 ls->ls_recover_dir_sent_msg++; 302 } 303 out: 304 up_read(&ls->ls_root_sem); 305 } 306 307