xref: /openbmc/linux/fs/dlm/member.c (revision a1e58bbd)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 #include "dlm_internal.h"
14 #include "lockspace.h"
15 #include "member.h"
16 #include "recoverd.h"
17 #include "recover.h"
18 #include "rcom.h"
19 #include "config.h"
20 
21 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
22 {
23 	struct dlm_member *memb = NULL;
24 	struct list_head *tmp;
25 	struct list_head *newlist = &new->list;
26 	struct list_head *head = &ls->ls_nodes;
27 
28 	list_for_each(tmp, head) {
29 		memb = list_entry(tmp, struct dlm_member, list);
30 		if (new->nodeid < memb->nodeid)
31 			break;
32 	}
33 
34 	if (!memb)
35 		list_add_tail(newlist, head);
36 	else {
37 		/* FIXME: can use list macro here */
38 		newlist->prev = tmp->prev;
39 		newlist->next = tmp;
40 		tmp->prev->next = newlist;
41 		tmp->prev = newlist;
42 	}
43 }
44 
45 static int dlm_add_member(struct dlm_ls *ls, int nodeid)
46 {
47 	struct dlm_member *memb;
48 	int w;
49 
50 	memb = kzalloc(sizeof(struct dlm_member), GFP_KERNEL);
51 	if (!memb)
52 		return -ENOMEM;
53 
54 	w = dlm_node_weight(ls->ls_name, nodeid);
55 	if (w < 0) {
56 		kfree(memb);
57 		return w;
58 	}
59 
60 	memb->nodeid = nodeid;
61 	memb->weight = w;
62 	add_ordered_member(ls, memb);
63 	ls->ls_num_nodes++;
64 	return 0;
65 }
66 
67 static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
68 {
69 	list_move(&memb->list, &ls->ls_nodes_gone);
70 	ls->ls_num_nodes--;
71 }
72 
73 int dlm_is_member(struct dlm_ls *ls, int nodeid)
74 {
75 	struct dlm_member *memb;
76 
77 	list_for_each_entry(memb, &ls->ls_nodes, list) {
78 		if (memb->nodeid == nodeid)
79 			return 1;
80 	}
81 	return 0;
82 }
83 
84 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
85 {
86 	struct dlm_member *memb;
87 
88 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
89 		if (memb->nodeid == nodeid)
90 			return 1;
91 	}
92 	return 0;
93 }
94 
95 static void clear_memb_list(struct list_head *head)
96 {
97 	struct dlm_member *memb;
98 
99 	while (!list_empty(head)) {
100 		memb = list_entry(head->next, struct dlm_member, list);
101 		list_del(&memb->list);
102 		kfree(memb);
103 	}
104 }
105 
106 void dlm_clear_members(struct dlm_ls *ls)
107 {
108 	clear_memb_list(&ls->ls_nodes);
109 	ls->ls_num_nodes = 0;
110 }
111 
112 void dlm_clear_members_gone(struct dlm_ls *ls)
113 {
114 	clear_memb_list(&ls->ls_nodes_gone);
115 }
116 
117 static void make_member_array(struct dlm_ls *ls)
118 {
119 	struct dlm_member *memb;
120 	int i, w, x = 0, total = 0, all_zero = 0, *array;
121 
122 	kfree(ls->ls_node_array);
123 	ls->ls_node_array = NULL;
124 
125 	list_for_each_entry(memb, &ls->ls_nodes, list) {
126 		if (memb->weight)
127 			total += memb->weight;
128 	}
129 
130 	/* all nodes revert to weight of 1 if all have weight 0 */
131 
132 	if (!total) {
133 		total = ls->ls_num_nodes;
134 		all_zero = 1;
135 	}
136 
137 	ls->ls_total_weight = total;
138 
139 	array = kmalloc(sizeof(int) * total, GFP_KERNEL);
140 	if (!array)
141 		return;
142 
143 	list_for_each_entry(memb, &ls->ls_nodes, list) {
144 		if (!all_zero && !memb->weight)
145 			continue;
146 
147 		if (all_zero)
148 			w = 1;
149 		else
150 			w = memb->weight;
151 
152 		DLM_ASSERT(x < total, printk("total %d x %d\n", total, x););
153 
154 		for (i = 0; i < w; i++)
155 			array[x++] = memb->nodeid;
156 	}
157 
158 	ls->ls_node_array = array;
159 }
160 
161 /* send a status request to all members just to establish comms connections */
162 
163 static int ping_members(struct dlm_ls *ls)
164 {
165 	struct dlm_member *memb;
166 	int error = 0;
167 
168 	list_for_each_entry(memb, &ls->ls_nodes, list) {
169 		error = dlm_recovery_stopped(ls);
170 		if (error)
171 			break;
172 		error = dlm_rcom_status(ls, memb->nodeid);
173 		if (error)
174 			break;
175 	}
176 	if (error)
177 		log_debug(ls, "ping_members aborted %d last nodeid %d",
178 			  error, ls->ls_recover_nodeid);
179 	return error;
180 }
181 
182 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
183 {
184 	struct dlm_member *memb, *safe;
185 	int i, error, found, pos = 0, neg = 0, low = -1;
186 
187 	/* previously removed members that we've not finished removing need to
188 	   count as a negative change so the "neg" recovery steps will happen */
189 
190 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
191 		log_debug(ls, "prev removed member %d", memb->nodeid);
192 		neg++;
193 	}
194 
195 	/* move departed members from ls_nodes to ls_nodes_gone */
196 
197 	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
198 		found = 0;
199 		for (i = 0; i < rv->node_count; i++) {
200 			if (memb->nodeid == rv->nodeids[i]) {
201 				found = 1;
202 				break;
203 			}
204 		}
205 
206 		if (!found) {
207 			neg++;
208 			dlm_remove_member(ls, memb);
209 			log_debug(ls, "remove member %d", memb->nodeid);
210 		}
211 	}
212 
213 	/* add new members to ls_nodes */
214 
215 	for (i = 0; i < rv->node_count; i++) {
216 		if (dlm_is_member(ls, rv->nodeids[i]))
217 			continue;
218 		dlm_add_member(ls, rv->nodeids[i]);
219 		pos++;
220 		log_debug(ls, "add member %d", rv->nodeids[i]);
221 	}
222 
223 	list_for_each_entry(memb, &ls->ls_nodes, list) {
224 		if (low == -1 || memb->nodeid < low)
225 			low = memb->nodeid;
226 	}
227 	ls->ls_low_nodeid = low;
228 
229 	make_member_array(ls);
230 	dlm_set_recover_status(ls, DLM_RS_NODES);
231 	*neg_out = neg;
232 
233 	error = ping_members(ls);
234 	if (!error || error == -EPROTO) {
235 		/* new_lockspace() may be waiting to know if the config
236 		   is good or bad */
237 		ls->ls_members_result = error;
238 		complete(&ls->ls_members_done);
239 	}
240 	if (error)
241 		goto out;
242 
243 	error = dlm_recover_members_wait(ls);
244  out:
245 	log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
246 	return error;
247 }
248 
249 /* Userspace guarantees that dlm_ls_stop() has completed on all nodes before
250    dlm_ls_start() is called on any of them to start the new recovery. */
251 
252 int dlm_ls_stop(struct dlm_ls *ls)
253 {
254 	int new;
255 
256 	/*
257 	 * Prevent dlm_recv from being in the middle of something when we do
258 	 * the stop.  This includes ensuring dlm_recv isn't processing a
259 	 * recovery message (rcom), while dlm_recoverd is aborting and
260 	 * resetting things from an in-progress recovery.  i.e. we want
261 	 * dlm_recoverd to abort its recovery without worrying about dlm_recv
262 	 * processing an rcom at the same time.  Stopping dlm_recv also makes
263 	 * it easy for dlm_receive_message() to check locking stopped and add a
264 	 * message to the requestqueue without races.
265 	 */
266 
267 	down_write(&ls->ls_recv_active);
268 
269 	/*
270 	 * Abort any recovery that's in progress (see RECOVERY_STOP,
271 	 * dlm_recovery_stopped()) and tell any other threads running in the
272 	 * dlm to quit any processing (see RUNNING, dlm_locking_stopped()).
273 	 */
274 
275 	spin_lock(&ls->ls_recover_lock);
276 	set_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
277 	new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags);
278 	ls->ls_recover_seq++;
279 	spin_unlock(&ls->ls_recover_lock);
280 
281 	/*
282 	 * Let dlm_recv run again, now any normal messages will be saved on the
283 	 * requestqueue for later.
284 	 */
285 
286 	up_write(&ls->ls_recv_active);
287 
288 	/*
289 	 * This in_recovery lock does two things:
290 	 * 1) Keeps this function from returning until all threads are out
291 	 *    of locking routines and locking is truely stopped.
292 	 * 2) Keeps any new requests from being processed until it's unlocked
293 	 *    when recovery is complete.
294 	 */
295 
296 	if (new)
297 		down_write(&ls->ls_in_recovery);
298 
299 	/*
300 	 * The recoverd suspend/resume makes sure that dlm_recoverd (if
301 	 * running) has noticed RECOVERY_STOP above and quit processing the
302 	 * previous recovery.
303 	 */
304 
305 	dlm_recoverd_suspend(ls);
306 	ls->ls_recover_status = 0;
307 	dlm_recoverd_resume(ls);
308 
309 	if (!ls->ls_recover_begin)
310 		ls->ls_recover_begin = jiffies;
311 	return 0;
312 }
313 
314 int dlm_ls_start(struct dlm_ls *ls)
315 {
316 	struct dlm_recover *rv = NULL, *rv_old;
317 	int *ids = NULL;
318 	int error, count;
319 
320 	rv = kzalloc(sizeof(struct dlm_recover), GFP_KERNEL);
321 	if (!rv)
322 		return -ENOMEM;
323 
324 	error = count = dlm_nodeid_list(ls->ls_name, &ids);
325 	if (error <= 0)
326 		goto fail;
327 
328 	spin_lock(&ls->ls_recover_lock);
329 
330 	/* the lockspace needs to be stopped before it can be started */
331 
332 	if (!dlm_locking_stopped(ls)) {
333 		spin_unlock(&ls->ls_recover_lock);
334 		log_error(ls, "start ignored: lockspace running");
335 		error = -EINVAL;
336 		goto fail;
337 	}
338 
339 	rv->nodeids = ids;
340 	rv->node_count = count;
341 	rv->seq = ++ls->ls_recover_seq;
342 	rv_old = ls->ls_recover_args;
343 	ls->ls_recover_args = rv;
344 	spin_unlock(&ls->ls_recover_lock);
345 
346 	if (rv_old) {
347 		kfree(rv_old->nodeids);
348 		kfree(rv_old);
349 	}
350 
351 	dlm_recoverd_kick(ls);
352 	return 0;
353 
354  fail:
355 	kfree(rv);
356 	kfree(ids);
357 	return error;
358 }
359 
360