/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) channel support.
 *
 *	This is the part of XPC that manages the channels and
 *	sends/receives messages across them to/from other partitions.
 *
 */

#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/sched.h>
#include <linux/cache.h>
#include <linux/interrupt.h>
#include <linux/mutex.h>
#include <linux/completion.h>
#include <asm/sn/bte.h>
#include <asm/sn/sn_sal.h>
#include "xpc.h"

/*
 * Guarantee that the kzalloc'd memory is cacheline aligned.
 */
static void *
xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
{
	/* see if kzalloc will give us cacheline aligned memory by default */
	*base = kzalloc(size, flags);
	if (*base == NULL)
		return NULL;

	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
		return *base;

	kfree(*base);

	/* nope, we'll have to do it ourselves */
	*base = kzalloc(size + L1_CACHE_BYTES, flags);
	if (*base == NULL)
		return NULL;

	return (void *)L1_CACHE_ALIGN((u64)*base);
}
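
/*
 * A note on the round-up arithmetic used above: L1_CACHE_ALIGN() rounds
 * an address up to the next cacheline boundary. A minimal sketch,
 * assuming a hypothetical L1_CACHE_BYTES of 128:
 *
 *	base = kzalloc(size + 128, flags);
 *	aligned = (void *)(((u64)base + 127) & ~127UL);
 *
 * Since the round-up consumes at most 127 bytes of padding, the aligned
 * pointer always lies within the over-sized allocation.
 */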

/*
 * Set up the initial values for the XPartition Communication channels.
 */
static void
xpc_initialize_channels(struct xpc_partition *part, short partid)
{
	int ch_number;
	struct xpc_channel *ch;

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		ch->partid = partid;
		ch->number = ch_number;
		ch->flags = XPC_C_DISCONNECTED;

		ch->local_GP = &part->local_GPs[ch_number];
		ch->local_openclose_args =
		    &part->local_openclose_args[ch_number];

		atomic_set(&ch->kthreads_assigned, 0);
		atomic_set(&ch->kthreads_idle, 0);
		atomic_set(&ch->kthreads_active, 0);

		atomic_set(&ch->references, 0);
		atomic_set(&ch->n_to_notify, 0);

		spin_lock_init(&ch->lock);
		mutex_init(&ch->msg_to_pull_mutex);
		init_completion(&ch->wdisconnect_wait);

		atomic_set(&ch->n_on_msg_allocate_wq, 0);
		init_waitqueue_head(&ch->msg_allocate_wq);
		init_waitqueue_head(&ch->idle_wq);
	}
}

/*
 * Set up the infrastructure necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
enum xp_retval
xpc_setup_infrastructure(struct xpc_partition *part)
{
	int ret, cpuid;
	struct timer_list *timer;
	short partid = XPC_PARTID(part);

	/*
	 * Zero out MOST of the entry for this partition. Only the fields
	 * starting with `nchannels' will be zeroed. The preceding fields must
	 * remain `viable' across partition ups and downs, since they may be
	 * referenced during this memset() operation.
	 */
	memset(&part->nchannels, 0, sizeof(struct xpc_partition) -
	       offsetof(struct xpc_partition, nchannels));

	/*
	 * Allocate all of the channel structures as a contiguous chunk of
	 * memory.
	 */
	part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
				 GFP_KERNEL);
	if (part->channels == NULL) {
		dev_err(xpc_chan, "can't get memory for channels\n");
		return xpNoMemory;
	}

	part->nchannels = XPC_NCHANNELS;

	/* allocate all the required GET/PUT values */

	part->local_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
							GFP_KERNEL,
							&part->local_GPs_base);
	if (part->local_GPs == NULL) {
		kfree(part->channels);
		part->channels = NULL;
		dev_err(xpc_chan, "can't get memory for local get/put "
			"values\n");
		return xpNoMemory;
	}

	part->remote_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE,
							 GFP_KERNEL,
							 &part->
							 remote_GPs_base);
	if (part->remote_GPs == NULL) {
		dev_err(xpc_chan, "can't get memory for remote get/put "
			"values\n");
		kfree(part->local_GPs_base);
		part->local_GPs = NULL;
		kfree(part->channels);
		part->channels = NULL;
		return xpNoMemory;
	}

	/* allocate all the required open and close args */

	part->local_openclose_args =
	    xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
					  &part->local_openclose_args_base);
	if (part->local_openclose_args == NULL) {
		dev_err(xpc_chan, "can't get memory for local connect args\n");
		kfree(part->remote_GPs_base);
		part->remote_GPs = NULL;
		kfree(part->local_GPs_base);
		part->local_GPs = NULL;
		kfree(part->channels);
		part->channels = NULL;
		return xpNoMemory;
	}

	part->remote_openclose_args =
	    xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
					  &part->remote_openclose_args_base);
	if (part->remote_openclose_args == NULL) {
		dev_err(xpc_chan, "can't get memory for remote connect args\n");
		kfree(part->local_openclose_args_base);
		part->local_openclose_args = NULL;
		kfree(part->remote_GPs_base);
		part->remote_GPs = NULL;
		kfree(part->local_GPs_base);
		part->local_GPs = NULL;
		kfree(part->channels);
		part->channels = NULL;
		return xpNoMemory;
	}

	xpc_initialize_channels(part, partid);

	atomic_set(&part->nchannels_active, 0);
	atomic_set(&part->nchannels_engaged, 0);

	/* local_IPI_amo was set to 0 by an earlier memset() */

	/* Initialize this partition's AMO_t structure */
	part->local_IPI_amo_va = xpc_IPI_init(partid);

	spin_lock_init(&part->IPI_lock);

	atomic_set(&part->channel_mgr_requests, 1);
	init_waitqueue_head(&part->channel_mgr_wq);

	sprintf(part->IPI_owner, "xpc%02d", partid);
	ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, IRQF_SHARED,
			  part->IPI_owner, (void *)(u64)partid);
	if (ret != 0) {
		dev_err(xpc_chan, "can't register NOTIFY IRQ handler, "
			"errno=%d\n", -ret);
		kfree(part->remote_openclose_args_base);
		part->remote_openclose_args = NULL;
		kfree(part->local_openclose_args_base);
		part->local_openclose_args = NULL;
		kfree(part->remote_GPs_base);
		part->remote_GPs = NULL;
		kfree(part->local_GPs_base);
		part->local_GPs = NULL;
		kfree(part->channels);
		part->channels = NULL;
		return xpLackOfResources;
	}

	/* Set up a timer to check for dropped IPIs */
	timer = &part->dropped_IPI_timer;
	init_timer(timer);
	timer->function = (void (*)(unsigned long))xpc_dropped_IPI_check;
	timer->data = (unsigned long)part;
	timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT;
	add_timer(timer);

	/*
	 * With the setting of the partition setup_state to XPC_P_SETUP, we're
	 * declaring that this partition is ready to go.
	 */
	part->setup_state = XPC_P_SETUP;

	/*
	 * Set up the per-partition specific variables required by the
	 * remote partition to establish channel connections with us.
	 *
	 * The setting of the magic # indicates that these per partition
	 * specific variables are ready to be used.
	 */
	xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs);
	xpc_vars_part[partid].openclose_args_pa =
	    __pa(part->local_openclose_args);
	xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va);
	cpuid = raw_smp_processor_id();	/* any CPU in this partition will do */
	xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid);
	xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid);
	xpc_vars_part[partid].nchannels = part->nchannels;
	xpc_vars_part[partid].magic = XPC_VP_MAGIC1;

	return xpSuccess;
}
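
/*
 * Aside: the kfree() ladders in the error paths above could also be
 * written with the kernel's usual goto-based unwinding. A hedged sketch
 * (hypothetical labels, not the style actually used in this file):
 *
 *	if (part->remote_GPs == NULL)
 *		goto out_free_local_GPs;
 *	...
 * out_free_local_GPs:
 *	kfree(part->local_GPs_base);
 *	part->local_GPs = NULL;
 * out_free_channels:
 *	kfree(part->channels);
 *	part->channels = NULL;
 *	return xpNoMemory;
 */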

/*
 * Create a wrapper that hides the underlying mechanism for pulling a cacheline
 * (or multiple cachelines) from a remote partition.
 *
 * src must be a cacheline aligned physical address on the remote partition.
 * dst must be a cacheline aligned virtual address on this partition.
 * cnt must be a multiple of the cacheline size.
 */
static enum xp_retval
xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst,
			   const void *src, size_t cnt)
{
	bte_result_t bte_ret;

	DBUG_ON((u64)src != L1_CACHE_ALIGN((u64)src));
	DBUG_ON((u64)dst != L1_CACHE_ALIGN((u64)dst));
	DBUG_ON(cnt != L1_CACHE_ALIGN(cnt));

	if (part->act_state == XPC_P_DEACTIVATING)
		return part->reason;

	bte_ret = xp_bte_copy((u64)src, (u64)dst, (u64)cnt,
			      (BTE_NORMAL | BTE_WACQUIRE), NULL);
	if (bte_ret == BTE_SUCCESS)
		return xpSuccess;

	dev_dbg(xpc_chan, "xp_bte_copy() from partition %d failed, ret=%d\n",
		XPC_PARTID(part), bte_ret);

	return xpc_map_bte_errors(bte_ret);
}
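
/*
 * Example invocation (hypothetical local/remote addresses), pulling a
 * single cacheline:
 *
 *	ret = xpc_pull_remote_cachelines(part, dst_buf,
 *					 (void *)remote_pa, L1_CACHE_BYTES);
 *
 * where dst_buf is a cacheline aligned local virtual address and
 * remote_pa is a cacheline aligned physical address on the remote
 * partition, per the DBUG_ON() checks above.
 */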

/*
 * Pull the remote per partition specific variables from the specified
 * partition.
 */
enum xp_retval
xpc_pull_remote_vars_part(struct xpc_partition *part)
{
	u8 buffer[L1_CACHE_BYTES * 2];
	struct xpc_vars_part *pulled_entry_cacheline =
	    (struct xpc_vars_part *)L1_CACHE_ALIGN((u64)buffer);
	struct xpc_vars_part *pulled_entry;
	u64 remote_entry_cacheline_pa, remote_entry_pa;
	short partid = XPC_PARTID(part);
	enum xp_retval ret;

	/* pull the cacheline that contains the variables we're interested in */

	DBUG_ON(part->remote_vars_part_pa !=
		L1_CACHE_ALIGN(part->remote_vars_part_pa));
	DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2);

	remote_entry_pa = part->remote_vars_part_pa +
	    sn_partition_id * sizeof(struct xpc_vars_part);

	remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1));

	pulled_entry = (struct xpc_vars_part *)((u64)pulled_entry_cacheline +
						(remote_entry_pa &
						 (L1_CACHE_BYTES - 1)));

	ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline,
					 (void *)remote_entry_cacheline_pa,
					 L1_CACHE_BYTES);
	if (ret != xpSuccess) {
		dev_dbg(xpc_chan, "failed to pull XPC vars_part from "
			"partition %d, ret=%d\n", partid, ret);
		return ret;
	}

	/* see if they've been set up yet */

	if (pulled_entry->magic != XPC_VP_MAGIC1 &&
	    pulled_entry->magic != XPC_VP_MAGIC2) {

		if (pulled_entry->magic != 0) {
			dev_dbg(xpc_chan, "partition %d's XPC vars_part for "
				"partition %d has bad magic value (=0x%lx)\n",
				partid, sn_partition_id, pulled_entry->magic);
			return xpBadMagic;
		}

		/* they've not been initialized yet */
		return xpRetry;
	}

	if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) {

		/* validate the variables */

		if (pulled_entry->GPs_pa == 0 ||
		    pulled_entry->openclose_args_pa == 0 ||
		    pulled_entry->IPI_amo_pa == 0) {

			dev_err(xpc_chan, "partition %d's XPC vars_part for "
				"partition %d are not valid\n", partid,
				sn_partition_id);
			return xpInvalidAddress;
		}

		/* the variables we imported look to be valid */

		part->remote_GPs_pa = pulled_entry->GPs_pa;
		part->remote_openclose_args_pa =
		    pulled_entry->openclose_args_pa;
		part->remote_IPI_amo_va =
		    (AMO_t *)__va(pulled_entry->IPI_amo_pa);
		part->remote_IPI_nasid = pulled_entry->IPI_nasid;
		part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid;

		if (part->nchannels > pulled_entry->nchannels)
			part->nchannels = pulled_entry->nchannels;

		/* let the other side know that we've pulled their variables */

		xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
	}

	if (pulled_entry->magic == XPC_VP_MAGIC1)
		return xpRetry;

	return xpSuccess;
}

/*
 * Get the IPI flags and pull the openclose args and/or remote GPs as needed.
 */
static u64
xpc_get_IPI_flags(struct xpc_partition *part)
{
	unsigned long irq_flags;
	u64 IPI_amo;
	enum xp_retval ret;

	/*
	 * See if there are any IPI flags to be handled.
	 */

	spin_lock_irqsave(&part->IPI_lock, irq_flags);
	IPI_amo = part->local_IPI_amo;
	if (IPI_amo != 0)
		part->local_IPI_amo = 0;

	spin_unlock_irqrestore(&part->IPI_lock, irq_flags);

	if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) {
		ret = xpc_pull_remote_cachelines(part,
						 part->remote_openclose_args,
						 (void *)part->
						 remote_openclose_args_pa,
						 XPC_OPENCLOSE_ARGS_SIZE);
		if (ret != xpSuccess) {
			XPC_DEACTIVATE_PARTITION(part, ret);

			dev_dbg(xpc_chan, "failed to pull openclose args from "
				"partition %d, ret=%d\n", XPC_PARTID(part),
				ret);

			/* don't bother processing IPIs anymore */
			IPI_amo = 0;
		}
	}

	if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) {
		ret = xpc_pull_remote_cachelines(part, part->remote_GPs,
						 (void *)part->remote_GPs_pa,
						 XPC_GP_SIZE);
		if (ret != xpSuccess) {
			XPC_DEACTIVATE_PARTITION(part, ret);

			dev_dbg(xpc_chan, "failed to pull GPs from partition "
				"%d, ret=%d\n", XPC_PARTID(part), ret);

			/* don't bother processing IPIs anymore */
			IPI_amo = 0;
		}
	}

	return IPI_amo;
}
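
/*
 * The locked snapshot-and-clear of local_IPI_amo above is the usual
 * "consume the pending flag word" pattern; in sketch form (same locking
 * as here):
 *
 *	spin_lock_irqsave(&part->IPI_lock, irq_flags);
 *	pending = part->local_IPI_amo;
 *	part->local_IPI_amo = 0;
 *	spin_unlock_irqrestore(&part->IPI_lock, irq_flags);
 *
 * so each flag raised elsewhere (e.g., via XPC_SET_IPI_FLAGS()) is
 * processed by exactly one pass of the channel manager.
 */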

/*
 * Allocate the local message queue and the notify queue.
 */
static enum xp_retval
xpc_allocate_local_msgqueue(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	int nentries;
	size_t nbytes;

	for (nentries = ch->local_nentries; nentries > 0; nentries--) {

		nbytes = nentries * ch->msg_size;
		ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
								   GFP_KERNEL,
						      &ch->local_msgqueue_base);
		if (ch->local_msgqueue == NULL)
			continue;

		nbytes = nentries * sizeof(struct xpc_notify);
		ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
		if (ch->notify_queue == NULL) {
			kfree(ch->local_msgqueue_base);
			ch->local_msgqueue = NULL;
			continue;
		}

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
				"partid=%d, channel=%d\n", nentries,
				ch->local_nentries, ch->partid, ch->number);

			ch->local_nentries = nentries;
		}
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
		"queue, partid=%d, channel=%d\n", ch->partid, ch->number);
	return xpNoMemory;
}

/*
 * Allocate the cached remote message queue.
 */
static enum xp_retval
xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	int nentries;
	size_t nbytes;

	DBUG_ON(ch->remote_nentries <= 0);

	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {

		nbytes = nentries * ch->msg_size;
		ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
								    GFP_KERNEL,
						     &ch->remote_msgqueue_base);
		if (ch->remote_msgqueue == NULL)
			continue;

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
				"partid=%d, channel=%d\n", nentries,
				ch->remote_nentries, ch->partid, ch->number);

			ch->remote_nentries = nentries;
		}
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return xpSuccess;
	}

	dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
		"partid=%d, channel=%d\n", ch->partid, ch->number);
	return xpNoMemory;
}

/*
 * Allocate message queues and other stuff associated with a channel.
 *
 * Note: Assumes all of the channel sizes are filled in.
 */
static enum xp_retval
xpc_allocate_msgqueues(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	enum xp_retval ret;

	DBUG_ON(ch->flags & XPC_C_SETUP);

	ret = xpc_allocate_local_msgqueue(ch);
	if (ret != xpSuccess)
		return ret;

	ret = xpc_allocate_remote_msgqueue(ch);
	if (ret != xpSuccess) {
		kfree(ch->local_msgqueue_base);
		ch->local_msgqueue = NULL;
		kfree(ch->notify_queue);
		ch->notify_queue = NULL;
		return ret;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);
	ch->flags |= XPC_C_SETUP;
	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}

/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() expects to be called with ch->lock held
 * (via spin_lock_irqsave()) and will leave it held upon return.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	enum xp_retval ret;

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_OPENREQUEST) ||
	    !(ch->flags & XPC_C_ROPENREQUEST)) {
		/* nothing more to do for now */
		return;
	}
	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

	if (!(ch->flags & XPC_C_SETUP)) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		ret = xpc_allocate_msgqueues(ch);
		spin_lock_irqsave(&ch->lock, *irq_flags);

		if (ret != xpSuccess)
			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);

		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
			return;

		DBUG_ON(!(ch->flags & XPC_C_SETUP));
		DBUG_ON(ch->local_msgqueue == NULL);
		DBUG_ON(ch->remote_msgqueue == NULL);
	}

	if (!(ch->flags & XPC_C_OPENREPLY)) {
		ch->flags |= XPC_C_OPENREPLY;
		xpc_IPI_send_openreply(ch, irq_flags);
	}

	if (!(ch->flags & XPC_C_ROPENREPLY))
		return;

	DBUG_ON(ch->remote_msgqueue_pa == 0);

	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */

	dev_info(xpc_chan, "channel %d to partition %d connected\n",
		 ch->number, ch->partid);

	spin_unlock_irqrestore(&ch->lock, *irq_flags);
	xpc_create_kthreads(ch, 1, 0);
	spin_lock_irqsave(&ch->lock, *irq_flags);
}

/*
 * Notify those who wanted to be notified upon delivery of their message.
 */
static void
xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put)
{
	struct xpc_notify *notify;
	u8 notify_type;
	s64 get = ch->w_remote_GP.get - 1;

	while (++get < put && atomic_read(&ch->n_to_notify) > 0) {

		notify = &ch->notify_queue[get % ch->local_nentries];

		/*
		 * See if the notify entry indicates it was associated with
		 * a message whose sender wants to be notified. It is possible
		 * that it is, but someone else is doing or has done the
		 * notification.
		 */
		notify_type = notify->type;
		if (notify_type == 0 ||
		    cmpxchg(&notify->type, notify_type, 0) != notify_type) {
			continue;
		}

		DBUG_ON(notify_type != XPC_N_CALL);

		atomic_dec(&ch->n_to_notify);

		if (notify->func != NULL) {
			dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *)notify, get, ch->partid, ch->number);

			notify->func(reason, ch->partid, ch->number,
				     notify->key);

			dev_dbg(xpc_chan, "notify->func() returned, "
				"notify=0x%p, msg_number=%ld, partid=%d, "
				"channel=%d\n", (void *)notify, get,
				ch->partid, ch->number);
		}
	}
}
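
/*
 * The cmpxchg() above implements a claim on the notify entry: whoever
 * zeroes notify->type first owns the callout. A condensed sketch of the
 * idiom:
 *
 *	old = notify->type;
 *	if (old != 0 && cmpxchg(&notify->type, old, 0) == old) {
 *		... we won the race, make the notification ...
 *	}
 *
 * xpc_send_msg() clears the same field on a disconnecting channel, so
 * each entry is notified at most once.
 */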

/*
 * Free up message queues and other stuff that were allocated for the specified
 * channel.
 *
 * Note: ch->reason and ch->reason_line are left set for debugging purposes;
 * they're cleared when XPC_C_DISCONNECTED is cleared.
 */
static void
xpc_free_msgqueues(struct xpc_channel *ch)
{
	DBUG_ON(!spin_is_locked(&ch->lock));
	DBUG_ON(atomic_read(&ch->n_to_notify) != 0);

	ch->remote_msgqueue_pa = 0;
	ch->func = NULL;
	ch->key = NULL;
	ch->msg_size = 0;
	ch->local_nentries = 0;
	ch->remote_nentries = 0;
	ch->kthreads_assigned_limit = 0;
	ch->kthreads_idle_limit = 0;

	ch->local_GP->get = 0;
	ch->local_GP->put = 0;
	ch->remote_GP.get = 0;
	ch->remote_GP.put = 0;
	ch->w_local_GP.get = 0;
	ch->w_local_GP.put = 0;
	ch->w_remote_GP.get = 0;
	ch->w_remote_GP.put = 0;
	ch->next_msg_to_pull = 0;

	if (ch->flags & XPC_C_SETUP) {
		ch->flags &= ~XPC_C_SETUP;

		dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
			ch->flags, ch->partid, ch->number);

		kfree(ch->local_msgqueue_base);
		ch->local_msgqueue = NULL;
		kfree(ch->remote_msgqueue_base);
		ch->remote_msgqueue = NULL;
		kfree(ch->notify_queue);
		ch->notify_queue = NULL;
	}
}

/*
 * ch->lock (acquired via spin_lock_irqsave()) is expected to be held on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING))
		return;

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->kthreads_assigned) > 0 ||
	    atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(1UL << ch->partid))
			return;

	} else {

		/* as long as the other side is up do the full protocol */

		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
			return;

		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_IPI_send_closereply(ch, irq_flags);
		}

		if (!(ch->flags & XPC_C_RCLOSEREPLY))
			return;
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* >>> we do callout while holding ch->lock */
		xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
	}

	/* both sides are disconnected now */

	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			 "reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_IPI_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed IPI flags */
			spin_lock(&part->IPI_lock);
			XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
					  ch->delayed_IPI_flags);
			spin_unlock(&part->IPI_lock);
		}
		ch->delayed_IPI_flags = 0;
	}
}

/*
 * Process a change in the channel's remote connection state.
 */
static void
xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
			  u8 IPI_flags)
{
	unsigned long irq_flags;
	struct xpc_openclose_args *args =
	    &part->remote_openclose_args[ch_number];
	struct xpc_channel *ch = &part->channels[ch_number];
	enum xp_retval reason;

	spin_lock_irqsave(&ch->lock, irq_flags);

again:

	if ((ch->flags & XPC_C_DISCONNECTED) &&
	    (ch->flags & XPC_C_WDISCONNECT)) {
		/*
		 * Delay processing of IPI flags until the thread waiting on
		 * the disconnect has had a chance to see that the channel is
		 * disconnected.
		 */
		ch->delayed_IPI_flags |= IPI_flags;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return;
	}

	if (IPI_flags & XPC_IPI_CLOSEREQUEST) {

		dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
			"from partid=%d, channel=%d\n", args->reason,
			ch->partid, ch->number);

		/*
		 * If RCLOSEREQUEST is set, we're probably waiting for
		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
		 * with this RCLOSEREQUEST in the IPI_flags.
		 */

		if (ch->flags & XPC_C_RCLOSEREQUEST) {
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

			DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
			IPI_flags &= ~XPC_IPI_CLOSEREPLY;
			ch->flags |= XPC_C_RCLOSEREPLY;

			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
			goto again;
		}

		if (ch->flags & XPC_C_DISCONNECTED) {
			if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
				if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
						       ch_number) &
				     XPC_IPI_OPENREQUEST)) {

					DBUG_ON(ch->delayed_IPI_flags != 0);
					spin_lock(&part->IPI_lock);
					XPC_SET_IPI_FLAGS(part->local_IPI_amo,
							  ch_number,
							  XPC_IPI_CLOSEREQUEST);
					spin_unlock(&part->IPI_lock);
				}
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
		}

		IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);

		/*
		 * The meaningful CLOSEREQUEST connection state fields are:
		 *      reason = reason connection is to be closed
		 */

		ch->flags |= XPC_C_RCLOSEREQUEST;

		if (!(ch->flags & XPC_C_DISCONNECTING)) {
			reason = args->reason;
			if (reason <= xpSuccess || reason > xpUnknownReason)
				reason = xpUnknownReason;
			else if (reason == xpUnregistering)
				reason = xpOtherUnregistering;

			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

			DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		xpc_process_disconnect(ch, &irq_flags);
	}

	if (IPI_flags & XPC_IPI_CLOSEREPLY) {

		dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
			" channel=%d\n", ch->partid, ch->number);

		if (ch->flags & XPC_C_DISCONNECTED) {
			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
			     & XPC_IPI_CLOSEREQUEST)) {

				DBUG_ON(ch->delayed_IPI_flags != 0);
				spin_lock(&part->IPI_lock);
				XPC_SET_IPI_FLAGS(part->local_IPI_amo,
						  ch_number,
						  XPC_IPI_CLOSEREPLY);
				spin_unlock(&part->IPI_lock);
			}
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= XPC_C_RCLOSEREPLY;

		if (ch->flags & XPC_C_CLOSEREPLY) {
			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
		}
	}

	if (IPI_flags & XPC_IPI_OPENREQUEST) {

		dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
			"local_nentries=%d) received from partid=%d, "
			"channel=%d\n", args->msg_size, args->local_nentries,
			ch->partid, ch->number);

		if (part->act_state == XPC_P_DEACTIVATING ||
		    (ch->flags & XPC_C_ROPENREQUEST)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
			ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
				       XPC_C_OPENREQUEST)));
		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
				     XPC_C_OPENREPLY | XPC_C_CONNECTED));

		/*
		 * The meaningful OPENREQUEST connection state fields are:
		 *      msg_size = size of channel's messages in bytes
		 *      local_nentries = remote partition's local_nentries
		 */
		if (args->msg_size == 0 || args->local_nentries == 0) {
			/* assume OPENREQUEST was delayed by mistake */
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
		ch->remote_nentries = args->local_nentries;

		if (ch->flags & XPC_C_OPENREQUEST) {
			if (args->msg_size != ch->msg_size) {
				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}
		} else {
			ch->msg_size = args->msg_size;

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
		}

		xpc_process_connect(ch, &irq_flags);
	}

	if (IPI_flags & XPC_IPI_OPENREPLY) {

		dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
			"local_nentries=%d, remote_nentries=%d) received from "
			"partid=%d, channel=%d\n", args->local_msgqueue_pa,
			args->local_nentries, args->remote_nentries,
			ch->partid, ch->number);

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		if (!(ch->flags & XPC_C_OPENREQUEST)) {
			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
		DBUG_ON(ch->flags & XPC_C_CONNECTED);

		/*
		 * The meaningful OPENREPLY connection state fields are:
		 *      local_msgqueue_pa = physical address of remote
		 *                          partition's local_msgqueue
		 *      local_nentries = remote partition's local_nentries
		 *      remote_nentries = remote partition's remote_nentries
		 */
		DBUG_ON(args->local_msgqueue_pa == 0);
		DBUG_ON(args->local_nentries == 0);
		DBUG_ON(args->remote_nentries == 0);

		ch->flags |= XPC_C_ROPENREPLY;
		ch->remote_msgqueue_pa = args->local_msgqueue_pa;

		if (args->local_nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
				"remote_nentries=%d, old remote_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->local_nentries, ch->remote_nentries,
				ch->partid, ch->number);

			ch->remote_nentries = args->local_nentries;
		}
		if (args->remote_nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
				"local_nentries=%d, old local_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->remote_nentries, ch->local_nentries,
				ch->partid, ch->number);

			ch->local_nentries = args->remote_nentries;
		}

		xpc_process_connect(ch, &irq_flags);
	}

	spin_unlock_irqrestore(&ch->lock, irq_flags);
}

/*
 * Attempt to establish a channel connection to a remote partition.
 */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	struct xpc_registration *registration = &xpc_registrations[ch->number];

	if (mutex_trylock(&registration->mutex) == 0)
		return xpRetry;

	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
		mutex_unlock(&registration->mutex);
		return xpUnregistered;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);

	DBUG_ON(ch->flags & XPC_C_CONNECTED);
	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

	if (ch->flags & XPC_C_DISCONNECTING) {
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		mutex_unlock(&registration->mutex);
		return ch->reason;
	}

	/* add info from the channel connect registration to the channel */

	ch->kthreads_assigned_limit = registration->assigned_limit;
	ch->kthreads_idle_limit = registration->idle_limit;
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

	ch->func = registration->func;
	DBUG_ON(registration->func == NULL);
	ch->key = registration->key;

	ch->local_nentries = registration->nentries;

	if (ch->flags & XPC_C_ROPENREQUEST) {
		if (registration->msg_size != ch->msg_size) {
			/* the local and remote sides aren't the same */

			/*
			 * Because XPC_DISCONNECT_CHANNEL() can block, we're
			 * forced to release the registration mutex before we
			 * unlock the channel lock. That's okay here because
			 * we're done with the part that required the
			 * registration mutex. XPC_DISCONNECT_CHANNEL()
			 * requires that the channel lock be held and will
			 * unlock and relock it as needed.
			 */
			mutex_unlock(&registration->mutex);
			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return xpUnequalMsgSizes;
		}
	} else {
		ch->msg_size = registration->msg_size;

		XPC_SET_REASON(ch, 0, 0);
		ch->flags &= ~XPC_C_DISCONNECTED;

		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
	}

	mutex_unlock(&registration->mutex);

	/* initiate the connection */

	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
	xpc_IPI_send_openrequest(ch, &irq_flags);

	xpc_process_connect(ch, &irq_flags);

	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}

/*
 * Clear some of the msg flags in the local message queue.
 */
static inline void
xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
{
	struct xpc_msg *msg;
	s64 get;

	get = ch->w_remote_GP.get;
	do {
		msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
					 (get % ch->local_nentries) *
					 ch->msg_size);
		msg->flags = 0;
	} while (++get < ch->remote_GP.get);
}

/*
 * Clear some of the msg flags in the remote message queue.
 */
static inline void
xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
{
	struct xpc_msg *msg;
	s64 put;

	put = ch->w_remote_GP.put;
	do {
		msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
					 (put % ch->remote_nentries) *
					 ch->msg_size);
		msg->flags = 0;
	} while (++put < ch->remote_GP.put);
}
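
/*
 * Both loops above use open-coded ring arithmetic: GP values increase
 * monotonically and (value % nentries) selects the slot. For example,
 * with a hypothetical nentries of 4 and msg_size of 128, put values
 * 0, 1, ..., 5 map to byte offsets 0, 128, 256, 384, 0, 128 within the
 * message queue.
 */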

static void
xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
{
	struct xpc_channel *ch = &part->channels[ch_number];
	int nmsgs_sent;

	ch->remote_GP = part->remote_GPs[ch_number];

	/* See what, if anything, has changed for each connected channel */

	xpc_msgqueue_ref(ch);

	if (ch->w_remote_GP.get == ch->remote_GP.get &&
	    ch->w_remote_GP.put == ch->remote_GP.put) {
		/* nothing changed since GPs were last pulled */
		xpc_msgqueue_deref(ch);
		return;
	}

	if (!(ch->flags & XPC_C_CONNECTED)) {
		xpc_msgqueue_deref(ch);
		return;
	}

	/*
	 * First check to see if messages recently sent by us have been
	 * received by the other side. (The remote GET value will have
	 * changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.get != ch->remote_GP.get) {

		/*
		 * We need to notify any senders that want to be notified
		 * that their sent messages have been received by their
		 * intended recipients. We need to do this before updating
		 * w_remote_GP.get so that we don't allocate the same message
		 * queue entries prematurely (see xpc_allocate_msg()).
		 */
		if (atomic_read(&ch->n_to_notify) > 0) {
			/*
			 * Notify senders that messages sent have been
			 * received and delivered by the other side.
			 */
			xpc_notify_senders(ch, xpMsgDelivered,
					   ch->remote_GP.get);
		}

		/*
		 * Clear msg->flags in previously sent messages, so that
		 * they're ready for xpc_allocate_msg().
		 */
		xpc_clear_local_msgqueue_flags(ch);

		ch->w_remote_GP.get = ch->remote_GP.get;

		dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.get, ch->partid,
			ch->number);

		/*
		 * If anyone was waiting for message queue entries to become
		 * available, wake them up.
		 */
		if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
			wake_up(&ch->msg_allocate_wq);
	}

	/*
	 * Now check for newly sent messages by the other side. (The remote
	 * PUT value will have changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.put != ch->remote_GP.put) {
		/*
		 * Clear msg->flags in previously received messages, so that
		 * they're ready for xpc_get_deliverable_msg().
		 */
		xpc_clear_remote_msgqueue_flags(ch);

		ch->w_remote_GP.put = ch->remote_GP.put;

		dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.put, ch->partid,
			ch->number);

		nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
		if (nmsgs_sent > 0) {
			dev_dbg(xpc_chan, "msgs waiting to be copied and "
				"delivered=%d, partid=%d, channel=%d\n",
				nmsgs_sent, ch->partid, ch->number);

			if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)
				xpc_activate_kthreads(ch, nmsgs_sent);
		}
	}

	xpc_msgqueue_deref(ch);
}

void
xpc_process_channel_activity(struct xpc_partition *part)
{
	unsigned long irq_flags;
	u64 IPI_amo, IPI_flags;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	IPI_amo = xpc_get_IPI_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages, activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related IPI flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);

		if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
			xpc_process_openclose_IPI(part, ch_number, IPI_flags);

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related IPI flags; this may involve the
		 * activation of kthreads to deliver any pending messages sent
		 * from the other partition.
		 */

		if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
			xpc_process_msg_IPI(part, ch_number);
	}
}

/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down.  XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
{
	unsigned long irq_flags;
	int ch_number;
	struct xpc_channel *ch;

	dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
		XPC_PARTID(part), reason);

	if (!xpc_part_ref(part)) {
		/* infrastructure for this partition isn't currently set up */
		return;
	}

	/* disconnect channels associated with the partition going down */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		xpc_msgqueue_ref(ch);
		spin_lock_irqsave(&ch->lock, irq_flags);

		XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

		spin_unlock_irqrestore(&ch->lock, irq_flags);
		xpc_msgqueue_deref(ch);
	}

	xpc_wakeup_channel_mgr(part);

	xpc_part_deref(part);
}

/*
 * Tear down the infrastructure necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
void
xpc_teardown_infrastructure(struct xpc_partition *part)
{
	short partid = XPC_PARTID(part);

	/*
	 * We start off by making this partition inaccessible to local
	 * processes by marking it as no longer setup. Then we make it
	 * inaccessible to remote processes by clearing the XPC per partition
	 * specific variable's magic # (which indicates that these variables
	 * are no longer valid) and by ignoring all XPC notify IPIs sent to
	 * this partition.
	 */

	DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
	DBUG_ON(atomic_read(&part->nchannels_active) != 0);
	DBUG_ON(part->setup_state != XPC_P_SETUP);
	part->setup_state = XPC_P_WTEARDOWN;

	xpc_vars_part[partid].magic = 0;

	free_irq(SGI_XPC_NOTIFY, (void *)(u64)partid);

	/*
	 * Before proceeding with the teardown we have to wait until all
	 * existing references cease.
	 */
	wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));

	/* now we can begin tearing down the infrastructure */

	part->setup_state = XPC_P_TORNDOWN;

	/* in case we've still got outstanding timers registered... */
	del_timer_sync(&part->dropped_IPI_timer);

	kfree(part->remote_openclose_args_base);
	part->remote_openclose_args = NULL;
	kfree(part->local_openclose_args_base);
	part->local_openclose_args = NULL;
	kfree(part->remote_GPs_base);
	part->remote_GPs = NULL;
	kfree(part->local_GPs_base);
	part->local_GPs = NULL;
	kfree(part->channels);
	part->channels = NULL;
	part->local_IPI_amo_va = NULL;
}

/*
 * Called by XP at the time of channel connection registration to cause
 * XPC to establish connections to all currently active partitions.
 */
void
xpc_initiate_connect(int ch_number)
{
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);

	for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];

			/*
			 * Initiate the establishment of a connection on the
			 * newly registered channel to the remote partition.
			 */
			xpc_wakeup_channel_mgr(part);
			xpc_part_deref(part);
		}
	}
}

void
xpc_connected_callout(struct xpc_channel *ch)
{
	/* let the registerer know that a connection has been established */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);

		ch->func(xpConnected, ch->partid, ch->number,
			 (void *)(u64)ch->local_nentries, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);
	}
}

/*
 * Called by XP at the time of channel connection unregistration to cause
 * XPC to tear down all current connections for the specified channel.
 *
 * Before returning, xpc_initiate_disconnect() will wait until all connections
 * on the specified channel have been closed/torn down. So the caller can be
 * assured that they will not be receiving any more callouts from XPC to the
 * function they registered via xpc_connect().
 *
 * Arguments:
 *
 *	ch_number - channel # to unregister.
 */
void
xpc_initiate_disconnect(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);

	/* initiate the channel disconnect for every active partition */
	for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];
			xpc_msgqueue_ref(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);

			if (!(ch->flags & XPC_C_DISCONNECTED)) {
				ch->flags |= XPC_C_WDISCONNECT;

				XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
						       &irq_flags);
			}

			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);
		}
	}

	xpc_disconnect_wait(ch_number);
}

/*
 * Disconnect a channel and reflect it back to all who may be waiting.
 *
 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
 * xpc_disconnect_wait().
 *
 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
 */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	xpc_IPI_send_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}

void
xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
{
	/*
	 * Let the channel's registerer know that the channel is being
	 * disconnected. We don't want to do this if the registerer was never
	 * informed of a connection being made.
	 */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);

		ch->func(reason, ch->partid, ch->number, NULL, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);
	}
}

/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 */
static enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
	enum xp_retval ret;

	if (ch->flags & XPC_C_DISCONNECTING) {
		DBUG_ON(ch->reason == xpInterrupted);
		return ch->reason;
	}

	atomic_inc(&ch->n_on_msg_allocate_wq);
	ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
	atomic_dec(&ch->n_on_msg_allocate_wq);

	if (ch->flags & XPC_C_DISCONNECTING) {
		ret = ch->reason;
		DBUG_ON(ch->reason == xpInterrupted);
	} else if (ret == 0) {
		ret = xpTimeout;
	} else {
		ret = xpInterrupted;
	}

	return ret;
}

/*
 * Allocate an entry for a message from the message queue associated with the
 * specified channel.
 */
static enum xp_retval
xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
		 struct xpc_msg **address_of_msg)
{
	struct xpc_msg *msg;
	enum xp_retval ret;
	s64 put;

	/* this reference will be dropped in xpc_send_msg() */
	xpc_msgqueue_ref(ch);

	if (ch->flags & XPC_C_DISCONNECTING) {
		xpc_msgqueue_deref(ch);
		return ch->reason;
	}
	if (!(ch->flags & XPC_C_CONNECTED)) {
		xpc_msgqueue_deref(ch);
		return xpNotConnected;
	}

	/*
	 * Get the next available message entry from the local message queue.
	 * If none are available, we'll make sure that we grab the latest
	 * GP values.
	 */
	ret = xpTimeout;

	while (1) {

		put = ch->w_local_GP.put;
		rmb();	/* guarantee that .put loads before .get */
		if (put - ch->w_remote_GP.get < ch->local_nentries) {

			/* There are available message entries. We need to try
			 * to secure one for ourselves. We'll do this by trying
			 * to increment w_local_GP.put as long as someone else
			 * doesn't beat us to it. If they do, we'll have to
			 * try again.
			 */
			if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) {
				/* we got the entry referenced by put */
				break;
			}
			continue;	/* try again */
		}

		/*
		 * There aren't any available msg entries at this time.
		 *
		 * In waiting for a message entry to become available,
		 * we set a timeout in case the other side is not
		 * sending completion IPIs. This lets us fake an IPI
		 * that will cause the IPI handler to fetch the latest
		 * GP values as if an IPI was sent by the other side.
		 */
		if (ret == xpTimeout)
			xpc_IPI_send_local_msgrequest(ch);

		if (flags & XPC_NOWAIT) {
			xpc_msgqueue_deref(ch);
			return xpNoWait;
		}

		ret = xpc_allocate_msg_wait(ch);
		if (ret != xpInterrupted && ret != xpTimeout) {
			xpc_msgqueue_deref(ch);
			return ret;
		}
	}

	/* get the message's address and initialize it */
	msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
				 (put % ch->local_nentries) * ch->msg_size);

	DBUG_ON(msg->flags != 0);
	msg->number = put;

	dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
		"msg_number=%ld, partid=%d, channel=%d\n", put + 1,
		(void *)msg, msg->number, ch->partid, ch->number);

	*address_of_msg = msg;

	return xpSuccess;
}
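
/*
 * The heart of xpc_allocate_msg() is a lock-free slot reservation; a
 * condensed sketch (the wait/retry and error handling above omitted):
 *
 *	do {
 *		put = ch->w_local_GP.put;
 *		if (put - ch->w_remote_GP.get >= ch->local_nentries)
 *			break;
 *	} while (cmpxchg(&ch->w_local_GP.put, put, put + 1) != put);
 *
 * The break is taken when the ring is currently full; a successful
 * cmpxchg() grants the caller exclusive ownership of slot
 * put % ch->local_nentries.
 */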

/*
 * Allocate an entry for a message from the message queue associated with the
 * specified channel. NOTE that this routine can sleep waiting for a message
 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel #.
 *	flags - see xpc.h for valid flags.
 *	payload - address of the allocated payload area pointer (filled in on
 * 	          return) in which the user-defined message is constructed.
 */
enum xp_retval
xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	enum xp_retval ret = xpUnknownReason;
	struct xpc_msg *msg = NULL;

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

	*payload = NULL;

	if (xpc_part_ref(part)) {
		ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
		xpc_part_deref(part);

		if (msg != NULL)
			*payload = &msg->payload;
	}

	return ret;
}
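
/*
 * A minimal caller sketch (hypothetical partid and ch_number values):
 *
 *	void *payload;
 *	enum xp_retval ret;
 *
 *	ret = xpc_initiate_allocate(partid, ch_number, XPC_NOWAIT, &payload);
 *	if (ret == xpSuccess) {
 *		... build the message in the payload area, then hand it
 *		back via the send interface described below ...
 *	}
 */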

/*
 * Send the messages that are ready to be sent by advancing the local message
 * queue's Put value and then sending an IPI to the recipient partition.
 */
static void
xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
{
	struct xpc_msg *msg;
	s64 put = initial_put + 1;
	int send_IPI = 0;

	while (1) {

		while (1) {
			if (put == ch->w_local_GP.put)
				break;

			msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
						 (put % ch->local_nentries) *
						 ch->msg_size);

			if (!(msg->flags & XPC_M_READY))
				break;

			put++;
		}

		if (put == initial_put) {
			/* nothing's changed */
			break;
		}

		if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
		    initial_put) {
			/* someone else beat us to it */
			DBUG_ON(ch->local_GP->put < initial_put);
			break;
		}

		/* we just set the new value of local_GP->put */

		dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
			"channel=%d\n", put, ch->partid, ch->number);

		send_IPI = 1;

		/*
		 * We need to ensure that the message referenced by
		 * local_GP->put is not XPC_M_READY or that local_GP->put
		 * equals w_local_GP.put, so we'll go have a look.
		 */
		initial_put = put;
	}

	if (send_IPI)
		xpc_IPI_send_msgrequest(ch);
}
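
/*
 * Worked example of the ring arithmetic above (values are hypothetical):
 * with local_nentries = 4 and msg_size = 128, message number put = 6 lives
 * in slot 6 % 4 = 2, i.e. at local_msgqueue + 2 * 128. xpc_send_msgs() thus
 * walks slots in message-number order, stops at the first entry not yet
 * flagged XPC_M_READY, and publishes everything before it with one batched
 * advance of local_GP->put followed by a single IPI.
 */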

/*
 * Common code that does the actual sending of a message by advancing the
 * local message queue's Put value and sending an IPI to the partition the
 * message is being sent to.
 */
static enum xp_retval
xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
	     xpc_notify_func func, void *key)
{
	enum xp_retval ret = xpSuccess;
	struct xpc_notify *notify = NULL;	/* set before use if notify_type != 0 */
	s64 put, msg_number = msg->number;

	DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
	DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) !=
		msg_number % ch->local_nentries);
	DBUG_ON(msg->flags & XPC_M_READY);

	if (ch->flags & XPC_C_DISCONNECTING) {
		/* drop the reference grabbed in xpc_allocate_msg() */
		xpc_msgqueue_deref(ch);
		return ch->reason;
	}

	if (notify_type != 0) {
		/*
		 * Tell the remote side to send an ACK interrupt when the
		 * message has been delivered.
		 */
		msg->flags |= XPC_M_INTERRUPT;

		atomic_inc(&ch->n_to_notify);

		notify = &ch->notify_queue[msg_number % ch->local_nentries];
		notify->func = func;
		notify->key = key;
		notify->type = notify_type;

		/* >>> is a mb() needed here? */

		if (ch->flags & XPC_C_DISCONNECTING) {
			/*
			 * An error occurred between our last error check and
			 * this one. We will try to clear the type field from
			 * the notify entry. If we succeed then
			 * xpc_disconnect_channel() didn't already process
			 * the notify entry.
			 */
			if (cmpxchg(&notify->type, notify_type, 0) ==
			    notify_type) {
				atomic_dec(&ch->n_to_notify);
				ret = ch->reason;
			}

			/* drop the reference grabbed in xpc_allocate_msg() */
			xpc_msgqueue_deref(ch);
			return ret;
		}
	}

	msg->flags |= XPC_M_READY;

	/*
	 * The preceding store of msg->flags must occur before the following
	 * load of ch->local_GP->put.
	 */
	mb();

	/* see if the message is next in line to be sent, if so send it */

	put = ch->local_GP->put;
	if (put == msg_number)
		xpc_send_msgs(ch, put);

	/* drop the reference grabbed in xpc_allocate_msg() */
	xpc_msgqueue_deref(ch);
	return ret;
}
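
/*
 * Sketch of the send-side message lifecycle implemented above (a summary of
 * this file's logic, not new behavior):
 *
 *	xpc_allocate_msg()	reserves a slot and bumps w_local_GP.put
 *	xpc_send_msg()		sets XPC_M_READY (plus XPC_M_INTERRUPT and a
 *				notify_queue entry if notification was
 *				requested)
 *	xpc_send_msgs()		advances local_GP->put past every contiguous
 *				XPC_M_READY message and sends one IPI for the
 *				whole batch
 */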

/*
 * Send a message previously allocated using xpc_initiate_allocate() on the
 * specified channel connected to the specified partition.
 *
 * This routine will not wait for the message to be received, nor will
 * notification be given when it does happen. Once this routine has returned
 * the message entry allocated via xpc_initiate_allocate() is no longer
 * accessible to the caller.
 *
 * This routine, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	payload - pointer to the payload area allocated via
 *			xpc_initiate_allocate().
 */
enum xp_retval
xpc_initiate_send(short partid, int ch_number, void *payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
	enum xp_retval ret;

	dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
		partid, ch_number);

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(msg == NULL);

	ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);

	return ret;
}
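
/*
 * A hypothetical end-to-end fire-and-forget send (illustration only; struct
 * my_req, MY_OP_PING and MY_CHANNEL are made-up names, and the payload is
 * assumed to fit the size registered with xpc_connect()):
 *
 *	struct my_req *req;
 *	void *payload;
 *	enum xp_retval ret;
 *
 *	ret = xpc_initiate_allocate(partid, MY_CHANNEL, XPC_WAIT, &payload);
 *	if (ret != xpSuccess)
 *		return ret;
 *	req = (struct my_req *)payload;
 *	req->opcode = MY_OP_PING;
 *	ret = xpc_initiate_send(partid, MY_CHANNEL, payload);
 */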

/*
 * Send a message previously allocated using xpc_initiate_allocate() on the
 * specified channel connected to the specified partition.
 *
 * This routine will not wait for the message to be sent. Once this routine
 * has returned the message entry allocated via xpc_initiate_allocate() is no
 * longer accessible to the caller.
 *
 * Once the remote end of the channel has received the message, the function
 * passed as an argument to xpc_initiate_send_notify() will be called. This
 * allows the sender to free up or re-use any buffers referenced by the
 * message, but does NOT mean the message has been processed at the remote
 * end by a receiver.
 *
 * If this routine returns an error, the caller's function will NOT be called.
 *
 * This routine, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	payload - pointer to the payload area allocated via
 *			xpc_initiate_allocate().
 *	func - function to call with asynchronous notification of message
 *		  receipt. THIS FUNCTION MUST BE NON-BLOCKING.
 *	key - user-defined key to be passed to the function when it's called.
 */
enum xp_retval
xpc_initiate_send_notify(short partid, int ch_number, void *payload,
			 xpc_notify_func func, void *key)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
	enum xp_retval ret;

	dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
		partid, ch_number);

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(msg == NULL);
	DBUG_ON(func == NULL);

	ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
			   func, key);
	return ret;
}
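
/*
 * A notify function must not block; typically it just releases resources
 * tied to the message. A hypothetical sketch (my_buf, my_buf_free and
 * my_buf_fail are made-up; it assumes xpMsgDelivered is the reason XPC
 * reports on successful delivery, with any other reason indicating a
 * channel error):
 *
 *	static void
 *	my_notify(enum xp_retval reason, short partid, int ch_number,
 *		  void *key)
 *	{
 *		struct my_buf *buf = key;
 *
 *		if (reason == xpMsgDelivered)
 *			my_buf_free(buf);
 *		else
 *			my_buf_fail(buf, reason);
 *	}
 *
 *	ret = xpc_initiate_send_notify(partid, MY_CHANNEL, payload,
 *				       my_notify, buf);
 */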

static struct xpc_msg *
xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	struct xpc_msg *remote_msg, *msg;
	u32 msg_index, nmsgs;
	u64 msg_offset;
	enum xp_retval ret;

	if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
		/* we were interrupted by a signal */
		return NULL;
	}

	while (get >= ch->next_msg_to_pull) {

		/* pull as many messages as are ready and able to be pulled */

		msg_index = ch->next_msg_to_pull % ch->remote_nentries;

		DBUG_ON(ch->next_msg_to_pull >= ch->w_remote_GP.put);
		nmsgs = ch->w_remote_GP.put - ch->next_msg_to_pull;
		if (msg_index + nmsgs > ch->remote_nentries) {
			/* ignore the ones that wrap the msg queue for now */
			nmsgs = ch->remote_nentries - msg_index;
		}

		msg_offset = msg_index * ch->msg_size;
		msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset);
		remote_msg = (struct xpc_msg *)(ch->remote_msgqueue_pa +
						msg_offset);

		ret = xpc_pull_remote_cachelines(part, msg, remote_msg,
						 nmsgs * ch->msg_size);
		if (ret != xpSuccess) {

			dev_dbg(xpc_chan, "failed to pull %d msgs starting with"
				" msg %ld from partition %d, channel=%d, "
				"ret=%d\n", nmsgs, ch->next_msg_to_pull,
				ch->partid, ch->number, ret);

			XPC_DEACTIVATE_PARTITION(part, ret);

			mutex_unlock(&ch->msg_to_pull_mutex);
			return NULL;
		}

		ch->next_msg_to_pull += nmsgs;
	}

	mutex_unlock(&ch->msg_to_pull_mutex);

	/* return the message we were looking for */
	msg_offset = (get % ch->remote_nentries) * ch->msg_size;
	msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset);

	return msg;
}
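
/*
 * Worked example of the wrap clipping above (hypothetical numbers): with
 * remote_nentries = 8, next_msg_to_pull = 6 and w_remote_GP.put = 10,
 * msg_index = 6 and nmsgs = 4, but slots 6..9 would wrap the queue, so the
 * pull is clipped to nmsgs = 8 - 6 = 2. If the requested get falls in the
 * wrapped portion, the enclosing while loop comes around again with
 * next_msg_to_pull = 8 (msg_index 0) to fetch the remaining two messages.
 */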

/*
 * Get a message to be delivered.
 */
static struct xpc_msg *
xpc_get_deliverable_msg(struct xpc_channel *ch)
{
	struct xpc_msg *msg = NULL;
	s64 get;

	do {
		if (ch->flags & XPC_C_DISCONNECTING)
			break;

		get = ch->w_local_GP.get;
		rmb();	/* guarantee that .get loads before .put */
		if (get == ch->w_remote_GP.put)
			break;

		/* There are messages waiting to be pulled and delivered.
		 * We need to try to secure one for ourselves. We'll do this
		 * by trying to increment w_local_GP.get and hope that no one
		 * else beats us to it. If they do, we'll simply have to try
		 * again for the next one.
		 */

		if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) {
			/* we got the entry referenced by get */

			dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, "
				"partid=%d, channel=%d\n", get + 1,
				ch->partid, ch->number);

			/* pull the message from the remote partition */

			msg = xpc_pull_remote_msg(ch, get);

			DBUG_ON(msg != NULL && msg->number != get);
			DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE));
			DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY));

			break;
		}

	} while (1);

	return msg;
}
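
/*
 * Example of the reservation race above (hypothetical): two kthreads can
 * both read get = 5 while w_remote_GP.put = 7. Both then attempt
 * cmpxchg(&ch->w_local_GP.get, 5, 6); exactly one succeeds and pulls
 * message 5, while the loser re-reads w_local_GP.get (now 6) on its next
 * pass through the loop and competes for message 6 instead.
 */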

/*
 * Deliver a message to its intended recipient.
 */
void
xpc_deliver_msg(struct xpc_channel *ch)
{
	struct xpc_msg *msg;

	msg = xpc_get_deliverable_msg(ch);
	if (msg != NULL) {

		/*
		 * This ref is taken to protect the payload itself from being
		 * freed before the user is finished with it, which the user
		 * indicates by calling xpc_initiate_received().
		 */
		xpc_msgqueue_ref(ch);

		atomic_inc(&ch->kthreads_active);

		if (ch->func != NULL) {
			dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *)msg, msg->number, ch->partid,
				ch->number);

			/* deliver the message to its intended recipient */
			ch->func(xpMsgReceived, ch->partid, ch->number,
				 &msg->payload, ch->key);

			dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *)msg, msg->number, ch->partid,
				ch->number);
		}

		atomic_dec(&ch->kthreads_active);
	}
}

/*
 * Acknowledge the messages that have been delivered and ack'd by advancing
 * the cached remote message queue's Get value and, if requested, send an IPI
 * to the message sender's partition.
 */
static void
xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
{
	struct xpc_msg *msg;
	s64 get = initial_get + 1;
	int send_IPI = 0;

	while (1) {

		while (1) {
			if (get == ch->w_local_GP.get)
				break;

			msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
						 (get % ch->remote_nentries) *
						 ch->msg_size);

			if (!(msg->flags & XPC_M_DONE))
				break;

			msg_flags |= msg->flags;
			get++;
		}

		if (get == initial_get) {
			/* nothing's changed */
			break;
		}

		if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
		    initial_get) {
			/* someone else beat us to it */
			DBUG_ON(ch->local_GP->get <= initial_get);
			break;
		}

		/* we just set the new value of local_GP->get */

		dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
			"channel=%d\n", get, ch->partid, ch->number);

		send_IPI = (msg_flags & XPC_M_INTERRUPT);

		/*
		 * We need to ensure that the message referenced by
		 * local_GP->get is not XPC_M_DONE or that local_GP->get
		 * equals w_local_GP.get, so we'll go have a look.
		 */
		initial_get = get;
	}

	if (send_IPI)
		xpc_IPI_send_msgrequest(ch);
}

/*
 * Acknowledge receipt of a delivered message.
 *
 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
 * that sent the message.
 *
 * This function, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # message received on.
 *	payload - pointer to the payload area allocated via
 *			xpc_initiate_allocate().
 */
void
xpc_initiate_received(short partid, int ch_number, void *payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
	s64 get, msg_number = msg->number;

	DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

	ch = &part->channels[ch_number];

	dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
		(void *)msg, msg_number, ch->partid, ch->number);

	DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) !=
		msg_number % ch->remote_nentries);
	DBUG_ON(msg->flags & XPC_M_DONE);

	msg->flags |= XPC_M_DONE;

	/*
	 * The preceding store of msg->flags must occur before the following
	 * load of ch->local_GP->get.
	 */
	mb();

	/*
	 * See if this message is next in line to be acknowledged as having
	 * been delivered.
	 */
	get = ch->local_GP->get;
	if (get == msg_number)
		xpc_acknowledge_msgs(ch, get, msg->flags);

	/* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */
	xpc_msgqueue_deref(ch);
}
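
/*
 * A hypothetical receive path tying xpc_deliver_msg() and
 * xpc_initiate_received() together (my_ch_func and my_process_request are
 * made-up consumer names; that data points at the message payload follows
 * from the ch->func() call in xpc_deliver_msg()):
 *
 *	static void
 *	my_ch_func(enum xp_retval reason, short partid, int ch_number,
 *		   void *data, void *key)
 *	{
 *		if (reason == xpMsgReceived) {
 *			my_process_request(data);
 *			xpc_initiate_received(partid, ch_number, data);
 *		}
 *	}
 */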