xref: /openbmc/linux/fs/dlm/member.c (revision 367b8112)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "rcom.h"
19 #include "config.h"
20 
21 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
22 {
23 	struct dlm_member *memb = NULL;
24 	struct list_head *tmp;
25 	struct list_head *newlist = &new->list;
26 	struct list_head *head = &ls->ls_nodes;
27 
28 	list_for_each(tmp, head) {
29 		memb = list_entry(tmp, struct dlm_member, list);
30 		if (new->nodeid < memb->nodeid)
31 			break;
32 	}
33 
34 	if (!memb)
35 		list_add_tail(newlist, head);
36 	else {
37 		/* FIXME: can use list macro here */
38 		newlist->prev = tmp->prev;
39 		newlist->next = tmp;
40 		tmp->prev->next = newlist;
41 		tmp->prev = newlist;
42 	}
43 }
44 
45 static int dlm_add_member(struct dlm_ls *ls, int nodeid)
46 {
47 	struct dlm_member *memb;
48 	int w;
49 
50 	memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
51 	if (!memb)
52 		return -ENOMEM;
53 
54 	w = dlm_node_weight(ls->ls_name, nodeid);
55 	if (w < 0) {
56 		kfree(memb);
57 		return w;
58 	}
59 
60 	memb->nodeid = nodeid;
61 	memb->weight = w;
62 	add_ordered_member(ls, memb);
63 	ls->ls_num_nodes++;
64 	return 0;
65 }
66 
67 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
68 {
69 	list_move(&memb->list, &ls->ls_nodes_gone);
70 	ls->ls_num_nodes--;
71 }
72 
73 int dlm_is_member(struct dlm_ls *ls, int nodeid)
74 {
75 	struct dlm_member *memb;
76 
77 	list_for_each_entry(memb, &ls->ls_nodes, list) {
78 		if (memb->nodeid == nodeid)
79 			return 1;
80 	}
81 	return 0;
82 }
83 
84 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
85 {
86 	struct dlm_member *memb;
87 
88 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
89 		if (memb->nodeid == nodeid)
90 			return 1;
91 	}
92 	return 0;
93 }
94 
95 static void clear_memb_list(struct list_head *head)
96 {
97 	struct dlm_member *memb;
98 
99 	while (!list_empty(head)) {
100 		memb = list_entry(head->next, struct dlm_member, list);
101 		list_del(&memb->list);
102 		kfree(memb);
103 	}
104 }
105 
106 void dlm_clear_members(struct dlm_ls *ls)
107 {
108 	clear_memb_list(&ls->ls_nodes);
109 	ls->ls_num_nodes = 0;
110 }
111 
112 void dlm_clear_members_gone(struct dlm_ls *ls)
113 {
114 	clear_memb_list(&ls->ls_nodes_gone);
115 }
116 
117 static void make_member_array(struct dlm_ls *ls)
118 {
119 	struct dlm_member *memb;
120 	int i, w, x = 0, total = 0, all_zero = 0, *array;
121 
122 	kfree(ls->ls_node_array);
123 	ls->ls_node_array = NULL;
124 
125 	list_for_each_entry(memb, &ls->ls_nodes, list) {
126 		if (memb->weight)
127 			total += memb->weight;
128 	}
129 
130 	/* all nodes revert to weight of 1 if all have weight 0 */
131 
132 	if (!total) {
133 		total = ls->ls_num_nodes;
134 		all_zero = 1;
135 	}
136 
137 	ls->ls_total_weight = total;
138 
139 	array = kmalloc(sizeof(int) * total, GFP_KERNEL);
140 	if (!array)
141 		return;
142 
143 	list_for_each_entry(memb, &ls->ls_nodes, list) {
144 		if (!all_zero && !memb->weight)
145 			continue;
146 
147 		if (all_zero)
148 			w = 1;
149 		else
150 			w = memb->weight;
151 
152 		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
153 
154 		for (i = 0; i < w; i++)
155 			array[x++] = memb->nodeid;
156 	}
157 
158 	ls->ls_node_array = array;
159 }
160 
161 /* send a status request to all members just to establish comms connections */
162 
163 static int ping_members(struct dlm_ls *ls)
164 {
165 	struct dlm_member *memb;
166 	int error = 0;
167 
168 	list_for_each_entry(memb, &ls->ls_nodes, list) {
169 		error = dlm_recovery_stopped(ls);
170 		if (error)
171 			break;
172 		error = dlm_rcom_status(ls, memb->nodeid);
173 		if (error)
174 			break;
175 	}
176 	if (error)
177 		log_debug(ls, "ping_members aborted %d last nodeid %d",
178 			  error, ls->ls_recover_nodeid);
179 	return error;
180 }
181 
182 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
183 {
184 	struct dlm_member *memb, *safe;
185 	int i, error, found, pos = 0, neg = 0, low = -1;
186 
187 	/* previously removed members that we've not finished removing need to
188 	   count as a negative change so the "neg" recovery steps will happen */
189 
190 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
191 		log_debug(ls, "prev removed member %d", memb->nodeid);
192 		neg++;
193 	}
194 
195 	/* move departed members from ls_nodes to ls_nodes_gone */
196 
197 	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
198 		found = 0;
199 		for (i = 0; i < rv->node_count; i++) {
200 			if (memb->nodeid == rv->nodeids[i]) {
201 				found = 1;
202 				break;
203 			}
204 		}
205 
206 		if (!found) {
207 			neg++;
208 			dlm_remove_member(ls, memb);
209 			log_debug(ls, "remove member %d", memb->nodeid);
210 		}
211 	}
212 
213 	/* Add an entry to ls_nodes_gone for members that were removed and
214 	   then added again, so that previous state for these nodes will be
215 	   cleared during recovery. */
216 
217 	for (i = 0; i < rv->new_count; i++) {
218 		if (!dlm_is_member(ls, rv->new[i]))
219 			continue;
220 		log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
221 
222 		memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
223 		if (!memb)
224 			return -ENOMEM;
225 		memb->nodeid = rv->new[i];
226 		list_add_tail(&memb->list, &ls->ls_nodes_gone);
227 		neg++;
228 	}
229 
230 	/* add new members to ls_nodes */
231 
232 	for (i = 0; i < rv->node_count; i++) {
233 		if (dlm_is_member(ls, rv->nodeids[i]))
234 			continue;
235 		dlm_add_member(ls, rv->nodeids[i]);
236 		pos++;
237 		log_debug(ls, "add member %d", rv->nodeids[i]);
238 	}
239 
240 	list_for_each_entry(memb, &ls->ls_nodes, list) {
241 		if (low == -1 || memb->nodeid < low)
242 			low = memb->nodeid;
243 	}
244 	ls->ls_low_nodeid = low;
245 
246 	make_member_array(ls);
247 	dlm_set_recover_status(ls, DLM_RS_NODES);
248 	*neg_out = neg;
249 
250 	error = ping_members(ls);
251 	if (!error || error == -EPROTO) {
252 		/* new_lockspace() may be waiting to know if the config
253 		   is good or bad */
254 		ls->ls_members_result = error;
255 		complete(&ls->ls_members_done);
256 	}
257 	if (error)
258 		goto out;
259 
260 	error = dlm_recover_members_wait(ls);
261  out:
262 	log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
263 	return error;
264 }
265 
266 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
267    dlm_ls_start() is called on any of them to start the new recovery. */
268 
269 int dlm_ls_stop(struct dlm_ls *ls)
270 {
271 	int new;
272 
273 	/*
274 	 * Prevent dlm_recv from being in the middle of something when we do
275 	 * the stop.  This includes ensuring dlm_recv isn't processing a
276 	 * recovery message (rcom), while dlm_recoverd is aborting and
277 	 * resetting things from an in-progress recovery.  i.e. we want
278 	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
279 	 * processing an rcom at the same time.  Stopping dlm_recv also makes
280 	 * it easy for dlm_receive_message() to check locking stopped and add a
281 	 * message to the requestqueue without races.
282 	 */
283 
284 	down_write(&ls->ls_recv_active);
285 
286 	/*
287 	 * Abort any recovery that's in progress (see RECOVERY_STOP,
288 	 * dlm_recovery_stopped()) and tell any other threads running in the
289 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
290 	 */
291 
292 	spin_lock(&ls->ls_recover_lock);
293 	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
294 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
295 	ls->ls_recover_seq++;
296 	spin_unlock(&ls->ls_recover_lock);
297 
298 	/*
299 	 * Let dlm_recv run again, now any normal messages will be saved on the
300 	 * requestqueue for later.
301 	 */
302 
303 	up_write(&ls->ls_recv_active);
304 
305 	/*
306 	 * This in_recovery lock does two things:
307 	 * 1) Keeps this function from returning until all threads are out
308 	 *    of locking routines and locking is truely stopped.
309 	 * 2) Keeps any new requests from being processed until it's unlocked
310 	 *    when recovery is complete.
311 	 */
312 
313 	if (new)
314 		down_write(&ls->ls_in_recovery);
315 
316 	/*
317 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
318 	 * running) has noticed RECOVERY_STOP above and quit processing the
319 	 * previous recovery.
320 	 */
321 
322 	dlm_recoverd_suspend(ls);
323 	ls->ls_recover_status = 0;
324 	dlm_recoverd_resume(ls);
325 
326 	if (!ls->ls_recover_begin)
327 		ls->ls_recover_begin = jiffies;
328 	return 0;
329 }
330 
331 int dlm_ls_start(struct dlm_ls *ls)
332 {
333 	struct dlm_recover *rv = NULL, *rv_old;
334 	int *ids = NULL, *new = NULL;
335 	int error, ids_count = 0, new_count = 0;
336 
337 	rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
338 	if (!rv)
339 		return -ENOMEM;
340 
341 	error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
342 				&new, &new_count);
343 	if (error < 0)
344 		goto fail;
345 
346 	spin_lock(&ls->ls_recover_lock);
347 
348 	/* the lockspace needs to be stopped before it can be started */
349 
350 	if (!dlm_locking_stopped(ls)) {
351 		spin_unlock(&ls->ls_recover_lock);
352 		log_error(ls, "start ignored: lockspace running");
353 		error = -EINVAL;
354 		goto fail;
355 	}
356 
357 	rv->nodeids = ids;
358 	rv->node_count = ids_count;
359 	rv->new = new;
360 	rv->new_count = new_count;
361 	rv->seq = ++ls->ls_recover_seq;
362 	rv_old = ls->ls_recover_args;
363 	ls->ls_recover_args = rv;
364 	spin_unlock(&ls->ls_recover_lock);
365 
366 	if (rv_old) {
367 		log_error(ls, "unused recovery %llx %d",
368 			  (unsigned long long)rv_old->seq, rv_old->node_count);
369 		kfree(rv_old->nodeids);
370 		kfree(rv_old->new);
371 		kfree(rv_old);
372 	}
373 
374 	dlm_recoverd_kick(ls);
375 	return 0;
376 
377  fail:
378 	kfree(rv);
379 	kfree(ids);
380 	kfree(new);
381 	return error;
382 }
383 
384