xref: /openbmc/linux/fs/dlm/dir.c (revision e8e0929d)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29 	spin_lock(&ls->ls_recover_list_lock);
30 	list_add(&de->list, &ls->ls_recover_list);
31 	spin_unlock(&ls->ls_recover_list_lock);
32 }
33 
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36 	int found = 0;
37 	struct dlm_direntry *de;
38 
39 	spin_lock(&ls->ls_recover_list_lock);
40 	list_for_each_entry(de, &ls->ls_recover_list, list) {
41 		if (de->length == len) {
42 			list_del(&de->list);
43 			de->master_nodeid = 0;
44 			memset(de->name, 0, len);
45 			found = 1;
46 			break;
47 		}
48 	}
49 	spin_unlock(&ls->ls_recover_list_lock);
50 
51 	if (!found)
52 		de = kzalloc(sizeof(struct dlm_direntry) + len,
53 			     ls->ls_allocation);
54 	return de;
55 }
56 
57 void dlm_clear_free_entries(struct dlm_ls *ls)
58 {
59 	struct dlm_direntry *de;
60 
61 	spin_lock(&ls->ls_recover_list_lock);
62 	while (!list_empty(&ls->ls_recover_list)) {
63 		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
64 				list);
65 		list_del(&de->list);
66 		kfree(de);
67 	}
68 	spin_unlock(&ls->ls_recover_list_lock);
69 }
70 
71 /*
72  * We use the upper 16 bits of the hash value to select the directory node.
73  * Low bits are used for distribution of rsb's among hash buckets on each node.
74  *
75  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
76  * num_nodes to the hash value.  This value in the desired range is used as an
77  * offset into the sorted list of nodeid's to give the particular nodeid.
78  */
79 
80 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
81 {
82 	struct list_head *tmp;
83 	struct dlm_member *memb = NULL;
84 	uint32_t node, n = 0;
85 	int nodeid;
86 
87 	if (ls->ls_num_nodes == 1) {
88 		nodeid = dlm_our_nodeid();
89 		goto out;
90 	}
91 
92 	if (ls->ls_node_array) {
93 		node = (hash >> 16) % ls->ls_total_weight;
94 		nodeid = ls->ls_node_array[node];
95 		goto out;
96 	}
97 
98 	/* make_member_array() failed to kmalloc ls_node_array... */
99 
100 	node = (hash >> 16) % ls->ls_num_nodes;
101 
102 	list_for_each(tmp, &ls->ls_nodes) {
103 		if (n++ != node)
104 			continue;
105 		memb = list_entry(tmp, struct dlm_member, list);
106 		break;
107 	}
108 
109 	DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
110 				 ls->ls_num_nodes, n, node););
111 	nodeid = memb->nodeid;
112  out:
113 	return nodeid;
114 }
115 
116 int dlm_dir_nodeid(struct dlm_rsb *r)
117 {
118 	return dlm_hash2nodeid(r->res_ls, r->res_hash);
119 }
120 
121 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
122 {
123 	uint32_t val;
124 
125 	val = jhash(name, len, 0);
126 	val &= (ls->ls_dirtbl_size - 1);
127 
128 	return val;
129 }
130 
131 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
132 {
133 	uint32_t bucket;
134 
135 	bucket = dir_hash(ls, de->name, de->length);
136 	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
137 }
138 
139 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
140 					  int namelen, uint32_t bucket)
141 {
142 	struct dlm_direntry *de;
143 
144 	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
145 		if (de->length == namelen && !memcmp(name, de->name, namelen))
146 			goto out;
147 	}
148 	de = NULL;
149  out:
150 	return de;
151 }
152 
153 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
154 {
155 	struct dlm_direntry *de;
156 	uint32_t bucket;
157 
158 	bucket = dir_hash(ls, name, namelen);
159 
160 	spin_lock(&ls->ls_dirtbl[bucket].lock);
161 
162 	de = search_bucket(ls, name, namelen, bucket);
163 
164 	if (!de) {
165 		log_error(ls, "remove fr %u none", nodeid);
166 		goto out;
167 	}
168 
169 	if (de->master_nodeid != nodeid) {
170 		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
171 		goto out;
172 	}
173 
174 	list_del(&de->list);
175 	kfree(de);
176  out:
177 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
178 }
179 
180 void dlm_dir_clear(struct dlm_ls *ls)
181 {
182 	struct list_head *head;
183 	struct dlm_direntry *de;
184 	int i;
185 
186 	DLM_ASSERT(list_empty(&ls->ls_recover_list), );
187 
188 	for (i = 0; i < ls->ls_dirtbl_size; i++) {
189 		spin_lock(&ls->ls_dirtbl[i].lock);
190 		head = &ls->ls_dirtbl[i].list;
191 		while (!list_empty(head)) {
192 			de = list_entry(head->next, struct dlm_direntry, list);
193 			list_del(&de->list);
194 			put_free_de(ls, de);
195 		}
196 		spin_unlock(&ls->ls_dirtbl[i].lock);
197 	}
198 }
199 
200 int dlm_recover_directory(struct dlm_ls *ls)
201 {
202 	struct dlm_member *memb;
203 	struct dlm_direntry *de;
204 	char *b, *last_name = NULL;
205 	int error = -ENOMEM, last_len, count = 0;
206 	uint16_t namelen;
207 
208 	log_debug(ls, "dlm_recover_directory");
209 
210 	if (dlm_no_directory(ls))
211 		goto out_status;
212 
213 	dlm_dir_clear(ls);
214 
215 	last_name = kmalloc(DLM_RESNAME_MAXLEN, ls->ls_allocation);
216 	if (!last_name)
217 		goto out;
218 
219 	list_for_each_entry(memb, &ls->ls_nodes, list) {
220 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
221 		last_len = 0;
222 
223 		for (;;) {
224 			int left;
225 			error = dlm_recovery_stopped(ls);
226 			if (error)
227 				goto out_free;
228 
229 			error = dlm_rcom_names(ls, memb->nodeid,
230 					       last_name, last_len);
231 			if (error)
232 				goto out_free;
233 
234 			schedule();
235 
236 			/*
237 			 * pick namelen/name pairs out of received buffer
238 			 */
239 
240 			b = ls->ls_recover_buf->rc_buf;
241 			left = ls->ls_recover_buf->rc_header.h_length;
242 			left -= sizeof(struct dlm_rcom);
243 
244 			for (;;) {
245 				__be16 v;
246 
247 				error = -EINVAL;
248 				if (left < sizeof(__be16))
249 					goto out_free;
250 
251 				memcpy(&v, b, sizeof(__be16));
252 				namelen = be16_to_cpu(v);
253 				b += sizeof(__be16);
254 				left -= sizeof(__be16);
255 
256 				/* namelen of 0xFFFFF marks end of names for
257 				   this node; namelen of 0 marks end of the
258 				   buffer */
259 
260 				if (namelen == 0xFFFF)
261 					goto done;
262 				if (!namelen)
263 					break;
264 
265 				if (namelen > left)
266 					goto out_free;
267 
268 				if (namelen > DLM_RESNAME_MAXLEN)
269 					goto out_free;
270 
271 				error = -ENOMEM;
272 				de = get_free_de(ls, namelen);
273 				if (!de)
274 					goto out_free;
275 
276 				de->master_nodeid = memb->nodeid;
277 				de->length = namelen;
278 				last_len = namelen;
279 				memcpy(de->name, b, namelen);
280 				memcpy(last_name, b, namelen);
281 				b += namelen;
282 				left -= namelen;
283 
284 				add_entry_to_hash(ls, de);
285 				count++;
286 			}
287 		}
288          done:
289 		;
290 	}
291 
292  out_status:
293 	error = 0;
294 	dlm_set_recover_status(ls, DLM_RS_DIR);
295 	log_debug(ls, "dlm_recover_directory %d entries", count);
296  out_free:
297 	kfree(last_name);
298  out:
299 	dlm_clear_free_entries(ls);
300 	return error;
301 }
302 
303 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
304 		     int namelen, int *r_nodeid)
305 {
306 	struct dlm_direntry *de, *tmp;
307 	uint32_t bucket;
308 
309 	bucket = dir_hash(ls, name, namelen);
310 
311 	spin_lock(&ls->ls_dirtbl[bucket].lock);
312 	de = search_bucket(ls, name, namelen, bucket);
313 	if (de) {
314 		*r_nodeid = de->master_nodeid;
315 		spin_unlock(&ls->ls_dirtbl[bucket].lock);
316 		if (*r_nodeid == nodeid)
317 			return -EEXIST;
318 		return 0;
319 	}
320 
321 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
322 
323 	if (namelen > DLM_RESNAME_MAXLEN)
324 		return -EINVAL;
325 
326 	de = kzalloc(sizeof(struct dlm_direntry) + namelen, ls->ls_allocation);
327 	if (!de)
328 		return -ENOMEM;
329 
330 	de->master_nodeid = nodeid;
331 	de->length = namelen;
332 	memcpy(de->name, name, namelen);
333 
334 	spin_lock(&ls->ls_dirtbl[bucket].lock);
335 	tmp = search_bucket(ls, name, namelen, bucket);
336 	if (tmp) {
337 		kfree(de);
338 		de = tmp;
339 	} else {
340 		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
341 	}
342 	*r_nodeid = de->master_nodeid;
343 	spin_unlock(&ls->ls_dirtbl[bucket].lock);
344 	return 0;
345 }
346 
/* Directory lookup entry point: forwards all arguments to get_entry() and
   returns its result unchanged. */

int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

	return rv;
}
352 
353 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
354 {
355 	struct dlm_rsb *r;
356 
357 	down_read(&ls->ls_root_sem);
358 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
359 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
360 			up_read(&ls->ls_root_sem);
361 			return r;
362 		}
363 	}
364 	up_read(&ls->ls_root_sem);
365 	return NULL;
366 }
367 
368 /* Find the rsb where we left off (or start again), then send rsb names
369    for rsb's we're master of and whose directory node matches the requesting
370    node.  inbuf is the rsb name last sent, inlen is the name's length */
371 
372 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
373  			   char *outbuf, int outlen, int nodeid)
374 {
375 	struct list_head *list;
376 	struct dlm_rsb *r;
377 	int offset = 0, dir_nodeid;
378 	__be16 be_namelen;
379 
380 	down_read(&ls->ls_root_sem);
381 
382 	if (inlen > 1) {
383 		r = find_rsb_root(ls, inbuf, inlen);
384 		if (!r) {
385 			inbuf[inlen - 1] = '\0';
386 			log_error(ls, "copy_master_names from %d start %d %s",
387 				  nodeid, inlen, inbuf);
388 			goto out;
389 		}
390 		list = r->res_root_list.next;
391 	} else {
392 		list = ls->ls_root_list.next;
393 	}
394 
395 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
396 		r = list_entry(list, struct dlm_rsb, res_root_list);
397 		if (r->res_nodeid)
398 			continue;
399 
400 		dir_nodeid = dlm_dir_nodeid(r);
401 		if (dir_nodeid != nodeid)
402 			continue;
403 
404 		/*
405 		 * The block ends when we can't fit the following in the
406 		 * remaining buffer space:
407 		 * namelen (uint16_t) +
408 		 * name (r->res_length) +
409 		 * end-of-block record 0x0000 (uint16_t)
410 		 */
411 
412 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
413 			/* Write end-of-block record */
414 			be_namelen = cpu_to_be16(0);
415 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
416 			offset += sizeof(__be16);
417 			goto out;
418 		}
419 
420 		be_namelen = cpu_to_be16(r->res_length);
421 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
422 		offset += sizeof(__be16);
423 		memcpy(outbuf + offset, r->res_name, r->res_length);
424 		offset += r->res_length;
425 	}
426 
427 	/*
428 	 * If we've reached the end of the list (and there's room) write a
429 	 * terminating record.
430 	 */
431 
432 	if ((list == &ls->ls_root_list) &&
433 	    (offset + sizeof(uint16_t) <= outlen)) {
434 		be_namelen = cpu_to_be16(0xFFFF);
435 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
436 		offset += sizeof(__be16);
437 	}
438 
439  out:
440 	up_read(&ls->ls_root_sem);
441 }
442 
443