1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 #include "dlm_internal.h" 13 #include "lockspace.h" 14 #include "member.h" 15 #include "lowcomms.h" 16 #include "rcom.h" 17 #include "config.h" 18 #include "memory.h" 19 #include "recover.h" 20 #include "util.h" 21 #include "lock.h" 22 #include "dir.h" 23 24 /* 25 * We use the upper 16 bits of the hash value to select the directory node. 26 * Low bits are used for distribution of rsb's among hash buckets on each node. 27 * 28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of 29 * num_nodes to the hash value. This value in the desired range is used as an 30 * offset into the sorted list of nodeid's to give the particular nodeid. 31 */ 32 33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) 34 { 35 uint32_t node; 36 37 if (ls->ls_num_nodes == 1) 38 return dlm_our_nodeid(); 39 else { 40 node = (hash >> 16) % ls->ls_total_weight; 41 return ls->ls_node_array[node]; 42 } 43 } 44 45 int dlm_dir_nodeid(struct dlm_rsb *r) 46 { 47 return r->res_dir_nodeid; 48 } 49 50 void dlm_recover_dir_nodeid(struct dlm_ls *ls) 51 { 52 struct dlm_rsb *r; 53 54 down_read(&ls->ls_root_sem); 55 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 56 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); 57 } 58 up_read(&ls->ls_root_sem); 59 } 60 61 int dlm_recover_directory(struct dlm_ls *ls, uint64_t seq) 62 { 63 struct dlm_member *memb; 64 char *b, *last_name = NULL; 65 int error = -ENOMEM, last_len, nodeid, result; 66 uint16_t namelen; 67 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; 68 69 log_rinfo(ls, "dlm_recover_directory"); 70 71 if (dlm_no_directory(ls)) 72 goto out_status; 73 74 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); 75 if (!last_name) 76 goto out; 77 78 list_for_each_entry(memb, &ls->ls_nodes, list) { 79 if (memb->nodeid == dlm_our_nodeid()) 80 continue; 81 82 memset(last_name, 0, DLM_RESNAME_MAXLEN); 83 last_len = 0; 84 85 for (;;) { 86 int left; 87 if (dlm_recovery_stopped(ls)) { 88 error = -EINTR; 89 goto out_free; 90 } 91 92 error = dlm_rcom_names(ls, memb->nodeid, 93 last_name, last_len, seq); 94 if (error) 95 goto out_free; 96 97 cond_resched(); 98 99 /* 100 * pick namelen/name pairs out of received buffer 101 */ 102 103 b = ls->ls_recover_buf->rc_buf; 104 left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length); 105 left -= sizeof(struct dlm_rcom); 106 107 for (;;) { 108 __be16 v; 109 110 error = -EINVAL; 111 if (left < sizeof(__be16)) 112 goto out_free; 113 114 memcpy(&v, b, sizeof(__be16)); 115 namelen = be16_to_cpu(v); 116 b += sizeof(__be16); 117 left -= sizeof(__be16); 118 119 /* namelen of 0xFFFFF marks end of names for 120 this node; namelen of 0 marks end of the 121 buffer */ 122 123 if (namelen == 0xFFFF) 124 goto done; 125 if (!namelen) 126 break; 127 128 if (namelen > left) 129 goto out_free; 130 131 if (namelen > DLM_RESNAME_MAXLEN) 132 goto out_free; 133 134 error = dlm_master_lookup(ls, memb->nodeid, 135 b, namelen, 136 DLM_LU_RECOVER_DIR, 137 &nodeid, &result); 138 if (error) { 139 log_error(ls, "recover_dir lookup %d", 140 error); 141 goto out_free; 142 } 143 144 /* The name was found in rsbtbl, but the 145 * master nodeid is different from 146 * memb->nodeid which says it is the master. 147 * This should not happen. */ 148 149 if (result == DLM_LU_MATCH && 150 nodeid != memb->nodeid) { 151 count_bad++; 152 log_error(ls, "recover_dir lookup %d " 153 "nodeid %d memb %d bad %u", 154 result, nodeid, memb->nodeid, 155 count_bad); 156 print_hex_dump_bytes("dlm_recover_dir ", 157 DUMP_PREFIX_NONE, 158 b, namelen); 159 } 160 161 /* The name was found in rsbtbl, and the 162 * master nodeid matches memb->nodeid. */ 163 164 if (result == DLM_LU_MATCH && 165 nodeid == memb->nodeid) { 166 count_match++; 167 } 168 169 /* The name was not found in rsbtbl and was 170 * added with memb->nodeid as the master. */ 171 172 if (result == DLM_LU_ADD) { 173 count_add++; 174 } 175 176 last_len = namelen; 177 memcpy(last_name, b, namelen); 178 b += namelen; 179 left -= namelen; 180 count++; 181 } 182 } 183 done: 184 ; 185 } 186 187 out_status: 188 error = 0; 189 dlm_set_recover_status(ls, DLM_RS_DIR); 190 191 log_rinfo(ls, "dlm_recover_directory %u in %u new", 192 count, count_add); 193 out_free: 194 kfree(last_name); 195 out: 196 return error; 197 } 198 199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name, 200 int len) 201 { 202 struct dlm_rsb *r; 203 uint32_t hash, bucket; 204 int rv; 205 206 hash = jhash(name, len, 0); 207 bucket = hash & (ls->ls_rsbtbl_size - 1); 208 209 spin_lock(&ls->ls_rsbtbl[bucket].lock); 210 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); 211 if (rv) 212 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, 213 name, len, &r); 214 spin_unlock(&ls->ls_rsbtbl[bucket].lock); 215 216 if (!rv) 217 return r; 218 219 down_read(&ls->ls_root_sem); 220 list_for_each_entry(r, &ls->ls_root_list, res_root_list) { 221 if (len == r->res_length && !memcmp(name, r->res_name, len)) { 222 up_read(&ls->ls_root_sem); 223 log_debug(ls, "find_rsb_root revert to root_list %s", 224 r->res_name); 225 return r; 226 } 227 } 228 up_read(&ls->ls_root_sem); 229 return NULL; 230 } 231 232 /* Find the rsb where we left off (or start again), then send rsb names 233 for rsb's we're master of and whose directory node matches the requesting 234 node. inbuf is the rsb name last sent, inlen is the name's length */ 235 236 void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen, 237 char *outbuf, int outlen, int nodeid) 238 { 239 struct list_head *list; 240 struct dlm_rsb *r; 241 int offset = 0, dir_nodeid; 242 __be16 be_namelen; 243 244 down_read(&ls->ls_root_sem); 245 246 if (inlen > 1) { 247 r = find_rsb_root(ls, inbuf, inlen); 248 if (!r) { 249 log_error(ls, "copy_master_names from %d start %d %.*s", 250 nodeid, inlen, inlen, inbuf); 251 goto out; 252 } 253 list = r->res_root_list.next; 254 } else { 255 list = ls->ls_root_list.next; 256 } 257 258 for (offset = 0; list != &ls->ls_root_list; list = list->next) { 259 r = list_entry(list, struct dlm_rsb, res_root_list); 260 if (r->res_nodeid) 261 continue; 262 263 dir_nodeid = dlm_dir_nodeid(r); 264 if (dir_nodeid != nodeid) 265 continue; 266 267 /* 268 * The block ends when we can't fit the following in the 269 * remaining buffer space: 270 * namelen (uint16_t) + 271 * name (r->res_length) + 272 * end-of-block record 0x0000 (uint16_t) 273 */ 274 275 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) { 276 /* Write end-of-block record */ 277 be_namelen = cpu_to_be16(0); 278 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 279 offset += sizeof(__be16); 280 ls->ls_recover_dir_sent_msg++; 281 goto out; 282 } 283 284 be_namelen = cpu_to_be16(r->res_length); 285 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 286 offset += sizeof(__be16); 287 memcpy(outbuf + offset, r->res_name, r->res_length); 288 offset += r->res_length; 289 ls->ls_recover_dir_sent_res++; 290 } 291 292 /* 293 * If we've reached the end of the list (and there's room) write a 294 * terminating record. 295 */ 296 297 if ((list == &ls->ls_root_list) && 298 (offset + sizeof(uint16_t) <= outlen)) { 299 be_namelen = cpu_to_be16(0xFFFF); 300 memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); 301 offset += sizeof(__be16); 302 ls->ls_recover_dir_sent_msg++; 303 } 304 out: 305 up_read(&ls->ls_root_sem); 306 } 307 308