1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8 
9 /*
10  * Cross Partition Communication (XPC) channel support.
11  *
12  *	This is the part of XPC that manages the channels and
13  *	sends/receives messages across them to/from other partitions.
14  *
15  */
16 
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/sched.h>
20 #include <linux/cache.h>
21 #include <linux/interrupt.h>
22 #include <linux/mutex.h>
23 #include <linux/completion.h>
24 #include <asm/sn/sn_sal.h>
25 #include "xpc.h"
26 
27 /*
28  * Guarantee that the kzalloc'd memory is cacheline aligned.
29  */
30 void *
31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
32 {
33 	/* see if kzalloc will give us cachline aligned memory by default */
34 	*base = kzalloc(size, flags);
35 	if (*base == NULL)
36 		return NULL;
37 
38 	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
39 		return *base;
40 
41 	kfree(*base);
42 
43 	/* nope, we'll have to do it ourselves */
44 	*base = kzalloc(size + L1_CACHE_BYTES, flags);
45 	if (*base == NULL)
46 		return NULL;
47 
48 	return (void *)L1_CACHE_ALIGN((u64)*base);
49 }
50 
51 /*
52  * Allocate the local message queue and the notify queue.
53  */
54 static enum xp_retval
55 xpc_allocate_local_msgqueue(struct xpc_channel *ch)
56 {
57 	unsigned long irq_flags;
58 	int nentries;
59 	size_t nbytes;
60 
61 	for (nentries = ch->local_nentries; nentries > 0; nentries--) {
62 
63 		nbytes = nentries * ch->msg_size;
64 		ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
65 								   GFP_KERNEL,
66 						      &ch->local_msgqueue_base);
67 		if (ch->local_msgqueue == NULL)
68 			continue;
69 
70 		nbytes = nentries * sizeof(struct xpc_notify);
71 		ch->notify_queue = kzalloc(nbytes, GFP_KERNEL);
72 		if (ch->notify_queue == NULL) {
73 			kfree(ch->local_msgqueue_base);
74 			ch->local_msgqueue = NULL;
75 			continue;
76 		}
77 
78 		spin_lock_irqsave(&ch->lock, irq_flags);
79 		if (nentries < ch->local_nentries) {
80 			dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
81 				"partid=%d, channel=%d\n", nentries,
82 				ch->local_nentries, ch->partid, ch->number);
83 
84 			ch->local_nentries = nentries;
85 		}
86 		spin_unlock_irqrestore(&ch->lock, irq_flags);
87 		return xpSuccess;
88 	}
89 
90 	dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
91 		"queue, partid=%d, channel=%d\n", ch->partid, ch->number);
92 	return xpNoMemory;
93 }
94 
95 /*
96  * Allocate the cached remote message queue.
97  */
98 static enum xp_retval
99 xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
100 {
101 	unsigned long irq_flags;
102 	int nentries;
103 	size_t nbytes;
104 
105 	DBUG_ON(ch->remote_nentries <= 0);
106 
107 	for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
108 
109 		nbytes = nentries * ch->msg_size;
110 		ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes,
111 								    GFP_KERNEL,
112 						     &ch->remote_msgqueue_base);
113 		if (ch->remote_msgqueue == NULL)
114 			continue;
115 
116 		spin_lock_irqsave(&ch->lock, irq_flags);
117 		if (nentries < ch->remote_nentries) {
118 			dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
119 				"partid=%d, channel=%d\n", nentries,
120 				ch->remote_nentries, ch->partid, ch->number);
121 
122 			ch->remote_nentries = nentries;
123 		}
124 		spin_unlock_irqrestore(&ch->lock, irq_flags);
125 		return xpSuccess;
126 	}
127 
128 	dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
129 		"partid=%d, channel=%d\n", ch->partid, ch->number);
130 	return xpNoMemory;
131 }
132 
133 /*
134  * Allocate message queues and other stuff associated with a channel.
135  *
136  * Note: Assumes all of the channel sizes are filled in.
137  */
138 static enum xp_retval
139 xpc_allocate_msgqueues(struct xpc_channel *ch)
140 {
141 	unsigned long irq_flags;
142 	enum xp_retval ret;
143 
144 	DBUG_ON(ch->flags & XPC_C_SETUP);
145 
146 	ret = xpc_allocate_local_msgqueue(ch);
147 	if (ret != xpSuccess)
148 		return ret;
149 
150 	ret = xpc_allocate_remote_msgqueue(ch);
151 	if (ret != xpSuccess) {
152 		kfree(ch->local_msgqueue_base);
153 		ch->local_msgqueue = NULL;
154 		kfree(ch->notify_queue);
155 		ch->notify_queue = NULL;
156 		return ret;
157 	}
158 
159 	spin_lock_irqsave(&ch->lock, irq_flags);
160 	ch->flags |= XPC_C_SETUP;
161 	spin_unlock_irqrestore(&ch->lock, irq_flags);
162 
163 	return xpSuccess;
164 }
165 
166 /*
167  * Process a connect message from a remote partition.
168  *
169  * Note: xpc_process_connect() is expecting to be called with the
170  * spin_lock_irqsave held and will leave it locked upon return.
171  */
172 static void
173 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
174 {
175 	enum xp_retval ret;
176 
177 	DBUG_ON(!spin_is_locked(&ch->lock));
178 
179 	if (!(ch->flags & XPC_C_OPENREQUEST) ||
180 	    !(ch->flags & XPC_C_ROPENREQUEST)) {
181 		/* nothing more to do for now */
182 		return;
183 	}
184 	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));
185 
186 	if (!(ch->flags & XPC_C_SETUP)) {
187 		spin_unlock_irqrestore(&ch->lock, *irq_flags);
188 		ret = xpc_allocate_msgqueues(ch);
189 		spin_lock_irqsave(&ch->lock, *irq_flags);
190 
191 		if (ret != xpSuccess)
192 			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
193 
194 		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
195 			return;
196 
197 		DBUG_ON(!(ch->flags & XPC_C_SETUP));
198 		DBUG_ON(ch->local_msgqueue == NULL);
199 		DBUG_ON(ch->remote_msgqueue == NULL);
200 	}
201 
202 	if (!(ch->flags & XPC_C_OPENREPLY)) {
203 		ch->flags |= XPC_C_OPENREPLY;
204 		xpc_IPI_send_openreply(ch, irq_flags);
205 	}
206 
207 	if (!(ch->flags & XPC_C_ROPENREPLY))
208 		return;
209 
210 	DBUG_ON(ch->remote_msgqueue_pa == 0);
211 
212 	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */
213 
214 	dev_info(xpc_chan, "channel %d to partition %d connected\n",
215 		 ch->number, ch->partid);
216 
217 	spin_unlock_irqrestore(&ch->lock, *irq_flags);
218 	xpc_create_kthreads(ch, 1, 0);
219 	spin_lock_irqsave(&ch->lock, *irq_flags);
220 }
221 
222 /*
223  * Notify those who wanted to be notified upon delivery of their message.
224  */
225 static void
226 xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put)
227 {
228 	struct xpc_notify *notify;
229 	u8 notify_type;
230 	s64 get = ch->w_remote_GP.get - 1;
231 
232 	while (++get < put && atomic_read(&ch->n_to_notify) > 0) {
233 
234 		notify = &ch->notify_queue[get % ch->local_nentries];
235 
236 		/*
237 		 * See if the notify entry indicates it was associated with
238 		 * a message who's sender wants to be notified. It is possible
239 		 * that it is, but someone else is doing or has done the
240 		 * notification.
241 		 */
242 		notify_type = notify->type;
243 		if (notify_type == 0 ||
244 		    cmpxchg(&notify->type, notify_type, 0) != notify_type) {
245 			continue;
246 		}
247 
248 		DBUG_ON(notify_type != XPC_N_CALL);
249 
250 		atomic_dec(&ch->n_to_notify);
251 
252 		if (notify->func != NULL) {
253 			dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
254 				"msg_number=%ld, partid=%d, channel=%d\n",
255 				(void *)notify, get, ch->partid, ch->number);
256 
257 			notify->func(reason, ch->partid, ch->number,
258 				     notify->key);
259 
260 			dev_dbg(xpc_chan, "notify->func() returned, "
261 				"notify=0x%p, msg_number=%ld, partid=%d, "
262 				"channel=%d\n", (void *)notify, get,
263 				ch->partid, ch->number);
264 		}
265 	}
266 }
267 
268 /*
269  * Free up message queues and other stuff that were allocated for the specified
270  * channel.
271  *
272  * Note: ch->reason and ch->reason_line are left set for debugging purposes,
273  * they're cleared when XPC_C_DISCONNECTED is cleared.
274  */
275 static void
276 xpc_free_msgqueues(struct xpc_channel *ch)
277 {
278 	DBUG_ON(!spin_is_locked(&ch->lock));
279 	DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
280 
281 	ch->remote_msgqueue_pa = 0;
282 	ch->func = NULL;
283 	ch->key = NULL;
284 	ch->msg_size = 0;
285 	ch->local_nentries = 0;
286 	ch->remote_nentries = 0;
287 	ch->kthreads_assigned_limit = 0;
288 	ch->kthreads_idle_limit = 0;
289 
290 	ch->local_GP->get = 0;
291 	ch->local_GP->put = 0;
292 	ch->remote_GP.get = 0;
293 	ch->remote_GP.put = 0;
294 	ch->w_local_GP.get = 0;
295 	ch->w_local_GP.put = 0;
296 	ch->w_remote_GP.get = 0;
297 	ch->w_remote_GP.put = 0;
298 	ch->next_msg_to_pull = 0;
299 
300 	if (ch->flags & XPC_C_SETUP) {
301 		ch->flags &= ~XPC_C_SETUP;
302 
303 		dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
304 			ch->flags, ch->partid, ch->number);
305 
306 		kfree(ch->local_msgqueue_base);
307 		ch->local_msgqueue = NULL;
308 		kfree(ch->remote_msgqueue_base);
309 		ch->remote_msgqueue = NULL;
310 		kfree(ch->notify_queue);
311 		ch->notify_queue = NULL;
312 	}
313 }
314 
315 /*
316  * spin_lock_irqsave() is expected to be held on entry.
317  */
318 static void
319 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
320 {
321 	struct xpc_partition *part = &xpc_partitions[ch->partid];
322 	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);
323 
324 	DBUG_ON(!spin_is_locked(&ch->lock));
325 
326 	if (!(ch->flags & XPC_C_DISCONNECTING))
327 		return;
328 
329 	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
330 
331 	/* make sure all activity has settled down first */
332 
333 	if (atomic_read(&ch->kthreads_assigned) > 0 ||
334 	    atomic_read(&ch->references) > 0) {
335 		return;
336 	}
337 	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
338 		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));
339 
340 	if (part->act_state == XPC_P_DEACTIVATING) {
341 		/* can't proceed until the other side disengages from us */
342 		if (xpc_partition_engaged(1UL << ch->partid))
343 			return;
344 
345 	} else {
346 
347 		/* as long as the other side is up do the full protocol */
348 
349 		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
350 			return;
351 
352 		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
353 			ch->flags |= XPC_C_CLOSEREPLY;
354 			xpc_IPI_send_closereply(ch, irq_flags);
355 		}
356 
357 		if (!(ch->flags & XPC_C_RCLOSEREPLY))
358 			return;
359 	}
360 
361 	/* wake those waiting for notify completion */
362 	if (atomic_read(&ch->n_to_notify) > 0) {
363 		/* >>> we do callout while holding ch->lock */
364 		xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
365 	}
366 
367 	/* both sides are disconnected now */
368 
369 	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
370 		spin_unlock_irqrestore(&ch->lock, *irq_flags);
371 		xpc_disconnect_callout(ch, xpDisconnected);
372 		spin_lock_irqsave(&ch->lock, *irq_flags);
373 	}
374 
375 	/* it's now safe to free the channel's message queues */
376 	xpc_free_msgqueues(ch);
377 
378 	/* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
379 	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));
380 
381 	atomic_dec(&part->nchannels_active);
382 
383 	if (channel_was_connected) {
384 		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
385 			 "reason=%d\n", ch->number, ch->partid, ch->reason);
386 	}
387 
388 	if (ch->flags & XPC_C_WDISCONNECT) {
389 		/* we won't lose the CPU since we're holding ch->lock */
390 		complete(&ch->wdisconnect_wait);
391 	} else if (ch->delayed_IPI_flags) {
392 		if (part->act_state != XPC_P_DEACTIVATING) {
393 			/* time to take action on any delayed IPI flags */
394 			spin_lock(&part->IPI_lock);
395 			XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
396 					  ch->delayed_IPI_flags);
397 			spin_unlock(&part->IPI_lock);
398 		}
399 		ch->delayed_IPI_flags = 0;
400 	}
401 }
402 
403 /*
404  * Process a change in the channel's remote connection state.
405  */
406 static void
407 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
408 			  u8 IPI_flags)
409 {
410 	unsigned long irq_flags;
411 	struct xpc_openclose_args *args =
412 	    &part->remote_openclose_args[ch_number];
413 	struct xpc_channel *ch = &part->channels[ch_number];
414 	enum xp_retval reason;
415 
416 	spin_lock_irqsave(&ch->lock, irq_flags);
417 
418 again:
419 
420 	if ((ch->flags & XPC_C_DISCONNECTED) &&
421 	    (ch->flags & XPC_C_WDISCONNECT)) {
422 		/*
423 		 * Delay processing IPI flags until thread waiting disconnect
424 		 * has had a chance to see that the channel is disconnected.
425 		 */
426 		ch->delayed_IPI_flags |= IPI_flags;
427 		spin_unlock_irqrestore(&ch->lock, irq_flags);
428 		return;
429 	}
430 
431 	if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
432 
433 		dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
434 			"from partid=%d, channel=%d\n", args->reason,
435 			ch->partid, ch->number);
436 
437 		/*
438 		 * If RCLOSEREQUEST is set, we're probably waiting for
439 		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
440 		 * with this RCLOSEREQUEST in the IPI_flags.
441 		 */
442 
443 		if (ch->flags & XPC_C_RCLOSEREQUEST) {
444 			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
445 			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
446 			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
447 			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
448 
449 			DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
450 			IPI_flags &= ~XPC_IPI_CLOSEREPLY;
451 			ch->flags |= XPC_C_RCLOSEREPLY;
452 
453 			/* both sides have finished disconnecting */
454 			xpc_process_disconnect(ch, &irq_flags);
455 			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
456 			goto again;
457 		}
458 
459 		if (ch->flags & XPC_C_DISCONNECTED) {
460 			if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
461 				if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
462 						       ch_number) &
463 				     XPC_IPI_OPENREQUEST)) {
464 
465 					DBUG_ON(ch->delayed_IPI_flags != 0);
466 					spin_lock(&part->IPI_lock);
467 					XPC_SET_IPI_FLAGS(part->local_IPI_amo,
468 							  ch_number,
469 							  XPC_IPI_CLOSEREQUEST);
470 					spin_unlock(&part->IPI_lock);
471 				}
472 				spin_unlock_irqrestore(&ch->lock, irq_flags);
473 				return;
474 			}
475 
476 			XPC_SET_REASON(ch, 0, 0);
477 			ch->flags &= ~XPC_C_DISCONNECTED;
478 
479 			atomic_inc(&part->nchannels_active);
480 			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
481 		}
482 
483 		IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
484 
485 		/*
486 		 * The meaningful CLOSEREQUEST connection state fields are:
487 		 *      reason = reason connection is to be closed
488 		 */
489 
490 		ch->flags |= XPC_C_RCLOSEREQUEST;
491 
492 		if (!(ch->flags & XPC_C_DISCONNECTING)) {
493 			reason = args->reason;
494 			if (reason <= xpSuccess || reason > xpUnknownReason)
495 				reason = xpUnknownReason;
496 			else if (reason == xpUnregistering)
497 				reason = xpOtherUnregistering;
498 
499 			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
500 
501 			DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
502 			spin_unlock_irqrestore(&ch->lock, irq_flags);
503 			return;
504 		}
505 
506 		xpc_process_disconnect(ch, &irq_flags);
507 	}
508 
509 	if (IPI_flags & XPC_IPI_CLOSEREPLY) {
510 
511 		dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
512 			" channel=%d\n", ch->partid, ch->number);
513 
514 		if (ch->flags & XPC_C_DISCONNECTED) {
515 			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
516 			spin_unlock_irqrestore(&ch->lock, irq_flags);
517 			return;
518 		}
519 
520 		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
521 
522 		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
523 			if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
524 			     & XPC_IPI_CLOSEREQUEST)) {
525 
526 				DBUG_ON(ch->delayed_IPI_flags != 0);
527 				spin_lock(&part->IPI_lock);
528 				XPC_SET_IPI_FLAGS(part->local_IPI_amo,
529 						  ch_number,
530 						  XPC_IPI_CLOSEREPLY);
531 				spin_unlock(&part->IPI_lock);
532 			}
533 			spin_unlock_irqrestore(&ch->lock, irq_flags);
534 			return;
535 		}
536 
537 		ch->flags |= XPC_C_RCLOSEREPLY;
538 
539 		if (ch->flags & XPC_C_CLOSEREPLY) {
540 			/* both sides have finished disconnecting */
541 			xpc_process_disconnect(ch, &irq_flags);
542 		}
543 	}
544 
545 	if (IPI_flags & XPC_IPI_OPENREQUEST) {
546 
547 		dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
548 			"local_nentries=%d) received from partid=%d, "
549 			"channel=%d\n", args->msg_size, args->local_nentries,
550 			ch->partid, ch->number);
551 
552 		if (part->act_state == XPC_P_DEACTIVATING ||
553 		    (ch->flags & XPC_C_ROPENREQUEST)) {
554 			spin_unlock_irqrestore(&ch->lock, irq_flags);
555 			return;
556 		}
557 
558 		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
559 			ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
560 			spin_unlock_irqrestore(&ch->lock, irq_flags);
561 			return;
562 		}
563 		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
564 				       XPC_C_OPENREQUEST)));
565 		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
566 				     XPC_C_OPENREPLY | XPC_C_CONNECTED));
567 
568 		/*
569 		 * The meaningful OPENREQUEST connection state fields are:
570 		 *      msg_size = size of channel's messages in bytes
571 		 *      local_nentries = remote partition's local_nentries
572 		 */
573 		if (args->msg_size == 0 || args->local_nentries == 0) {
574 			/* assume OPENREQUEST was delayed by mistake */
575 			spin_unlock_irqrestore(&ch->lock, irq_flags);
576 			return;
577 		}
578 
579 		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
580 		ch->remote_nentries = args->local_nentries;
581 
582 		if (ch->flags & XPC_C_OPENREQUEST) {
583 			if (args->msg_size != ch->msg_size) {
584 				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
585 						       &irq_flags);
586 				spin_unlock_irqrestore(&ch->lock, irq_flags);
587 				return;
588 			}
589 		} else {
590 			ch->msg_size = args->msg_size;
591 
592 			XPC_SET_REASON(ch, 0, 0);
593 			ch->flags &= ~XPC_C_DISCONNECTED;
594 
595 			atomic_inc(&part->nchannels_active);
596 		}
597 
598 		xpc_process_connect(ch, &irq_flags);
599 	}
600 
601 	if (IPI_flags & XPC_IPI_OPENREPLY) {
602 
603 		dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
604 			"local_nentries=%d, remote_nentries=%d) received from "
605 			"partid=%d, channel=%d\n", args->local_msgqueue_pa,
606 			args->local_nentries, args->remote_nentries,
607 			ch->partid, ch->number);
608 
609 		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
610 			spin_unlock_irqrestore(&ch->lock, irq_flags);
611 			return;
612 		}
613 		if (!(ch->flags & XPC_C_OPENREQUEST)) {
614 			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
615 					       &irq_flags);
616 			spin_unlock_irqrestore(&ch->lock, irq_flags);
617 			return;
618 		}
619 
620 		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
621 		DBUG_ON(ch->flags & XPC_C_CONNECTED);
622 
623 		/*
624 		 * The meaningful OPENREPLY connection state fields are:
625 		 *      local_msgqueue_pa = physical address of remote
626 		 *                          partition's local_msgqueue
627 		 *      local_nentries = remote partition's local_nentries
628 		 *      remote_nentries = remote partition's remote_nentries
629 		 */
630 		DBUG_ON(args->local_msgqueue_pa == 0);
631 		DBUG_ON(args->local_nentries == 0);
632 		DBUG_ON(args->remote_nentries == 0);
633 
634 		ch->flags |= XPC_C_ROPENREPLY;
635 		ch->remote_msgqueue_pa = args->local_msgqueue_pa;
636 
637 		if (args->local_nentries < ch->remote_nentries) {
638 			dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
639 				"remote_nentries=%d, old remote_nentries=%d, "
640 				"partid=%d, channel=%d\n",
641 				args->local_nentries, ch->remote_nentries,
642 				ch->partid, ch->number);
643 
644 			ch->remote_nentries = args->local_nentries;
645 		}
646 		if (args->remote_nentries < ch->local_nentries) {
647 			dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
648 				"local_nentries=%d, old local_nentries=%d, "
649 				"partid=%d, channel=%d\n",
650 				args->remote_nentries, ch->local_nentries,
651 				ch->partid, ch->number);
652 
653 			ch->local_nentries = args->remote_nentries;
654 		}
655 
656 		xpc_process_connect(ch, &irq_flags);
657 	}
658 
659 	spin_unlock_irqrestore(&ch->lock, irq_flags);
660 }
661 
662 /*
663  * Attempt to establish a channel connection to a remote partition.
664  */
665 static enum xp_retval
666 xpc_connect_channel(struct xpc_channel *ch)
667 {
668 	unsigned long irq_flags;
669 	struct xpc_registration *registration = &xpc_registrations[ch->number];
670 
671 	if (mutex_trylock(&registration->mutex) == 0)
672 		return xpRetry;
673 
674 	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
675 		mutex_unlock(&registration->mutex);
676 		return xpUnregistered;
677 	}
678 
679 	spin_lock_irqsave(&ch->lock, irq_flags);
680 
681 	DBUG_ON(ch->flags & XPC_C_CONNECTED);
682 	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
683 
684 	if (ch->flags & XPC_C_DISCONNECTING) {
685 		spin_unlock_irqrestore(&ch->lock, irq_flags);
686 		mutex_unlock(&registration->mutex);
687 		return ch->reason;
688 	}
689 
690 	/* add info from the channel connect registration to the channel */
691 
692 	ch->kthreads_assigned_limit = registration->assigned_limit;
693 	ch->kthreads_idle_limit = registration->idle_limit;
694 	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
695 	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
696 	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
697 
698 	ch->func = registration->func;
699 	DBUG_ON(registration->func == NULL);
700 	ch->key = registration->key;
701 
702 	ch->local_nentries = registration->nentries;
703 
704 	if (ch->flags & XPC_C_ROPENREQUEST) {
705 		if (registration->msg_size != ch->msg_size) {
706 			/* the local and remote sides aren't the same */
707 
708 			/*
709 			 * Because XPC_DISCONNECT_CHANNEL() can block we're
710 			 * forced to up the registration sema before we unlock
711 			 * the channel lock. But that's okay here because we're
712 			 * done with the part that required the registration
713 			 * sema. XPC_DISCONNECT_CHANNEL() requires that the
714 			 * channel lock be locked and will unlock and relock
715 			 * the channel lock as needed.
716 			 */
717 			mutex_unlock(&registration->mutex);
718 			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
719 					       &irq_flags);
720 			spin_unlock_irqrestore(&ch->lock, irq_flags);
721 			return xpUnequalMsgSizes;
722 		}
723 	} else {
724 		ch->msg_size = registration->msg_size;
725 
726 		XPC_SET_REASON(ch, 0, 0);
727 		ch->flags &= ~XPC_C_DISCONNECTED;
728 
729 		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
730 	}
731 
732 	mutex_unlock(&registration->mutex);
733 
734 	/* initiate the connection */
735 
736 	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
737 	xpc_IPI_send_openrequest(ch, &irq_flags);
738 
739 	xpc_process_connect(ch, &irq_flags);
740 
741 	spin_unlock_irqrestore(&ch->lock, irq_flags);
742 
743 	return xpSuccess;
744 }
745 
746 /*
747  * Clear some of the msg flags in the local message queue.
748  */
749 static inline void
750 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
751 {
752 	struct xpc_msg *msg;
753 	s64 get;
754 
755 	get = ch->w_remote_GP.get;
756 	do {
757 		msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
758 					 (get % ch->local_nentries) *
759 					 ch->msg_size);
760 		msg->flags = 0;
761 	} while (++get < ch->remote_GP.get);
762 }
763 
764 /*
765  * Clear some of the msg flags in the remote message queue.
766  */
767 static inline void
768 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
769 {
770 	struct xpc_msg *msg;
771 	s64 put;
772 
773 	put = ch->w_remote_GP.put;
774 	do {
775 		msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
776 					 (put % ch->remote_nentries) *
777 					 ch->msg_size);
778 		msg->flags = 0;
779 	} while (++put < ch->remote_GP.put);
780 }
781 
/*
 * React to a message IPI for one channel: pull the remote partition's
 * latest GET/PUT values and act on whatever changed — notify senders of
 * delivered messages, recycle acknowledged queue slots, wake allocators,
 * and activate kthreads for newly arrived messages.
 */
static void
xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
{
	struct xpc_channel *ch = &part->channels[ch_number];
	int nmsgs_sent;

	/* snapshot the remote GET/PUT values most recently pulled */
	ch->remote_GP = part->remote_GPs[ch_number];

	/* See what, if anything, has changed for each connected channel */

	xpc_msgqueue_ref(ch);

	if (ch->w_remote_GP.get == ch->remote_GP.get &&
	    ch->w_remote_GP.put == ch->remote_GP.put) {
		/* nothing changed since GPs were last pulled */
		xpc_msgqueue_deref(ch);
		return;
	}

	if (!(ch->flags & XPC_C_CONNECTED)) {
		xpc_msgqueue_deref(ch);
		return;
	}

	/*
	 * First check to see if messages recently sent by us have been
	 * received by the other side. (The remote GET value will have
	 * changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.get != ch->remote_GP.get) {

		/*
		 * We need to notify any senders that want to be notified
		 * that their sent messages have been received by their
		 * intended recipients. We need to do this before updating
		 * w_remote_GP.get so that we don't allocate the same message
		 * queue entries prematurely (see xpc_allocate_msg()).
		 */
		if (atomic_read(&ch->n_to_notify) > 0) {
			/*
			 * Notify senders that messages sent have been
			 * received and delivered by the other side.
			 */
			xpc_notify_senders(ch, xpMsgDelivered,
					   ch->remote_GP.get);
		}

		/*
		 * Clear msg->flags in previously sent messages, so that
		 * they're ready for xpc_allocate_msg().
		 */
		xpc_clear_local_msgqueue_flags(ch);

		ch->w_remote_GP.get = ch->remote_GP.get;

		dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.get, ch->partid,
			ch->number);

		/*
		 * If anyone was waiting for message queue entries to become
		 * available, wake them up.
		 */
		if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
			wake_up(&ch->msg_allocate_wq);
	}

	/*
	 * Now check for newly sent messages by the other side. (The remote
	 * PUT value will have changed since we last looked at it.)
	 */

	if (ch->w_remote_GP.put != ch->remote_GP.put) {
		/*
		 * Clear msg->flags in previously received messages, so that
		 * they're ready for xpc_get_deliverable_msg().
		 */
		xpc_clear_remote_msgqueue_flags(ch);

		ch->w_remote_GP.put = ch->remote_GP.put;

		dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
			"channel=%d\n", ch->w_remote_GP.put, ch->partid,
			ch->number);

		/* count messages waiting to be pulled and delivered locally */
		nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
		if (nmsgs_sent > 0) {
			dev_dbg(xpc_chan, "msgs waiting to be copied and "
				"delivered=%d, partid=%d, channel=%d\n",
				nmsgs_sent, ch->partid, ch->number);

			if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE)
				xpc_activate_kthreads(ch, nmsgs_sent);
		}
	}

	xpc_msgqueue_deref(ch);
}
881 
/*
 * Drive all channel activity for a partition: fetch the pending IPI flags
 * and, for each channel, process open/close requests, progress any
 * connect/disconnect in flight, and handle message traffic.
 */
void
xpc_process_channel_activity(struct xpc_partition *part)
{
	unsigned long irq_flags;
	u64 IPI_amo, IPI_flags;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	/* atomically fetch-and-clear the partition's pending IPI flags */
	IPI_amo = xpc_get_IPI_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related IPI flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);

		if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags))
			xpc_process_openclose_IPI(part, ch_number, IPI_flags);

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		/* don't start new connections on a partition going down */
		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related IPI flags, this may involve the
		 * activation of kthreads to deliver any pending messages sent
		 * from the other partition.
		 */

		if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags))
			xpc_process_msg_IPI(part, ch_number);
	}
}
947 
948 /*
949  * XPC's heartbeat code calls this function to inform XPC that a partition is
950  * going down.  XPC responds by tearing down the XPartition Communication
951  * infrastructure used for the just downed partition.
952  *
953  * XPC's heartbeat code will never call this function and xpc_partition_up()
954  * at the same time. Nor will it ever make multiple calls to either function
955  * at the same time.
956  */
957 void
958 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
959 {
960 	unsigned long irq_flags;
961 	int ch_number;
962 	struct xpc_channel *ch;
963 
964 	dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
965 		XPC_PARTID(part), reason);
966 
967 	if (!xpc_part_ref(part)) {
968 		/* infrastructure for this partition isn't currently set up */
969 		return;
970 	}
971 
972 	/* disconnect channels associated with the partition going down */
973 
974 	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
975 		ch = &part->channels[ch_number];
976 
977 		xpc_msgqueue_ref(ch);
978 		spin_lock_irqsave(&ch->lock, irq_flags);
979 
980 		XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
981 
982 		spin_unlock_irqrestore(&ch->lock, irq_flags);
983 		xpc_msgqueue_deref(ch);
984 	}
985 
986 	xpc_wakeup_channel_mgr(part);
987 
988 	xpc_part_deref(part);
989 }
990 
991 /*
992  * Called by XP at the time of channel connection registration to cause
993  * XPC to establish connections to all currently active partitions.
994  */
995 void
996 xpc_initiate_connect(int ch_number)
997 {
998 	short partid;
999 	struct xpc_partition *part;
1000 	struct xpc_channel *ch;
1001 
1002 	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);
1003 
1004 	for (partid = 0; partid < xp_max_npartitions; partid++) {
1005 		part = &xpc_partitions[partid];
1006 
1007 		if (xpc_part_ref(part)) {
1008 			ch = &part->channels[ch_number];
1009 
1010 			/*
1011 			 * Initiate the establishment of a connection on the
1012 			 * newly registered channel to the remote partition.
1013 			 */
1014 			xpc_wakeup_channel_mgr(part);
1015 			xpc_part_deref(part);
1016 		}
1017 	}
1018 }
1019 
1020 void
1021 xpc_connected_callout(struct xpc_channel *ch)
1022 {
1023 	/* let the registerer know that a connection has been established */
1024 
1025 	if (ch->func != NULL) {
1026 		dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
1027 			"partid=%d, channel=%d\n", ch->partid, ch->number);
1028 
1029 		ch->func(xpConnected, ch->partid, ch->number,
1030 			 (void *)(u64)ch->local_nentries, ch->key);
1031 
1032 		dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
1033 			"partid=%d, channel=%d\n", ch->partid, ch->number);
1034 	}
1035 }
1036 
1037 /*
1038  * Called by XP at the time of channel connection unregistration to cause
1039  * XPC to teardown all current connections for the specified channel.
1040  *
1041  * Before returning xpc_initiate_disconnect() will wait until all connections
1042  * on the specified channel have been closed/torndown. So the caller can be
1043  * assured that they will not be receiving any more callouts from XPC to the
1044  * function they registered via xpc_connect().
1045  *
1046  * Arguments:
1047  *
1048  *	ch_number - channel # to unregister.
1049  */
void
xpc_initiate_disconnect(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

	/* initiate the channel disconnect for every active partition */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];
			/* hold the msg queues while we touch the channel */
			xpc_msgqueue_ref(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);

			if (!(ch->flags & XPC_C_DISCONNECTED)) {
				/*
				 * Mark that someone will be waiting in
				 * xpc_disconnect_wait() for this channel's
				 * teardown to complete (see below).
				 */
				ch->flags |= XPC_C_WDISCONNECT;

				XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
						       &irq_flags);
			}

			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);
		}
	}

	/* block until every partition's instance of this channel is down */
	xpc_disconnect_wait(ch_number);
}
1086 
1087 /*
1088  * To disconnect a channel, and reflect it back to all who may be waiting.
1089  *
1090  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1091  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1092  * xpc_disconnect_wait().
1093  *
1094  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
1095  */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	/* remember so we can set XPC_C_WASCONNECTED after clearing flags */
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	/* a disconnect is already in progress (or done); nothing to do */
	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	/* record why and where (source line) the disconnect was initiated */
	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	/* tell the remote partition we are closing this channel */
	xpc_IPI_send_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	/*
	 * The wakeups below may sleep/schedule, so drop the caller's lock
	 * here and retake it before returning (the caller still expects the
	 * channel to be locked on return -- see the comment above).
	 */
	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}
1143 
1144 void
1145 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
1146 {
1147 	/*
1148 	 * Let the channel's registerer know that the channel is being
1149 	 * disconnected. We don't want to do this if the registerer was never
1150 	 * informed of a connection being made.
1151 	 */
1152 
1153 	if (ch->func != NULL) {
1154 		dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
1155 			"channel=%d\n", reason, ch->partid, ch->number);
1156 
1157 		ch->func(reason, ch->partid, ch->number, NULL, ch->key);
1158 
1159 		dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
1160 			"channel=%d\n", reason, ch->partid, ch->number);
1161 	}
1162 }
1163 
1164 /*
1165  * Wait for a message entry to become available for the specified channel,
1166  * but don't wait any longer than 1 jiffy.
1167  */
static enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
	enum xp_retval ret;

	/* don't bother sleeping if the channel is already going down */
	if (ch->flags & XPC_C_DISCONNECTING) {
		DBUG_ON(ch->reason == xpInterrupted);
		return ch->reason;
	}

	/*
	 * The n_on_msg_allocate_wq count lets xpc_disconnect_channel() know
	 * there are waiters to wake on msg_allocate_wq.
	 *
	 * NOTE(review): interruptible_sleep_on_timeout() is inherently racy
	 * (a wakeup between the DISCONNECTING check above and the sleep can
	 * be missed) and was deprecated in favor of
	 * wait_event_interruptible_timeout(); the 1-jiffy timeout bounds the
	 * exposure here -- confirm before changing.
	 */
	atomic_inc(&ch->n_on_msg_allocate_wq);
	ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
	atomic_dec(&ch->n_on_msg_allocate_wq);

	if (ch->flags & XPC_C_DISCONNECTING) {
		ret = ch->reason;
		DBUG_ON(ch->reason == xpInterrupted);
	} else if (ret == 0) {
		/* timed out: remaining jiffies returned was zero */
		ret = xpTimeout;
	} else {
		/* woken early (signal or explicit wakeup) */
		ret = xpInterrupted;
	}

	return ret;
}
1193 
1194 /*
1195  * Allocate an entry for a message from the message queue associated with the
1196  * specified channel.
1197  */
static enum xp_retval
xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
		 struct xpc_msg **address_of_msg)
{
	struct xpc_msg *msg;
	enum xp_retval ret;
	s64 put;

	/* this reference will be dropped in xpc_send_msg() */
	xpc_msgqueue_ref(ch);

	if (ch->flags & XPC_C_DISCONNECTING) {
		xpc_msgqueue_deref(ch);
		return ch->reason;
	}
	if (!(ch->flags & XPC_C_CONNECTED)) {
		xpc_msgqueue_deref(ch);
		return xpNotConnected;
	}

	/*
	 * Get the next available message entry from the local message queue.
	 * If none are available, we'll make sure that we grab the latest
	 * GP values.
	 */
	ret = xpTimeout;

	while (1) {

		put = ch->w_local_GP.put;
		rmb();	/* guarantee that .put loads before .get */
		/* queue has room if put hasn't lapped get by nentries */
		if (put - ch->w_remote_GP.get < ch->local_nentries) {

			/* There are available message entries. We need to try
			 * to secure one for ourselves. We'll do this by trying
			 * to increment w_local_GP.put as long as someone else
			 * doesn't beat us to it. If they do, we'll have to
			 * try again.
			 */
			if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) {
				/* we got the entry referenced by put */
				break;
			}
			continue;	/* try again */
		}

		/*
		 * There aren't any available msg entries at this time.
		 *
		 * In waiting for a message entry to become available,
		 * we set a timeout in case the other side is not
		 * sending completion IPIs. This lets us fake an IPI
		 * that will cause the IPI handler to fetch the latest
		 * GP values as if an IPI was sent by the other side.
		 */
		if (ret == xpTimeout)
			xpc_IPI_send_local_msgrequest(ch);

		/* caller asked not to block; give up immediately */
		if (flags & XPC_NOWAIT) {
			xpc_msgqueue_deref(ch);
			return xpNoWait;
		}

		/* only retry on timeout/interrupt; real errors bail out */
		ret = xpc_allocate_msg_wait(ch);
		if (ret != xpInterrupted && ret != xpTimeout) {
			xpc_msgqueue_deref(ch);
			return ret;
		}
	}

	/* get the message's address and initialize it */
	/* the queue is a circular array of msg_size-byte slots */
	msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
				 (put % ch->local_nentries) * ch->msg_size);

	DBUG_ON(msg->flags != 0);
	msg->number = put;

	dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
		"msg_number=%ld, partid=%d, channel=%d\n", put + 1,
		(void *)msg, msg->number, ch->partid, ch->number);

	*address_of_msg = msg;

	return xpSuccess;
}
1283 
1284 /*
1285  * Allocate an entry for a message from the message queue associated with the
1286  * specified channel. NOTE that this routine can sleep waiting for a message
1287  * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
1288  *
1289  * Arguments:
1290  *
1291  *	partid - ID of partition to which the channel is connected.
1292  *	ch_number - channel #.
1293  *	flags - see xpc.h for valid flags.
1294  *	payload - address of the allocated payload area pointer (filled in on
1295  * 	          return) in which the user-defined message is constructed.
1296  */
1297 enum xp_retval
1298 xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload)
1299 {
1300 	struct xpc_partition *part = &xpc_partitions[partid];
1301 	enum xp_retval ret = xpUnknownReason;
1302 	struct xpc_msg *msg = NULL;
1303 
1304 	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1305 	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1306 
1307 	*payload = NULL;
1308 
1309 	if (xpc_part_ref(part)) {
1310 		ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
1311 		xpc_part_deref(part);
1312 
1313 		if (msg != NULL)
1314 			*payload = &msg->payload;
1315 	}
1316 
1317 	return ret;
1318 }
1319 
1320 /*
1321  * Now we actually send the messages that are ready to be sent by advancing
1322  * the local message queue's Put value and then send an IPI to the recipient
1323  * partition.
1324  */
static void
xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
{
	struct xpc_msg *msg;
	s64 put = initial_put + 1;	/* first slot past the published put */
	int send_IPI = 0;

	while (1) {

		/* advance put past every consecutive XPC_M_READY message */
		while (1) {
			if (put == ch->w_local_GP.put)
				break;

			msg = (struct xpc_msg *)((u64)ch->local_msgqueue +
						 (put % ch->local_nentries) *
						 ch->msg_size);

			if (!(msg->flags & XPC_M_READY))
				break;

			put++;
		}

		if (put == initial_put) {
			/* nothing's changed */
			break;
		}

		/* publish the new put; release semantics order prior stores */
		if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
		    initial_put) {
			/* someone else beat us to it */
			DBUG_ON(ch->local_GP->put < initial_put);
			break;
		}

		/* we just set the new value of local_GP->put */

		dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
			"channel=%d\n", put, ch->partid, ch->number);

		send_IPI = 1;

		/*
		 * We need to ensure that the message referenced by
		 * local_GP->put is not XPC_M_READY or that local_GP->put
		 * equals w_local_GP.put, so we'll go have a look.
		 */
		initial_put = put;
	}

	/* one IPI covers however many messages were published above */
	if (send_IPI)
		xpc_IPI_send_msgrequest(ch);
}
1378 
1379 /*
1380  * Common code that does the actual sending of the message by advancing the
1381  * local message queue's Put value and sends an IPI to the partition the
1382  * message is being sent to.
1383  */
1384 static enum xp_retval
1385 xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
1386 	     xpc_notify_func func, void *key)
1387 {
1388 	enum xp_retval ret = xpSuccess;
1389 	struct xpc_notify *notify = notify;
1390 	s64 put, msg_number = msg->number;
1391 
1392 	DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
1393 	DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) !=
1394 		msg_number % ch->local_nentries);
1395 	DBUG_ON(msg->flags & XPC_M_READY);
1396 
1397 	if (ch->flags & XPC_C_DISCONNECTING) {
1398 		/* drop the reference grabbed in xpc_allocate_msg() */
1399 		xpc_msgqueue_deref(ch);
1400 		return ch->reason;
1401 	}
1402 
1403 	if (notify_type != 0) {
1404 		/*
1405 		 * Tell the remote side to send an ACK interrupt when the
1406 		 * message has been delivered.
1407 		 */
1408 		msg->flags |= XPC_M_INTERRUPT;
1409 
1410 		atomic_inc(&ch->n_to_notify);
1411 
1412 		notify = &ch->notify_queue[msg_number % ch->local_nentries];
1413 		notify->func = func;
1414 		notify->key = key;
1415 		notify->type = notify_type;
1416 
1417 		/* >>> is a mb() needed here? */
1418 
1419 		if (ch->flags & XPC_C_DISCONNECTING) {
1420 			/*
1421 			 * An error occurred between our last error check and
1422 			 * this one. We will try to clear the type field from
1423 			 * the notify entry. If we succeed then
1424 			 * xpc_disconnect_channel() didn't already process
1425 			 * the notify entry.
1426 			 */
1427 			if (cmpxchg(&notify->type, notify_type, 0) ==
1428 			    notify_type) {
1429 				atomic_dec(&ch->n_to_notify);
1430 				ret = ch->reason;
1431 			}
1432 
1433 			/* drop the reference grabbed in xpc_allocate_msg() */
1434 			xpc_msgqueue_deref(ch);
1435 			return ret;
1436 		}
1437 	}
1438 
1439 	msg->flags |= XPC_M_READY;
1440 
1441 	/*
1442 	 * The preceding store of msg->flags must occur before the following
1443 	 * load of ch->local_GP->put.
1444 	 */
1445 	mb();
1446 
1447 	/* see if the message is next in line to be sent, if so send it */
1448 
1449 	put = ch->local_GP->put;
1450 	if (put == msg_number)
1451 		xpc_send_msgs(ch, put);
1452 
1453 	/* drop the reference grabbed in xpc_allocate_msg() */
1454 	xpc_msgqueue_deref(ch);
1455 	return ret;
1456 }
1457 
1458 /*
1459  * Send a message previously allocated using xpc_initiate_allocate() on the
1460  * specified channel connected to the specified partition.
1461  *
1462  * This routine will not wait for the message to be received, nor will
1463  * notification be given when it does happen. Once this routine has returned
1464  * the message entry allocated via xpc_initiate_allocate() is no longer
1465  * accessable to the caller.
1466  *
1467  * This routine, although called by users, does not call xpc_part_ref() to
1468  * ensure that the partition infrastructure is in place. It relies on the
1469  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1470  *
1471  * Arguments:
1472  *
1473  *	partid - ID of partition to which the channel is connected.
1474  *	ch_number - channel # to send message on.
1475  *	payload - pointer to the payload area allocated via
1476  *			xpc_initiate_allocate().
1477  */
1478 enum xp_retval
1479 xpc_initiate_send(short partid, int ch_number, void *payload)
1480 {
1481 	struct xpc_partition *part = &xpc_partitions[partid];
1482 	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1483 	enum xp_retval ret;
1484 
1485 	dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1486 		partid, ch_number);
1487 
1488 	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1489 	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1490 	DBUG_ON(msg == NULL);
1491 
1492 	ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
1493 
1494 	return ret;
1495 }
1496 
1497 /*
1498  * Send a message previously allocated using xpc_initiate_allocate on the
1499  * specified channel connected to the specified partition.
1500  *
1501  * This routine will not wait for the message to be sent. Once this routine
1502  * has returned the message entry allocated via xpc_initiate_allocate() is no
1503  * longer accessable to the caller.
1504  *
1505  * Once the remote end of the channel has received the message, the function
1506  * passed as an argument to xpc_initiate_send_notify() will be called. This
1507  * allows the sender to free up or re-use any buffers referenced by the
1508  * message, but does NOT mean the message has been processed at the remote
1509  * end by a receiver.
1510  *
1511  * If this routine returns an error, the caller's function will NOT be called.
1512  *
1513  * This routine, although called by users, does not call xpc_part_ref() to
1514  * ensure that the partition infrastructure is in place. It relies on the
1515  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1516  *
1517  * Arguments:
1518  *
1519  *	partid - ID of partition to which the channel is connected.
1520  *	ch_number - channel # to send message on.
1521  *	payload - pointer to the payload area allocated via
1522  *			xpc_initiate_allocate().
1523  *	func - function to call with asynchronous notification of message
1524  *		  receipt. THIS FUNCTION MUST BE NON-BLOCKING.
1525  *	key - user-defined key to be passed to the function when it's called.
1526  */
1527 enum xp_retval
1528 xpc_initiate_send_notify(short partid, int ch_number, void *payload,
1529 			 xpc_notify_func func, void *key)
1530 {
1531 	struct xpc_partition *part = &xpc_partitions[partid];
1532 	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
1533 	enum xp_retval ret;
1534 
1535 	dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg,
1536 		partid, ch_number);
1537 
1538 	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
1539 	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1540 	DBUG_ON(msg == NULL);
1541 	DBUG_ON(func == NULL);
1542 
1543 	ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
1544 			   func, key);
1545 	return ret;
1546 }
1547 
1548 /*
1549  * Deliver a message to its intended recipient.
1550  */
1551 void
1552 xpc_deliver_msg(struct xpc_channel *ch)
1553 {
1554 	struct xpc_msg *msg;
1555 
1556 	msg = xpc_get_deliverable_msg(ch);
1557 	if (msg != NULL) {
1558 
1559 		/*
1560 		 * This ref is taken to protect the payload itself from being
1561 		 * freed before the user is finished with it, which the user
1562 		 * indicates by calling xpc_initiate_received().
1563 		 */
1564 		xpc_msgqueue_ref(ch);
1565 
1566 		atomic_inc(&ch->kthreads_active);
1567 
1568 		if (ch->func != NULL) {
1569 			dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
1570 				"msg_number=%ld, partid=%d, channel=%d\n",
1571 				(void *)msg, msg->number, ch->partid,
1572 				ch->number);
1573 
1574 			/* deliver the message to its intended recipient */
1575 			ch->func(xpMsgReceived, ch->partid, ch->number,
1576 				 &msg->payload, ch->key);
1577 
1578 			dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
1579 				"msg_number=%ld, partid=%d, channel=%d\n",
1580 				(void *)msg, msg->number, ch->partid,
1581 				ch->number);
1582 		}
1583 
1584 		atomic_dec(&ch->kthreads_active);
1585 	}
1586 }
1587 
1588 /*
1589  * Now we actually acknowledge the messages that have been delivered and ack'd
1590  * by advancing the cached remote message queue's Get value and if requested
1591  * send an IPI to the message sender's partition.
1592  */
static void
xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
{
	struct xpc_msg *msg;
	s64 get = initial_get + 1;	/* first slot past the published get */
	int send_IPI = 0;

	while (1) {

		/* advance get past every consecutive XPC_M_DONE message */
		while (1) {
			if (get == ch->w_local_GP.get)
				break;

			msg = (struct xpc_msg *)((u64)ch->remote_msgqueue +
						 (get % ch->remote_nentries) *
						 ch->msg_size);

			if (!(msg->flags & XPC_M_DONE))
				break;

			/* accumulate flags so XPC_M_INTERRUPT isn't lost */
			msg_flags |= msg->flags;
			get++;
		}

		if (get == initial_get) {
			/* nothing's changed */
			break;
		}

		/* publish the new get; release semantics order prior stores */
		if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
		    initial_get) {
			/* someone else beat us to it */
			DBUG_ON(ch->local_GP->get <= initial_get);
			break;
		}

		/* we just set the new value of local_GP->get */

		dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
			"channel=%d\n", get, ch->partid, ch->number);

		/* IPI only if some acknowledged msg requested an interrupt */
		send_IPI = (msg_flags & XPC_M_INTERRUPT);

		/*
		 * We need to ensure that the message referenced by
		 * local_GP->get is not XPC_M_DONE or that local_GP->get
		 * equals w_local_GP.get, so we'll go have a look.
		 */
		initial_get = get;
	}

	if (send_IPI)
		xpc_IPI_send_msgrequest(ch);
}
1647 
1648 /*
1649  * Acknowledge receipt of a delivered message.
1650  *
1651  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
1652  * that sent the message.
1653  *
1654  * This function, although called by users, does not call xpc_part_ref() to
1655  * ensure that the partition infrastructure is in place. It relies on the
1656  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
1657  *
1658  * Arguments:
1659  *
1660  *	partid - ID of partition to which the channel is connected.
1661  *	ch_number - channel # message received on.
1662  *	payload - pointer to the payload area allocated via
1663  *			xpc_initiate_allocate().
1664  */
void
xpc_initiate_received(short partid, int ch_number, void *payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
	s64 get, msg_number = msg->number;

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

	ch = &part->channels[ch_number];

	dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
		(void *)msg, msg_number, ch->partid, ch->number);

	/* msg must live in the slot its number maps to in the remote queue */
	DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) !=
		msg_number % ch->remote_nentries);
	DBUG_ON(msg->flags & XPC_M_DONE);

	/* mark the message delivered so xpc_acknowledge_msgs() can pass it */
	msg->flags |= XPC_M_DONE;

	/*
	 * The preceding store of msg->flags must occur before the following
	 * load of ch->local_GP->get.
	 */
	mb();

	/*
	 * See if this message is next in line to be acknowledged as having
	 * been delivered.
	 */
	get = ch->local_GP->get;
	if (get == msg_number)
		xpc_acknowledge_msgs(ch, get, msg->flags);

	/* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
	xpc_msgqueue_deref(ch);
}
1704