/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) support - standard version.
 *
 *	XPC provides a message passing capability that crosses partition
 *	boundaries. This module is made up of two parts:
 *
 *	    partition	This part detects the presence/absence of other
 *			partitions. It provides a heartbeat and monitors
 *			the heartbeats of other partitions.
 *
 *	    channel	This part manages the channels and sends/receives
 *			messages across them to/from other partitions.
 *
 *	There are a couple of additional functions residing in XP, which
 *	provide an interface to XPC for its users.
 *
 *
 *	Caveats:
 *
 *	  . We currently have no way to determine which nasid an IPI came
 *	    from. Thus, xpc_IPI_send() does a remote AMO write followed by
 *	    an IPI. The AMO indicates where data is to be pulled from, so
 *	    after the IPI arrives, the remote partition checks the AMO word.
 *	    The IPI can actually arrive before the AMO however, so other code
 *	    must periodically check for this case. Also, remote AMO operations
 *	    do not reliably time out. Thus we do a remote PIO read solely to
 *	    know whether the remote partition is down and whether we should
 *	    stop sending IPIs to it. This remote PIO read operation is set up
 *	    in a special nofault region so SAL knows to ignore (and cleanup)
 *	    any errors due to the remote AMO write, PIO read, and/or PIO
 *	    write operations.
 *
 *	    If/when new hardware solves this IPI problem, we should abandon
 *	    the current approach.
 *
 */
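
/*
 * A hedged sketch of the send sequence described in the caveats above. The
 * real helper is xpc_IPI_send() in xpc.h; the calls below reflect our
 * understanding of it and are illustrative, not authoritative:
 *
 *	// remote AMO write tells the target where data is to be pulled from
 *	FETCHOP_STORE_OP(TO_AMO((u64)&amo->variable), FETCHOP_OR, flag);
 *	// then raise the cross-partition IPI
 *	sn_send_IPI_phys(nasid, phys_cpuid, vector, 0);
 *	// nofault PIO read; SAL absorbs the error if the target is down
 *	ret = xp_nofault_PIOR(&amo->variable);
 */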

#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/init.h>
#include <linux/cache.h>
#include <linux/interrupt.h>
#include <linux/delay.h>
#include <linux/reboot.h>
#include <linux/completion.h>
#include <linux/kdebug.h>
#include <linux/kthread.h>
#include <linux/uaccess.h>
#include <asm/sn/intr.h>
#include <asm/sn/sn_sal.h>
#include "xpc.h"

/* define two XPC debug device structures to be used with dev_dbg() et al */

struct device_driver xpc_dbg_name = {
	.name = "xpc"
};

struct device xpc_part_dbg_subname = {
	.bus_id = {0},		/* set to "part" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device xpc_chan_dbg_subname = {
	.bus_id = {0},		/* set to "chan" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device *xpc_part = &xpc_part_dbg_subname;
struct device *xpc_chan = &xpc_chan_dbg_subname;

static int xpc_kdebug_ignore;

/* systune related variables for /proc/sys directories */

static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL;
static int xpc_hb_min_interval = 1;
static int xpc_hb_max_interval = 10;

static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL;
static int xpc_hb_check_min_interval = 10;
static int xpc_hb_check_max_interval = 120;

int xpc_disengage_request_timelimit = XPC_DISENGAGE_REQUEST_DEFAULT_TIMELIMIT;
static int xpc_disengage_request_min_timelimit;	/* = 0 */
static int xpc_disengage_request_max_timelimit = 120;

static ctl_table xpc_sys_xpc_hb_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb_interval",
	 .data = &xpc_hb_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_hb_min_interval,
	 .extra2 = &xpc_hb_max_interval},
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb_check_interval",
	 .data = &xpc_hb_check_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_hb_check_min_interval,
	 .extra2 = &xpc_hb_check_max_interval},
	{}
};
static ctl_table xpc_sys_xpc_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb",
	 .mode = 0555,
	 .child = xpc_sys_xpc_hb_dir},
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "disengage_request_timelimit",
	 .data = &xpc_disengage_request_timelimit,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_disengage_request_min_timelimit,
	 .extra2 = &xpc_disengage_request_max_timelimit},
	{}
};
static ctl_table xpc_sys_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "xpc",
	 .mode = 0555,
	 .child = xpc_sys_xpc_dir},
	{}
};
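
/*
 * These nested tables are registered below in xpc_init() via
 * register_sysctl_table(xpc_sys_dir) and surface three tunables (ranges
 * enforced by proc_dointvec_minmax per the extra1/extra2 bounds above):
 *
 *	/proc/sys/xpc/hb/hb_interval			(1..10 seconds)
 *	/proc/sys/xpc/hb/hb_check_interval		(10..120 seconds)
 *	/proc/sys/xpc/disengage_request_timelimit	(0..120 seconds)
 *
 * For example, "echo 60 > /proc/sys/xpc/disengage_request_timelimit"
 * shortens how long XPC waits for remote partitions to disengage.
 */
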
static struct ctl_table_header *xpc_sysctl;

/* non-zero if any remote partition disengage request timed out */
int xpc_disengage_request_timedout;

/* #of IRQs received */
static atomic_t xpc_act_IRQ_rcvd;

/* IRQ handler notifies this wait queue on receipt of an IRQ */
static DECLARE_WAIT_QUEUE_HEAD(xpc_act_IRQ_wq);

static unsigned long xpc_hb_check_timeout;

/* notification that the xpc_hb_checker thread has exited */
static DECLARE_COMPLETION(xpc_hb_checker_exited);

/* notification that the xpc_discovery thread has exited */
static DECLARE_COMPLETION(xpc_discovery_exited);

static struct timer_list xpc_hb_timer;

static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *);

static int xpc_system_reboot(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_reboot_notifier = {
	.notifier_call = xpc_system_reboot,
};

static int xpc_system_die(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_die_notifier = {
	.notifier_call = xpc_system_die,
};

/*
 * Timer function to enforce the timelimit on the partition disengage request.
 */
static void
xpc_timeout_partition_disengage_request(unsigned long data)
{
	struct xpc_partition *part = (struct xpc_partition *)data;

	DBUG_ON(time_before(jiffies, part->disengage_request_timeout));

	(void)xpc_partition_disengaged(part);

	DBUG_ON(part->disengage_request_timeout != 0);
	DBUG_ON(xpc_partition_engaged(1UL << XPC_PARTID(part)) != 0);
}

/*
 * Notify the heartbeat check thread that an IRQ has been received.
 */
static irqreturn_t
xpc_act_IRQ_handler(int irq, void *dev_id)
{
	atomic_inc(&xpc_act_IRQ_rcvd);
	wake_up_interruptible(&xpc_act_IRQ_wq);
	return IRQ_HANDLED;
}

/*
 * Timer to produce the heartbeat.  The timer structure's function is
 * already set when this is initially called.  A tunable is used to
 * specify when the next timeout should occur.
 */
static void
xpc_hb_beater(unsigned long dummy)
{
	xpc_vars->heartbeat++;

	if (time_after_eq(jiffies, xpc_hb_check_timeout))
		wake_up_interruptible(&xpc_act_IRQ_wq);

	xpc_hb_timer.expires = jiffies + (xpc_hb_interval * HZ);
	add_timer(&xpc_hb_timer);
}
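
/*
 * Worked example of the two intervals, assuming the defaults in xpc.h are
 * XPC_HB_DEFAULT_INTERVAL = 5 and XPC_HB_CHECK_DEFAULT_INTERVAL = 20: the
 * local heartbeat counter is bumped every 5 * HZ jiffies, while remote
 * heartbeats are examined only every 20 * HZ jiffies, so roughly every
 * fourth beat also wakes xpc_hb_checker via xpc_act_IRQ_wq.
 */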

/*
 * This thread is responsible for nearly all of the partition
 * activation/deactivation.
 */
static int
xpc_hb_checker(void *ignore)
{
	int last_IRQ_count = 0;
	int new_IRQ_count;
	int force_IRQ = 0;
	cpumask_of_cpu_ptr(cpumask, XPC_HB_CHECK_CPU);

	/* this thread was marked active by xpc_hb_init() */

	set_cpus_allowed_ptr(current, cpumask);

	/* set our heartbeating to other partitions into motion */
	xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ);
	xpc_hb_beater(0);

	while (!xpc_exiting) {

		dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
			"been received\n",
			(int)(xpc_hb_check_timeout - jiffies),
			atomic_read(&xpc_act_IRQ_rcvd) - last_IRQ_count);

		/* checking of remote heartbeats is skewed by IRQ handling */
		if (time_after_eq(jiffies, xpc_hb_check_timeout)) {
			dev_dbg(xpc_part, "checking remote heartbeats\n");
			xpc_check_remote_hb();

			/*
			 * We need to periodically recheck to ensure no
			 * IPI/AMO pairs have been missed.  That check
			 * must always reset xpc_hb_check_timeout.
			 */
			force_IRQ = 1;
		}

		/* check for outstanding IRQs */
		new_IRQ_count = atomic_read(&xpc_act_IRQ_rcvd);
		if (last_IRQ_count < new_IRQ_count || force_IRQ != 0) {
			force_IRQ = 0;

			dev_dbg(xpc_part, "found an IRQ to process; will be "
				"resetting xpc_hb_check_timeout\n");

			last_IRQ_count += xpc_identify_act_IRQ_sender();
			if (last_IRQ_count < new_IRQ_count) {
				/* retry once to help avoid missing AMO */
				(void)xpc_identify_act_IRQ_sender();
			}
			last_IRQ_count = new_IRQ_count;

			xpc_hb_check_timeout = jiffies +
			    (xpc_hb_check_interval * HZ);
		}

		/* wait for IRQ or timeout */
		(void)wait_event_interruptible(xpc_act_IRQ_wq,
					       (last_IRQ_count <
						atomic_read(&xpc_act_IRQ_rcvd)
						|| time_after_eq(jiffies,
							xpc_hb_check_timeout) ||
						xpc_exiting));
	}

	dev_dbg(xpc_part, "heartbeat checker is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_hb_checker_exited);
	return 0;
}

/*
 * This thread will attempt to discover other partitions to activate
 * based on info provided by SAL. This new thread is short lived and
 * will exit once discovery is complete.
 */
static int
xpc_initiate_discovery(void *ignore)
{
	xpc_discovery();

	dev_dbg(xpc_part, "discovery thread is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_discovery_exited);
	return 0;
}

/*
 * Establish first contact with the remote partition. This involves pulling
 * the XPC per partition variables from the remote partition and waiting for
 * the remote partition to pull ours.
 */
static enum xp_retval
xpc_make_first_contact(struct xpc_partition *part)
{
	enum xp_retval ret;

	while ((ret = xpc_pull_remote_vars_part(part)) != xpSuccess) {
		if (ret != xpRetry) {
			XPC_DEACTIVATE_PARTITION(part, ret);
			return ret;
		}

		dev_dbg(xpc_chan, "waiting to make first contact with "
			"partition %d\n", XPC_PARTID(part));

		/* wait a 1/4 of a second or so */
		(void)msleep_interruptible(250);

		if (part->act_state == XPC_P_DEACTIVATING)
			return part->reason;
	}

	return xpc_mark_partition_active(part);
}

/*
 * The first kthread assigned to a newly activated partition is the one
 * created by XPC HB with which it calls xpc_partition_up(). XPC hangs on to
 * that kthread until the partition is brought down, at which time that
 * kthread returns to XPC HB. (The return of that kthread will signify to
 * XPC HB that XPC has dismantled all communication infrastructure for the
 * associated partition.) This kthread becomes the channel manager for that
 * partition.
 *
 * Each active partition has a channel manager, who, besides connecting and
 * disconnecting channels, will ensure that each of the partition's connected
 * channels has the required number of assigned kthreads to get the work done.
 */
static void
xpc_channel_mgr(struct xpc_partition *part)
{
	while (part->act_state != XPC_P_DEACTIVATING ||
	       atomic_read(&part->nchannels_active) > 0 ||
	       !xpc_partition_disengaged(part)) {

		xpc_process_channel_activity(part);

		/*
		 * Wait until we've been requested to activate kthreads or
		 * all of the channel's message queues have been torn down or
		 * a signal is pending.
		 *
		 * The channel_mgr_requests count is set to 1 after the
		 * channel manager is awakened. This is done to prevent it
		 * from making one pass through the loop for each request,
		 * since it will service all outstanding requests in one
		 * pass. The reason it's set to 1 instead of 0 is so that
		 * other kthreads will know that the channel manager is
		 * running and won't bother trying to wake it up.
		 */
		atomic_dec(&part->channel_mgr_requests);
		(void)wait_event_interruptible(part->channel_mgr_wq,
				(atomic_read(&part->channel_mgr_requests) > 0 ||
				 part->local_IPI_amo != 0 ||
				 (part->act_state == XPC_P_DEACTIVATING &&
				 atomic_read(&part->nchannels_active) == 0 &&
				 xpc_partition_disengaged(part))));
		atomic_set(&part->channel_mgr_requests, 1);
	}
}
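
/*
 * For reference, the producer side of the handshake above is
 * xpc_wakeup_channel_mgr(), an inline in xpc.h. A hedged sketch of what it
 * presumably does (shown here for clarity, not copied from xpc.h):
 *
 *	static inline void
 *	xpc_wakeup_channel_mgr(struct xpc_partition *part)
 *	{
 *		if (atomic_inc_return(&part->channel_mgr_requests) == 1)
 *			wake_up(&part->channel_mgr_wq);
 *	}
 */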

/*
 * When XPC HB determines that a partition has come up, it will create a new
 * kthread and that kthread will call this function to attempt to set up the
 * basic infrastructure used for Cross Partition Communication with the newly
 * upped partition.
 *
 * The kthread that was created by XPC HB and which set up the XPC
 * infrastructure will remain assigned to the partition until the partition
 * goes down, at which time the kthread will tear down the XPC infrastructure
 * and then exit.
 *
 * XPC HB will put the remote partition's XPC per partition specific variables
 * physical address into xpc_partitions[partid].remote_vars_part_pa prior to
 * calling xpc_partition_up().
 */
static void
xpc_partition_up(struct xpc_partition *part)
{
	DBUG_ON(part->channels != NULL);

	dev_dbg(xpc_chan, "activating partition %d\n", XPC_PARTID(part));

	if (xpc_setup_infrastructure(part) != xpSuccess)
		return;

	/*
	 * The kthread that XPC HB called us with will become the
	 * channel manager for this partition. It will not return
	 * to XPC HB until the partition's XPC infrastructure
	 * has been dismantled.
	 */

	(void)xpc_part_ref(part);	/* this will always succeed */

	if (xpc_make_first_contact(part) == xpSuccess)
		xpc_channel_mgr(part);

	xpc_part_deref(part);

	xpc_teardown_infrastructure(part);
}

static int
xpc_activating(void *__partid)
{
	short partid = (u64)__partid;
	struct xpc_partition *part = &xpc_partitions[partid];
	unsigned long irq_flags;

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);

	spin_lock_irqsave(&part->act_lock, irq_flags);

	if (part->act_state == XPC_P_DEACTIVATING) {
		part->act_state = XPC_P_INACTIVE;
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
		part->remote_rp_pa = 0;
		return 0;
	}

	/* indicate the thread is activating */
	DBUG_ON(part->act_state != XPC_P_ACTIVATION_REQ);
	part->act_state = XPC_P_ACTIVATING;

	XPC_SET_REASON(part, 0, 0);
	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	dev_dbg(xpc_part, "bringing partition %d up\n", partid);

	/*
	 * Register the remote partition's AMOs with SAL so it can handle
	 * and cleanup errors within that address range should the remote
	 * partition go down. We don't unregister this range because it is
	 * difficult to tell when outstanding writes to the remote partition
	 * are finished and thus when it is safe to unregister. This should
	 * not result in wasted space in the SAL xp_addr_region table because
	 * we should get the same page for remote_amos_page_pa after module
	 * reloads and system reboots.
	 */
	if (sn_register_xp_addr_region(part->remote_amos_page_pa,
				       PAGE_SIZE, 1) < 0) {
		dev_warn(xpc_part, "xpc_partition_up(%d) failed to register "
			 "xp_addr region\n", partid);

		spin_lock_irqsave(&part->act_lock, irq_flags);
		part->act_state = XPC_P_INACTIVE;
		XPC_SET_REASON(part, xpPhysAddrRegFailed, __LINE__);
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
		part->remote_rp_pa = 0;
		return 0;
	}

	xpc_allow_hb(partid, xpc_vars);
	xpc_IPI_send_activated(part);

	/*
	 * xpc_partition_up() holds this thread and marks this partition as
	 * XPC_P_ACTIVE by calling xpc_hb_mark_active().
	 */
	(void)xpc_partition_up(part);

	xpc_disallow_hb(partid, xpc_vars);
	xpc_mark_partition_inactive(part);

	if (part->reason == xpReactivating) {
		/* interrupting ourselves results in activating partition */
		xpc_IPI_send_reactivate(part);
	}

	return 0;
}

void
xpc_activate_partition(struct xpc_partition *part)
{
	short partid = XPC_PARTID(part);
	unsigned long irq_flags;
	struct task_struct *kthread;

	spin_lock_irqsave(&part->act_lock, irq_flags);

	DBUG_ON(part->act_state != XPC_P_INACTIVE);

	part->act_state = XPC_P_ACTIVATION_REQ;
	XPC_SET_REASON(part, xpCloneKThread, __LINE__);

	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	kthread = kthread_run(xpc_activating, (void *)((u64)partid), "xpc%02d",
			      partid);
	if (IS_ERR(kthread)) {
		spin_lock_irqsave(&part->act_lock, irq_flags);
		part->act_state = XPC_P_INACTIVE;
		XPC_SET_REASON(part, xpCloneKThreadFailed, __LINE__);
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
	}
}

/*
 * Handle the receipt of a SGI_XPC_NOTIFY IRQ by seeing whether the specified
 * partition actually sent it. Since SGI_XPC_NOTIFY IRQs may be shared by more
 * than one partition, we use an AMO_t structure per partition to indicate
 * whether a partition has sent an IPI or not.  If it has, then wake up the
 * associated kthread to handle it.
 *
 * All SGI_XPC_NOTIFY IRQs received by XPC are the result of IPIs sent by XPC
 * running on other partitions.
 *
 * Noteworthy Arguments:
 *
 *	irq - Interrupt ReQuest number. NOT USED.
 *
 *	dev_id - partid of IPI's potential sender.
 */
irqreturn_t
xpc_notify_IRQ_handler(int irq, void *dev_id)
{
	short partid = (short)(u64)dev_id;
	struct xpc_partition *part = &xpc_partitions[partid];

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);

	if (xpc_part_ref(part)) {
		xpc_check_for_channel_activity(part);

		xpc_part_deref(part);
	}
	return IRQ_HANDLED;
}
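
/*
 * Registration of this handler happens in xpc_channel.c, with the partid
 * packed into dev_id since SGI_XPC_NOTIFY is a shared IRQ. Presumably along
 * these lines (an illustrative sketch, not the exact call site):
 *
 *	ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler,
 *			  IRQF_SHARED, part->IPI_owner,
 *			  (void *)(u64)partid);
 */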

/*
 * Check to see if xpc_notify_IRQ_handler() dropped any IPIs on the floor
 * because the write to their associated IPI amo completed after the IRQ/IPI
 * was received.
 */
void
xpc_dropped_IPI_check(struct xpc_partition *part)
{
	if (xpc_part_ref(part)) {
		xpc_check_for_channel_activity(part);

		part->dropped_IPI_timer.expires = jiffies +
		    XPC_P_DROPPED_IPI_WAIT;
		add_timer(&part->dropped_IPI_timer);
		xpc_part_deref(part);
	}
}

void
xpc_activate_kthreads(struct xpc_channel *ch, int needed)
{
	int idle = atomic_read(&ch->kthreads_idle);
	int assigned = atomic_read(&ch->kthreads_assigned);
	int wakeup;

	DBUG_ON(needed <= 0);

	if (idle > 0) {
		wakeup = (needed > idle) ? idle : needed;
		needed -= wakeup;

		dev_dbg(xpc_chan, "wakeup %d idle kthreads, partid=%d, "
			"channel=%d\n", wakeup, ch->partid, ch->number);

		/* only wakeup the requested number of kthreads */
		wake_up_nr(&ch->idle_wq, wakeup);
	}

	if (needed <= 0)
		return;

	if (needed + assigned > ch->kthreads_assigned_limit) {
		needed = ch->kthreads_assigned_limit - assigned;
		if (needed <= 0)
			return;
	}

	dev_dbg(xpc_chan, "create %d new kthreads, partid=%d, channel=%d\n",
		needed, ch->partid, ch->number);

	xpc_create_kthreads(ch, needed, 0);
}
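
/*
 * Worked example of the accounting above: with needed = 5, 2 idle kthreads
 * and 6 already assigned against a kthreads_assigned_limit of 8, the 2
 * idlers are woken (needed drops to 3), the limit then caps needed at
 * 8 - 6 = 2, and xpc_create_kthreads() is asked for 2 new kthreads.
 */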

/*
 * This function is where XPC's kthreads wait for messages to deliver.
 */
static void
xpc_kthread_waitmsgs(struct xpc_partition *part, struct xpc_channel *ch)
{
	do {
		/* deliver messages to their intended recipients */

		while (ch->w_local_GP.get < ch->w_remote_GP.put &&
		       !(ch->flags & XPC_C_DISCONNECTING)) {
			xpc_deliver_msg(ch);
		}

		if (atomic_inc_return(&ch->kthreads_idle) >
		    ch->kthreads_idle_limit) {
			/* too many idle kthreads on this channel */
			atomic_dec(&ch->kthreads_idle);
			break;
		}

		dev_dbg(xpc_chan, "idle kthread calling "
			"wait_event_interruptible_exclusive()\n");

		(void)wait_event_interruptible_exclusive(ch->idle_wq,
				(ch->w_local_GP.get < ch->w_remote_GP.put ||
				 (ch->flags & XPC_C_DISCONNECTING)));

		atomic_dec(&ch->kthreads_idle);

	} while (!(ch->flags & XPC_C_DISCONNECTING));
}

static int
xpc_kthread_start(void *args)
{
	short partid = XPC_UNPACK_ARG1(args);
	u16 ch_number = XPC_UNPACK_ARG2(args);
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	int n_needed;
	unsigned long irq_flags;

	dev_dbg(xpc_chan, "kthread starting, partid=%d, channel=%d\n",
		partid, ch_number);

	ch = &part->channels[ch_number];

	if (!(ch->flags & XPC_C_DISCONNECTING)) {

		/* let registerer know that connection has been established */

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (!(ch->flags & XPC_C_CONNECTEDCALLOUT)) {
			ch->flags |= XPC_C_CONNECTEDCALLOUT;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_connected_callout(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);
			ch->flags |= XPC_C_CONNECTEDCALLOUT_MADE;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			/*
			 * It is possible that while the callout was being
			 * made the remote partition sent some messages. If
			 * that is the case, we may need to activate
			 * additional kthreads to help deliver them. We only
			 * need one less than the total #of messages to
			 * deliver.
			 */
			n_needed = ch->w_remote_GP.put - ch->w_local_GP.get - 1;
			if (n_needed > 0 && !(ch->flags & XPC_C_DISCONNECTING))
				xpc_activate_kthreads(ch, n_needed);

		} else {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
		}

		xpc_kthread_waitmsgs(part, ch);
	}

	/* let registerer know that connection is disconnecting */

	spin_lock_irqsave(&ch->lock, irq_flags);
	if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
	    !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		xpc_disconnect_callout(ch, xpDisconnecting);

		spin_lock_irqsave(&ch->lock, irq_flags);
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT_MADE;
	}
	spin_unlock_irqrestore(&ch->lock, irq_flags);

	if (atomic_dec_return(&ch->kthreads_assigned) == 0) {
		if (atomic_dec_return(&part->nchannels_engaged) == 0) {
			xpc_mark_partition_disengaged(part);
			xpc_IPI_send_disengage(part);
		}
	}

	xpc_msgqueue_deref(ch);

	dev_dbg(xpc_chan, "kthread exiting, partid=%d, channel=%d\n",
		partid, ch_number);

	xpc_part_deref(part);
	return 0;
}
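
/*
 * The args cookie unpacked at the top of xpc_kthread_start() is built by
 * XPC_PACK_ARGS() in xpc_create_kthreads() below. The macros live in xpc.h;
 * a plausible reading of the encoding (an assumption shown for clarity):
 *
 *	#define XPC_PACK_ARGS(_arg1, _arg2) \
 *		((((u64)_arg1) & 0xffffffff) | \
 *		 ((((u64)_arg2) & 0xffffffff) << 32))
 *
 *	#define XPC_UNPACK_ARG1(_args)	(((u64)_args) & 0xffffffff)
 *	#define XPC_UNPACK_ARG2(_args)	((((u64)_args) >> 32) & 0xffffffff)
 */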

/*
 * For each partition that XPC has established communications with, there is
 * a minimum of one kernel thread assigned to perform any operation that
 * may potentially sleep or block (basically the callouts to the asynchronous
 * functions registered via xpc_connect()).
 *
 * Additional kthreads are created and destroyed by XPC as the workload
 * demands.
 *
 * A kthread is assigned to one of the active channels that exists for a given
 * partition.
 */
void
xpc_create_kthreads(struct xpc_channel *ch, int needed,
		    int ignore_disconnecting)
{
	unsigned long irq_flags;
	u64 args = XPC_PACK_ARGS(ch->partid, ch->number);
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	struct task_struct *kthread;

	while (needed-- > 0) {

		/*
		 * The following is done on behalf of the newly created
		 * kthread. That kthread is responsible for doing the
		 * counterpart to the following before it exits.
		 */
		if (ignore_disconnecting) {
			if (!atomic_inc_not_zero(&ch->kthreads_assigned)) {
				/* kthreads assigned had gone to zero */
				BUG_ON(!(ch->flags &
					 XPC_C_DISCONNECTINGCALLOUT_MADE));
				break;
			}

		} else if (ch->flags & XPC_C_DISCONNECTING) {
			break;

		} else if (atomic_inc_return(&ch->kthreads_assigned) == 1) {
			if (atomic_inc_return(&part->nchannels_engaged) == 1)
				xpc_mark_partition_engaged(part);
		}
		(void)xpc_part_ref(part);
		xpc_msgqueue_ref(ch);

		kthread = kthread_run(xpc_kthread_start, (void *)args,
				      "xpc%02dc%d", ch->partid, ch->number);
		if (IS_ERR(kthread)) {
			/* the fork failed */

			/*
			 * NOTE: if (ignore_disconnecting &&
			 * !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) is true,
			 * then we'll deadlock if all other kthreads assigned
			 * to this channel are blocked in the channel's
			 * registerer, because the only thing that will unblock
			 * them is the xpDisconnecting callout that this
			 * failed kthread_run() would have made.
			 */

			if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
			    atomic_dec_return(&part->nchannels_engaged) == 0) {
				xpc_mark_partition_disengaged(part);
				xpc_IPI_send_disengage(part);
			}
			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);

			if (atomic_read(&ch->kthreads_assigned) <
			    ch->kthreads_idle_limit) {
				/*
				 * Flag this as an error only if we have an
				 * insufficient #of kthreads for the channel
				 * to function.
				 */
				spin_lock_irqsave(&ch->lock, irq_flags);
				XPC_DISCONNECT_CHANNEL(ch, xpLackOfResources,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			break;
		}
	}
}

void
xpc_disconnect_wait(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;
	int wakeup_channel_mgr;

	/* now wait for all callouts to the caller's function to cease */
	for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
		part = &xpc_partitions[partid];

		if (!xpc_part_ref(part))
			continue;

		ch = &part->channels[ch_number];

		if (!(ch->flags & XPC_C_WDISCONNECT)) {
			xpc_part_deref(part);
			continue;
		}

		wait_for_completion(&ch->wdisconnect_wait);

		spin_lock_irqsave(&ch->lock, irq_flags);
		DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
		wakeup_channel_mgr = 0;

		if (ch->delayed_IPI_flags) {
			if (part->act_state != XPC_P_DEACTIVATING) {
				spin_lock(&part->IPI_lock);
				XPC_SET_IPI_FLAGS(part->local_IPI_amo,
						  ch->number,
						  ch->delayed_IPI_flags);
				spin_unlock(&part->IPI_lock);
				wakeup_channel_mgr = 1;
			}
			ch->delayed_IPI_flags = 0;
		}

		ch->flags &= ~XPC_C_WDISCONNECT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		if (wakeup_channel_mgr)
			xpc_wakeup_channel_mgr(part);

		xpc_part_deref(part);
	}
}

static void
xpc_do_exit(enum xp_retval reason)
{
	short partid;
	int active_part_count, printed_waiting_msg = 0;
	struct xpc_partition *part;
	unsigned long printmsg_time, disengage_request_timeout = 0;

	/* a 'rmmod XPC' and a 'reboot' cannot both end up here together */
	DBUG_ON(xpc_exiting == 1);

	/*
	 * Let the heartbeat checker thread and the discovery thread
	 * (if one is running) know that they should exit. Also wake up
	 * the heartbeat checker thread in case it's sleeping.
	 */
	xpc_exiting = 1;
	wake_up_interruptible(&xpc_act_IRQ_wq);

	/* ignore all incoming interrupts */
	free_irq(SGI_XPC_ACTIVATE, NULL);

	/* wait for the discovery thread to exit */
	wait_for_completion(&xpc_discovery_exited);

	/* wait for the heartbeat checker thread to exit */
	wait_for_completion(&xpc_hb_checker_exited);

	/* sleep for a 1/3 of a second or so */
	(void)msleep_interruptible(300);

	/* wait for all partitions to become inactive */

	printmsg_time = jiffies + (XPC_DISENGAGE_PRINTMSG_INTERVAL * HZ);
	xpc_disengage_request_timedout = 0;

	do {
		active_part_count = 0;

		for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
			part = &xpc_partitions[partid];

			if (xpc_partition_disengaged(part) &&
			    part->act_state == XPC_P_INACTIVE) {
				continue;
			}

			active_part_count++;

			XPC_DEACTIVATE_PARTITION(part, reason);

			if (part->disengage_request_timeout >
			    disengage_request_timeout) {
				disengage_request_timeout =
				    part->disengage_request_timeout;
			}
		}

		if (xpc_partition_engaged(-1UL)) {
			if (time_after(jiffies, printmsg_time)) {
				dev_info(xpc_part, "waiting for remote "
					 "partitions to disengage, timeout in "
					 "%ld seconds\n",
					 (disengage_request_timeout - jiffies)
					 / HZ);
				printmsg_time = jiffies +
				    (XPC_DISENGAGE_PRINTMSG_INTERVAL * HZ);
				printed_waiting_msg = 1;
			}

		} else if (active_part_count > 0) {
			if (printed_waiting_msg) {
				dev_info(xpc_part, "waiting for local partition"
					 " to disengage\n");
				printed_waiting_msg = 0;
			}

		} else {
			if (!xpc_disengage_request_timedout) {
				dev_info(xpc_part, "all partitions have "
					 "disengaged\n");
			}
			break;
		}

		/* sleep for a 1/3 of a second or so */
		(void)msleep_interruptible(300);

	} while (1);

	DBUG_ON(xpc_partition_engaged(-1UL));

	/* indicate to others that our reserved page is uninitialized */
	xpc_rsvd_page->vars_pa = 0;

	/* now it's time to eliminate our heartbeat */
	del_timer_sync(&xpc_hb_timer);
	DBUG_ON(xpc_vars->heartbeating_to_mask != 0);

	if (reason == xpUnloading) {
		/* take ourselves off of the reboot_notifier_list */
		(void)unregister_reboot_notifier(&xpc_reboot_notifier);

		/* take ourselves off of the die_notifier list */
		(void)unregister_die_notifier(&xpc_die_notifier);
	}

	/* close down protections for IPI operations */
	xpc_restrict_IPI_ops();

	/* clear the interface to XPC's functions */
	xpc_clear_interface();

	if (xpc_sysctl)
		unregister_sysctl_table(xpc_sysctl);

	kfree(xpc_remote_copy_buffer_base);
}

/*
 * This function is called when the system is being rebooted.
 */
static int
xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused)
{
	enum xp_retval reason;

	switch (event) {
	case SYS_RESTART:
		reason = xpSystemReboot;
		break;
	case SYS_HALT:
		reason = xpSystemHalt;
		break;
	case SYS_POWER_OFF:
		reason = xpSystemPoweroff;
		break;
	default:
		reason = xpSystemGoingDown;
	}

	xpc_do_exit(reason);
	return NOTIFY_DONE;
}

/*
 * Notify other partitions to disengage from all references to our memory.
 */
static void
xpc_die_disengage(void)
{
	struct xpc_partition *part;
	short partid;
	unsigned long engaged;
	long time, printmsg_time, disengage_request_timeout;

	/* keep xpc_hb_checker thread from doing anything (just in case) */
	xpc_exiting = 1;

	xpc_vars->heartbeating_to_mask = 0;	/* indicate we're deactivated */

	for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
		part = &xpc_partitions[partid];

		if (!XPC_SUPPORTS_DISENGAGE_REQUEST(part->
		    remote_vars_version)) {

			/* just in case it was left set by an earlier XPC */
			xpc_clear_partition_engaged(1UL << partid);
			continue;
		}

		if (xpc_partition_engaged(1UL << partid) ||
		    part->act_state != XPC_P_INACTIVE) {
			xpc_request_partition_disengage(part);
			xpc_mark_partition_disengaged(part);
			xpc_IPI_send_disengage(part);
		}
	}

	time = rtc_time();
	printmsg_time = time +
	    (XPC_DISENGAGE_PRINTMSG_INTERVAL * sn_rtc_cycles_per_second);
	disengage_request_timeout = time +
	    (xpc_disengage_request_timelimit * sn_rtc_cycles_per_second);

	/* wait for all other partitions to disengage from us */

	while (1) {
		engaged = xpc_partition_engaged(-1UL);
		if (!engaged) {
			dev_info(xpc_part, "all partitions have disengaged\n");
			break;
		}

		time = rtc_time();
		if (time >= disengage_request_timeout) {
			for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
				if (engaged & (1UL << partid)) {
					dev_info(xpc_part, "disengage from "
						 "remote partition %d timed "
						 "out\n", partid);
				}
			}
			break;
		}

		if (time >= printmsg_time) {
			dev_info(xpc_part, "waiting for remote partitions to "
				 "disengage, timeout in %ld seconds\n",
				 (disengage_request_timeout - time) /
				 sn_rtc_cycles_per_second);
			printmsg_time = time +
			    (XPC_DISENGAGE_PRINTMSG_INTERVAL *
			     sn_rtc_cycles_per_second);
		}
	}
}
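
/*
 * Worked example of the timeout arithmetic above, assuming xpc.h defines
 * XPC_DISENGAGE_REQUEST_DEFAULT_TIMELIMIT as 90 seconds and
 * XPC_DISENGAGE_PRINTMSG_INTERVAL as 10 seconds: the loop nags via
 * dev_info() every 10 * sn_rtc_cycles_per_second RTC ticks and gives up
 * once 90 * sn_rtc_cycles_per_second ticks have elapsed.
 */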

/*
 * This function is called when the system is being restarted or halted due
 * to some sort of system failure. If this is the case, we need to notify the
 * other partitions to disengage from all references to our memory.
 * This function can also be called when our heartbeater may be offlined
 * for a time. In that case we need to notify other partitions not to worry
 * about the lack of a heartbeat.
 */
1088 xpc_system_die(struct notifier_block *nb, unsigned long event, void *unused)
1089 {
1090 	switch (event) {
1091 	case DIE_MACHINE_RESTART:
1092 	case DIE_MACHINE_HALT:
1093 		xpc_die_disengage();
1094 		break;
1095 
1096 	case DIE_KDEBUG_ENTER:
1097 		/* Should lack of heartbeat be ignored by other partitions? */
1098 		if (!xpc_kdebug_ignore)
1099 			break;
1100 
1101 		/* fall through */
1102 	case DIE_MCA_MONARCH_ENTER:
1103 	case DIE_INIT_MONARCH_ENTER:
1104 		xpc_vars->heartbeat++;
1105 		xpc_vars->heartbeat_offline = 1;
1106 		break;
1107 
1108 	case DIE_KDEBUG_LEAVE:
1109 		/* Is lack of heartbeat being ignored by other partitions? */
1110 		if (!xpc_kdebug_ignore)
1111 			break;
1112 
1113 		/* fall through */
1114 	case DIE_MCA_MONARCH_LEAVE:
1115 	case DIE_INIT_MONARCH_LEAVE:
1116 		xpc_vars->heartbeat++;
1117 		xpc_vars->heartbeat_offline = 0;
1118 		break;
1119 	}
1120 
1121 	return NOTIFY_DONE;
1122 }

int __init
xpc_init(void)
{
	int ret;
	short partid;
	struct xpc_partition *part;
	struct task_struct *kthread;
	size_t buf_size;

	if (!ia64_platform_is("sn2"))
		return -ENODEV;

	buf_size = max(XPC_RP_VARS_SIZE,
		       XPC_RP_HEADER_SIZE + XP_NASID_MASK_BYTES);
	xpc_remote_copy_buffer = xpc_kmalloc_cacheline_aligned(buf_size,
							       GFP_KERNEL,
						  &xpc_remote_copy_buffer_base);
	if (xpc_remote_copy_buffer == NULL)
		return -ENOMEM;

	snprintf(xpc_part->bus_id, BUS_ID_SIZE, "part");
	snprintf(xpc_chan->bus_id, BUS_ID_SIZE, "chan");

	xpc_sysctl = register_sysctl_table(xpc_sys_dir);

	/*
	 * The first few fields of each entry of xpc_partitions[] need to
	 * be initialized now so that calls to xpc_connect() and
	 * xpc_disconnect() can be made prior to the activation of any remote
	 * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
	 * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
	 * PARTITION HAS BEEN ACTIVATED.
	 */
	for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
		part = &xpc_partitions[partid];

		DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));

		part->act_IRQ_rcvd = 0;
		spin_lock_init(&part->act_lock);
		part->act_state = XPC_P_INACTIVE;
		XPC_SET_REASON(part, 0, 0);

		init_timer(&part->disengage_request_timer);
		part->disengage_request_timer.function =
		    xpc_timeout_partition_disengage_request;
		part->disengage_request_timer.data = (unsigned long)part;

		part->setup_state = XPC_P_UNSET;
		init_waitqueue_head(&part->teardown_wq);
		atomic_set(&part->references, 0);
	}

	/*
	 * Open up protections for IPI operations (and AMO operations on
	 * Shub 1.1 systems).
	 */
	xpc_allow_IPI_ops();

	/*
	 * Interrupts being processed will increment this atomic variable and
	 * awaken the heartbeat thread which will process the interrupts.
	 */
	atomic_set(&xpc_act_IRQ_rcvd, 0);

	/*
	 * This is safe to do before the xpc_hb_checker thread has started
	 * because the handler releases a wait queue.  If an interrupt is
	 * received before the thread is waiting, it will not go to sleep,
	 * but rather immediately process the interrupt.
	 */
	ret = request_irq(SGI_XPC_ACTIVATE, xpc_act_IRQ_handler, 0,
			  "xpc hb", NULL);
	if (ret != 0) {
		dev_err(xpc_part, "can't register ACTIVATE IRQ handler, "
			"errno=%d\n", -ret);

		xpc_restrict_IPI_ops();

		if (xpc_sysctl)
			unregister_sysctl_table(xpc_sysctl);

		kfree(xpc_remote_copy_buffer_base);
		return -EBUSY;
	}

	/*
	 * Fill the partition reserved page with the information needed by
	 * other partitions to discover we are alive and establish initial
	 * communications.
	 */
	xpc_rsvd_page = xpc_rsvd_page_init();
	if (xpc_rsvd_page == NULL) {
		dev_err(xpc_part, "could not setup our reserved page\n");

		free_irq(SGI_XPC_ACTIVATE, NULL);
		xpc_restrict_IPI_ops();

		if (xpc_sysctl)
			unregister_sysctl_table(xpc_sysctl);

		kfree(xpc_remote_copy_buffer_base);
		return -EBUSY;
	}

	/* add ourselves to the reboot_notifier_list */
	ret = register_reboot_notifier(&xpc_reboot_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register reboot notifier\n");

	/* add ourselves to the die_notifier list */
	ret = register_die_notifier(&xpc_die_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register die notifier\n");

	init_timer(&xpc_hb_timer);
	xpc_hb_timer.function = xpc_hb_beater;

	/*
	 * The real work-horse behind xpc.  This processes incoming
	 * interrupts and monitors remote heartbeats.
	 */
	kthread = kthread_run(xpc_hb_checker, NULL, XPC_HB_CHECK_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking hb check thread\n");

		/* indicate to others that our reserved page is uninitialized */
		xpc_rsvd_page->vars_pa = 0;

		/* take ourselves off of the reboot_notifier_list */
		(void)unregister_reboot_notifier(&xpc_reboot_notifier);

		/* take ourselves off of the die_notifier list */
		(void)unregister_die_notifier(&xpc_die_notifier);

		del_timer_sync(&xpc_hb_timer);
		free_irq(SGI_XPC_ACTIVATE, NULL);
		xpc_restrict_IPI_ops();

		if (xpc_sysctl)
			unregister_sysctl_table(xpc_sysctl);

		kfree(xpc_remote_copy_buffer_base);
		return -EBUSY;
	}

	/*
	 * Startup a thread that will attempt to discover other partitions to
	 * activate based on info provided by SAL. This new thread is short
	 * lived and will exit once discovery is complete.
	 */
	kthread = kthread_run(xpc_initiate_discovery, NULL,
			      XPC_DISCOVERY_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking discovery thread\n");

		/* mark this new thread as a non-starter */
		complete(&xpc_discovery_exited);

		xpc_do_exit(xpUnloading);
		return -EBUSY;
	}

	/* set the interface to point at XPC's functions */
	xpc_set_interface(xpc_initiate_connect, xpc_initiate_disconnect,
			  xpc_initiate_allocate, xpc_initiate_send,
			  xpc_initiate_send_notify, xpc_initiate_received,
			  xpc_initiate_partid_to_nasids);

	return 0;
}

module_init(xpc_init);

void __exit
xpc_exit(void)
{
	xpc_do_exit(xpUnloading);
}

module_exit(xpc_exit);

MODULE_AUTHOR("Silicon Graphics, Inc.");
MODULE_DESCRIPTION("Cross Partition Communication (XPC) support");
MODULE_LICENSE("GPL");

module_param(xpc_hb_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_interval, "Number of seconds between "
		 "heartbeat increments.");

module_param(xpc_hb_check_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between "
		 "heartbeat checks.");

module_param(xpc_disengage_request_timelimit, int, 0);
MODULE_PARM_DESC(xpc_disengage_request_timelimit, "Number of seconds to wait "
		 "for disengage request to complete.");

module_param(xpc_kdebug_ignore, int, 0);
MODULE_PARM_DESC(xpc_kdebug_ignore, "Should lack of heartbeat be ignored by "
		 "other partitions when dropping into kdebug.");
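
/*
 * All four parameters above are registered with perm == 0, so they do not
 * appear under /sys/module/xpc/parameters/ and can only be set at module
 * load time, e.g. (an illustrative invocation, not taken from this file):
 *
 *	modprobe xpc xpc_hb_interval=5 xpc_kdebug_ignore=1
 */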