1 /* 2 * This file is subject to the terms and conditions of the GNU General Public 3 * License. See the file "COPYING" in the main directory of this archive 4 * for more details. 5 * 6 * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved. 7 */ 8 9 /* 10 * Cross Partition Communication (XPC) channel support. 11 * 12 * This is the part of XPC that manages the channels and 13 * sends/receives messages across them to/from other partitions. 14 * 15 */ 16 17 #include <linux/kernel.h> 18 #include <linux/init.h> 19 #include <linux/sched.h> 20 #include <linux/cache.h> 21 #include <linux/interrupt.h> 22 #include <linux/mutex.h> 23 #include <linux/completion.h> 24 #include <asm/sn/sn_sal.h> 25 #include "xpc.h" 26 27 /* 28 * Guarantee that the kzalloc'd memory is cacheline aligned. 29 */ 30 void * 31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base) 32 { 33 /* see if kzalloc will give us cachline aligned memory by default */ 34 *base = kzalloc(size, flags); 35 if (*base == NULL) 36 return NULL; 37 38 if ((u64)*base == L1_CACHE_ALIGN((u64)*base)) 39 return *base; 40 41 kfree(*base); 42 43 /* nope, we'll have to do it ourselves */ 44 *base = kzalloc(size + L1_CACHE_BYTES, flags); 45 if (*base == NULL) 46 return NULL; 47 48 return (void *)L1_CACHE_ALIGN((u64)*base); 49 } 50 51 /* 52 * Allocate the local message queue and the notify queue. 53 */ 54 static enum xp_retval 55 xpc_allocate_local_msgqueue(struct xpc_channel *ch) 56 { 57 unsigned long irq_flags; 58 int nentries; 59 size_t nbytes; 60 61 for (nentries = ch->local_nentries; nentries > 0; nentries--) { 62 63 nbytes = nentries * ch->msg_size; 64 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 65 GFP_KERNEL, 66 &ch->local_msgqueue_base); 67 if (ch->local_msgqueue == NULL) 68 continue; 69 70 nbytes = nentries * sizeof(struct xpc_notify); 71 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL); 72 if (ch->notify_queue == NULL) { 73 kfree(ch->local_msgqueue_base); 74 ch->local_msgqueue = NULL; 75 continue; 76 } 77 78 spin_lock_irqsave(&ch->lock, irq_flags); 79 if (nentries < ch->local_nentries) { 80 dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, " 81 "partid=%d, channel=%d\n", nentries, 82 ch->local_nentries, ch->partid, ch->number); 83 84 ch->local_nentries = nentries; 85 } 86 spin_unlock_irqrestore(&ch->lock, irq_flags); 87 return xpSuccess; 88 } 89 90 dev_dbg(xpc_chan, "can't get memory for local message queue and notify " 91 "queue, partid=%d, channel=%d\n", ch->partid, ch->number); 92 return xpNoMemory; 93 } 94 95 /* 96 * Allocate the cached remote message queue. 97 */ 98 static enum xp_retval 99 xpc_allocate_remote_msgqueue(struct xpc_channel *ch) 100 { 101 unsigned long irq_flags; 102 int nentries; 103 size_t nbytes; 104 105 DBUG_ON(ch->remote_nentries <= 0); 106 107 for (nentries = ch->remote_nentries; nentries > 0; nentries--) { 108 109 nbytes = nentries * ch->msg_size; 110 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 111 GFP_KERNEL, 112 &ch->remote_msgqueue_base); 113 if (ch->remote_msgqueue == NULL) 114 continue; 115 116 spin_lock_irqsave(&ch->lock, irq_flags); 117 if (nentries < ch->remote_nentries) { 118 dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, " 119 "partid=%d, channel=%d\n", nentries, 120 ch->remote_nentries, ch->partid, ch->number); 121 122 ch->remote_nentries = nentries; 123 } 124 spin_unlock_irqrestore(&ch->lock, irq_flags); 125 return xpSuccess; 126 } 127 128 dev_dbg(xpc_chan, "can't get memory for cached remote message queue, " 129 "partid=%d, channel=%d\n", ch->partid, ch->number); 130 return xpNoMemory; 131 } 132 133 /* 134 * Allocate message queues and other stuff associated with a channel. 135 * 136 * Note: Assumes all of the channel sizes are filled in. 137 */ 138 static enum xp_retval 139 xpc_allocate_msgqueues(struct xpc_channel *ch) 140 { 141 unsigned long irq_flags; 142 enum xp_retval ret; 143 144 DBUG_ON(ch->flags & XPC_C_SETUP); 145 146 ret = xpc_allocate_local_msgqueue(ch); 147 if (ret != xpSuccess) 148 return ret; 149 150 ret = xpc_allocate_remote_msgqueue(ch); 151 if (ret != xpSuccess) { 152 kfree(ch->local_msgqueue_base); 153 ch->local_msgqueue = NULL; 154 kfree(ch->notify_queue); 155 ch->notify_queue = NULL; 156 return ret; 157 } 158 159 spin_lock_irqsave(&ch->lock, irq_flags); 160 ch->flags |= XPC_C_SETUP; 161 spin_unlock_irqrestore(&ch->lock, irq_flags); 162 163 return xpSuccess; 164 } 165 166 /* 167 * Process a connect message from a remote partition. 168 * 169 * Note: xpc_process_connect() is expecting to be called with the 170 * spin_lock_irqsave held and will leave it locked upon return. 171 */ 172 static void 173 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags) 174 { 175 enum xp_retval ret; 176 177 DBUG_ON(!spin_is_locked(&ch->lock)); 178 179 if (!(ch->flags & XPC_C_OPENREQUEST) || 180 !(ch->flags & XPC_C_ROPENREQUEST)) { 181 /* nothing more to do for now */ 182 return; 183 } 184 DBUG_ON(!(ch->flags & XPC_C_CONNECTING)); 185 186 if (!(ch->flags & XPC_C_SETUP)) { 187 spin_unlock_irqrestore(&ch->lock, *irq_flags); 188 ret = xpc_allocate_msgqueues(ch); 189 spin_lock_irqsave(&ch->lock, *irq_flags); 190 191 if (ret != xpSuccess) 192 XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags); 193 194 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING)) 195 return; 196 197 DBUG_ON(!(ch->flags & XPC_C_SETUP)); 198 DBUG_ON(ch->local_msgqueue == NULL); 199 DBUG_ON(ch->remote_msgqueue == NULL); 200 } 201 202 if (!(ch->flags & XPC_C_OPENREPLY)) { 203 ch->flags |= XPC_C_OPENREPLY; 204 xpc_IPI_send_openreply(ch, irq_flags); 205 } 206 207 if (!(ch->flags & XPC_C_ROPENREPLY)) 208 return; 209 210 DBUG_ON(ch->remote_msgqueue_pa == 0); 211 212 ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP); /* clear all else */ 213 214 dev_info(xpc_chan, "channel %d to partition %d connected\n", 215 ch->number, ch->partid); 216 217 spin_unlock_irqrestore(&ch->lock, *irq_flags); 218 xpc_create_kthreads(ch, 1, 0); 219 spin_lock_irqsave(&ch->lock, *irq_flags); 220 } 221 222 /* 223 * Notify those who wanted to be notified upon delivery of their message. 224 */ 225 static void 226 xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put) 227 { 228 struct xpc_notify *notify; 229 u8 notify_type; 230 s64 get = ch->w_remote_GP.get - 1; 231 232 while (++get < put && atomic_read(&ch->n_to_notify) > 0) { 233 234 notify = &ch->notify_queue[get % ch->local_nentries]; 235 236 /* 237 * See if the notify entry indicates it was associated with 238 * a message who's sender wants to be notified. It is possible 239 * that it is, but someone else is doing or has done the 240 * notification. 241 */ 242 notify_type = notify->type; 243 if (notify_type == 0 || 244 cmpxchg(¬ify->type, notify_type, 0) != notify_type) { 245 continue; 246 } 247 248 DBUG_ON(notify_type != XPC_N_CALL); 249 250 atomic_dec(&ch->n_to_notify); 251 252 if (notify->func != NULL) { 253 dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " 254 "msg_number=%ld, partid=%d, channel=%d\n", 255 (void *)notify, get, ch->partid, ch->number); 256 257 notify->func(reason, ch->partid, ch->number, 258 notify->key); 259 260 dev_dbg(xpc_chan, "notify->func() returned, " 261 "notify=0x%p, msg_number=%ld, partid=%d, " 262 "channel=%d\n", (void *)notify, get, 263 ch->partid, ch->number); 264 } 265 } 266 } 267 268 /* 269 * Free up message queues and other stuff that were allocated for the specified 270 * channel. 271 * 272 * Note: ch->reason and ch->reason_line are left set for debugging purposes, 273 * they're cleared when XPC_C_DISCONNECTED is cleared. 274 */ 275 static void 276 xpc_free_msgqueues(struct xpc_channel *ch) 277 { 278 DBUG_ON(!spin_is_locked(&ch->lock)); 279 DBUG_ON(atomic_read(&ch->n_to_notify) != 0); 280 281 ch->remote_msgqueue_pa = 0; 282 ch->func = NULL; 283 ch->key = NULL; 284 ch->msg_size = 0; 285 ch->local_nentries = 0; 286 ch->remote_nentries = 0; 287 ch->kthreads_assigned_limit = 0; 288 ch->kthreads_idle_limit = 0; 289 290 ch->local_GP->get = 0; 291 ch->local_GP->put = 0; 292 ch->remote_GP.get = 0; 293 ch->remote_GP.put = 0; 294 ch->w_local_GP.get = 0; 295 ch->w_local_GP.put = 0; 296 ch->w_remote_GP.get = 0; 297 ch->w_remote_GP.put = 0; 298 ch->next_msg_to_pull = 0; 299 300 if (ch->flags & XPC_C_SETUP) { 301 ch->flags &= ~XPC_C_SETUP; 302 303 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n", 304 ch->flags, ch->partid, ch->number); 305 306 kfree(ch->local_msgqueue_base); 307 ch->local_msgqueue = NULL; 308 kfree(ch->remote_msgqueue_base); 309 ch->remote_msgqueue = NULL; 310 kfree(ch->notify_queue); 311 ch->notify_queue = NULL; 312 } 313 } 314 315 /* 316 * spin_lock_irqsave() is expected to be held on entry. 317 */ 318 static void 319 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) 320 { 321 struct xpc_partition *part = &xpc_partitions[ch->partid]; 322 u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED); 323 324 DBUG_ON(!spin_is_locked(&ch->lock)); 325 326 if (!(ch->flags & XPC_C_DISCONNECTING)) 327 return; 328 329 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 330 331 /* make sure all activity has settled down first */ 332 333 if (atomic_read(&ch->kthreads_assigned) > 0 || 334 atomic_read(&ch->references) > 0) { 335 return; 336 } 337 DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 338 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE)); 339 340 if (part->act_state == XPC_P_DEACTIVATING) { 341 /* can't proceed until the other side disengages from us */ 342 if (xpc_partition_engaged(1UL << ch->partid)) 343 return; 344 345 } else { 346 347 /* as long as the other side is up do the full protocol */ 348 349 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) 350 return; 351 352 if (!(ch->flags & XPC_C_CLOSEREPLY)) { 353 ch->flags |= XPC_C_CLOSEREPLY; 354 xpc_IPI_send_closereply(ch, irq_flags); 355 } 356 357 if (!(ch->flags & XPC_C_RCLOSEREPLY)) 358 return; 359 } 360 361 /* wake those waiting for notify completion */ 362 if (atomic_read(&ch->n_to_notify) > 0) { 363 /* >>> we do callout while holding ch->lock */ 364 xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put); 365 } 366 367 /* both sides are disconnected now */ 368 369 if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) { 370 spin_unlock_irqrestore(&ch->lock, *irq_flags); 371 xpc_disconnect_callout(ch, xpDisconnected); 372 spin_lock_irqsave(&ch->lock, *irq_flags); 373 } 374 375 /* it's now safe to free the channel's message queues */ 376 xpc_free_msgqueues(ch); 377 378 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */ 379 ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT)); 380 381 atomic_dec(&part->nchannels_active); 382 383 if (channel_was_connected) { 384 dev_info(xpc_chan, "channel %d to partition %d disconnected, " 385 "reason=%d\n", ch->number, ch->partid, ch->reason); 386 } 387 388 if (ch->flags & XPC_C_WDISCONNECT) { 389 /* we won't lose the CPU since we're holding ch->lock */ 390 complete(&ch->wdisconnect_wait); 391 } else if (ch->delayed_IPI_flags) { 392 if (part->act_state != XPC_P_DEACTIVATING) { 393 /* time to take action on any delayed IPI flags */ 394 spin_lock(&part->IPI_lock); 395 XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number, 396 ch->delayed_IPI_flags); 397 spin_unlock(&part->IPI_lock); 398 } 399 ch->delayed_IPI_flags = 0; 400 } 401 } 402 403 /* 404 * Process a change in the channel's remote connection state. 405 */ 406 static void 407 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, 408 u8 IPI_flags) 409 { 410 unsigned long irq_flags; 411 struct xpc_openclose_args *args = 412 &part->remote_openclose_args[ch_number]; 413 struct xpc_channel *ch = &part->channels[ch_number]; 414 enum xp_retval reason; 415 416 spin_lock_irqsave(&ch->lock, irq_flags); 417 418 again: 419 420 if ((ch->flags & XPC_C_DISCONNECTED) && 421 (ch->flags & XPC_C_WDISCONNECT)) { 422 /* 423 * Delay processing IPI flags until thread waiting disconnect 424 * has had a chance to see that the channel is disconnected. 425 */ 426 ch->delayed_IPI_flags |= IPI_flags; 427 spin_unlock_irqrestore(&ch->lock, irq_flags); 428 return; 429 } 430 431 if (IPI_flags & XPC_IPI_CLOSEREQUEST) { 432 433 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received " 434 "from partid=%d, channel=%d\n", args->reason, 435 ch->partid, ch->number); 436 437 /* 438 * If RCLOSEREQUEST is set, we're probably waiting for 439 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed 440 * with this RCLOSEREQUEST in the IPI_flags. 441 */ 442 443 if (ch->flags & XPC_C_RCLOSEREQUEST) { 444 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING)); 445 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 446 DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY)); 447 DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY); 448 449 DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY)); 450 IPI_flags &= ~XPC_IPI_CLOSEREPLY; 451 ch->flags |= XPC_C_RCLOSEREPLY; 452 453 /* both sides have finished disconnecting */ 454 xpc_process_disconnect(ch, &irq_flags); 455 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); 456 goto again; 457 } 458 459 if (ch->flags & XPC_C_DISCONNECTED) { 460 if (!(IPI_flags & XPC_IPI_OPENREQUEST)) { 461 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, 462 ch_number) & 463 XPC_IPI_OPENREQUEST)) { 464 465 DBUG_ON(ch->delayed_IPI_flags != 0); 466 spin_lock(&part->IPI_lock); 467 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 468 ch_number, 469 XPC_IPI_CLOSEREQUEST); 470 spin_unlock(&part->IPI_lock); 471 } 472 spin_unlock_irqrestore(&ch->lock, irq_flags); 473 return; 474 } 475 476 XPC_SET_REASON(ch, 0, 0); 477 ch->flags &= ~XPC_C_DISCONNECTED; 478 479 atomic_inc(&part->nchannels_active); 480 ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST); 481 } 482 483 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY); 484 485 /* 486 * The meaningful CLOSEREQUEST connection state fields are: 487 * reason = reason connection is to be closed 488 */ 489 490 ch->flags |= XPC_C_RCLOSEREQUEST; 491 492 if (!(ch->flags & XPC_C_DISCONNECTING)) { 493 reason = args->reason; 494 if (reason <= xpSuccess || reason > xpUnknownReason) 495 reason = xpUnknownReason; 496 else if (reason == xpUnregistering) 497 reason = xpOtherUnregistering; 498 499 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 500 501 DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY); 502 spin_unlock_irqrestore(&ch->lock, irq_flags); 503 return; 504 } 505 506 xpc_process_disconnect(ch, &irq_flags); 507 } 508 509 if (IPI_flags & XPC_IPI_CLOSEREPLY) { 510 511 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d," 512 " channel=%d\n", ch->partid, ch->number); 513 514 if (ch->flags & XPC_C_DISCONNECTED) { 515 DBUG_ON(part->act_state != XPC_P_DEACTIVATING); 516 spin_unlock_irqrestore(&ch->lock, irq_flags); 517 return; 518 } 519 520 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 521 522 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { 523 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) 524 & XPC_IPI_CLOSEREQUEST)) { 525 526 DBUG_ON(ch->delayed_IPI_flags != 0); 527 spin_lock(&part->IPI_lock); 528 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 529 ch_number, 530 XPC_IPI_CLOSEREPLY); 531 spin_unlock(&part->IPI_lock); 532 } 533 spin_unlock_irqrestore(&ch->lock, irq_flags); 534 return; 535 } 536 537 ch->flags |= XPC_C_RCLOSEREPLY; 538 539 if (ch->flags & XPC_C_CLOSEREPLY) { 540 /* both sides have finished disconnecting */ 541 xpc_process_disconnect(ch, &irq_flags); 542 } 543 } 544 545 if (IPI_flags & XPC_IPI_OPENREQUEST) { 546 547 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, " 548 "local_nentries=%d) received from partid=%d, " 549 "channel=%d\n", args->msg_size, args->local_nentries, 550 ch->partid, ch->number); 551 552 if (part->act_state == XPC_P_DEACTIVATING || 553 (ch->flags & XPC_C_ROPENREQUEST)) { 554 spin_unlock_irqrestore(&ch->lock, irq_flags); 555 return; 556 } 557 558 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { 559 ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; 560 spin_unlock_irqrestore(&ch->lock, irq_flags); 561 return; 562 } 563 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED | 564 XPC_C_OPENREQUEST))); 565 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 566 XPC_C_OPENREPLY | XPC_C_CONNECTED)); 567 568 /* 569 * The meaningful OPENREQUEST connection state fields are: 570 * msg_size = size of channel's messages in bytes 571 * local_nentries = remote partition's local_nentries 572 */ 573 if (args->msg_size == 0 || args->local_nentries == 0) { 574 /* assume OPENREQUEST was delayed by mistake */ 575 spin_unlock_irqrestore(&ch->lock, irq_flags); 576 return; 577 } 578 579 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); 580 ch->remote_nentries = args->local_nentries; 581 582 if (ch->flags & XPC_C_OPENREQUEST) { 583 if (args->msg_size != ch->msg_size) { 584 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes, 585 &irq_flags); 586 spin_unlock_irqrestore(&ch->lock, irq_flags); 587 return; 588 } 589 } else { 590 ch->msg_size = args->msg_size; 591 592 XPC_SET_REASON(ch, 0, 0); 593 ch->flags &= ~XPC_C_DISCONNECTED; 594 595 atomic_inc(&part->nchannels_active); 596 } 597 598 xpc_process_connect(ch, &irq_flags); 599 } 600 601 if (IPI_flags & XPC_IPI_OPENREPLY) { 602 603 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, " 604 "local_nentries=%d, remote_nentries=%d) received from " 605 "partid=%d, channel=%d\n", args->local_msgqueue_pa, 606 args->local_nentries, args->remote_nentries, 607 ch->partid, ch->number); 608 609 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) { 610 spin_unlock_irqrestore(&ch->lock, irq_flags); 611 return; 612 } 613 if (!(ch->flags & XPC_C_OPENREQUEST)) { 614 XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError, 615 &irq_flags); 616 spin_unlock_irqrestore(&ch->lock, irq_flags); 617 return; 618 } 619 620 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); 621 DBUG_ON(ch->flags & XPC_C_CONNECTED); 622 623 /* 624 * The meaningful OPENREPLY connection state fields are: 625 * local_msgqueue_pa = physical address of remote 626 * partition's local_msgqueue 627 * local_nentries = remote partition's local_nentries 628 * remote_nentries = remote partition's remote_nentries 629 */ 630 DBUG_ON(args->local_msgqueue_pa == 0); 631 DBUG_ON(args->local_nentries == 0); 632 DBUG_ON(args->remote_nentries == 0); 633 634 ch->flags |= XPC_C_ROPENREPLY; 635 ch->remote_msgqueue_pa = args->local_msgqueue_pa; 636 637 if (args->local_nentries < ch->remote_nentries) { 638 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 639 "remote_nentries=%d, old remote_nentries=%d, " 640 "partid=%d, channel=%d\n", 641 args->local_nentries, ch->remote_nentries, 642 ch->partid, ch->number); 643 644 ch->remote_nentries = args->local_nentries; 645 } 646 if (args->remote_nentries < ch->local_nentries) { 647 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 648 "local_nentries=%d, old local_nentries=%d, " 649 "partid=%d, channel=%d\n", 650 args->remote_nentries, ch->local_nentries, 651 ch->partid, ch->number); 652 653 ch->local_nentries = args->remote_nentries; 654 } 655 656 xpc_process_connect(ch, &irq_flags); 657 } 658 659 spin_unlock_irqrestore(&ch->lock, irq_flags); 660 } 661 662 /* 663 * Attempt to establish a channel connection to a remote partition. 664 */ 665 static enum xp_retval 666 xpc_connect_channel(struct xpc_channel *ch) 667 { 668 unsigned long irq_flags; 669 struct xpc_registration *registration = &xpc_registrations[ch->number]; 670 671 if (mutex_trylock(®istration->mutex) == 0) 672 return xpRetry; 673 674 if (!XPC_CHANNEL_REGISTERED(ch->number)) { 675 mutex_unlock(®istration->mutex); 676 return xpUnregistered; 677 } 678 679 spin_lock_irqsave(&ch->lock, irq_flags); 680 681 DBUG_ON(ch->flags & XPC_C_CONNECTED); 682 DBUG_ON(ch->flags & XPC_C_OPENREQUEST); 683 684 if (ch->flags & XPC_C_DISCONNECTING) { 685 spin_unlock_irqrestore(&ch->lock, irq_flags); 686 mutex_unlock(®istration->mutex); 687 return ch->reason; 688 } 689 690 /* add info from the channel connect registration to the channel */ 691 692 ch->kthreads_assigned_limit = registration->assigned_limit; 693 ch->kthreads_idle_limit = registration->idle_limit; 694 DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); 695 DBUG_ON(atomic_read(&ch->kthreads_idle) != 0); 696 DBUG_ON(atomic_read(&ch->kthreads_active) != 0); 697 698 ch->func = registration->func; 699 DBUG_ON(registration->func == NULL); 700 ch->key = registration->key; 701 702 ch->local_nentries = registration->nentries; 703 704 if (ch->flags & XPC_C_ROPENREQUEST) { 705 if (registration->msg_size != ch->msg_size) { 706 /* the local and remote sides aren't the same */ 707 708 /* 709 * Because XPC_DISCONNECT_CHANNEL() can block we're 710 * forced to up the registration sema before we unlock 711 * the channel lock. But that's okay here because we're 712 * done with the part that required the registration 713 * sema. XPC_DISCONNECT_CHANNEL() requires that the 714 * channel lock be locked and will unlock and relock 715 * the channel lock as needed. 716 */ 717 mutex_unlock(®istration->mutex); 718 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes, 719 &irq_flags); 720 spin_unlock_irqrestore(&ch->lock, irq_flags); 721 return xpUnequalMsgSizes; 722 } 723 } else { 724 ch->msg_size = registration->msg_size; 725 726 XPC_SET_REASON(ch, 0, 0); 727 ch->flags &= ~XPC_C_DISCONNECTED; 728 729 atomic_inc(&xpc_partitions[ch->partid].nchannels_active); 730 } 731 732 mutex_unlock(®istration->mutex); 733 734 /* initiate the connection */ 735 736 ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING); 737 xpc_IPI_send_openrequest(ch, &irq_flags); 738 739 xpc_process_connect(ch, &irq_flags); 740 741 spin_unlock_irqrestore(&ch->lock, irq_flags); 742 743 return xpSuccess; 744 } 745 746 /* 747 * Clear some of the msg flags in the local message queue. 748 */ 749 static inline void 750 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch) 751 { 752 struct xpc_msg *msg; 753 s64 get; 754 755 get = ch->w_remote_GP.get; 756 do { 757 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 758 (get % ch->local_nentries) * 759 ch->msg_size); 760 msg->flags = 0; 761 } while (++get < ch->remote_GP.get); 762 } 763 764 /* 765 * Clear some of the msg flags in the remote message queue. 766 */ 767 static inline void 768 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch) 769 { 770 struct xpc_msg *msg; 771 s64 put; 772 773 put = ch->w_remote_GP.put; 774 do { 775 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + 776 (put % ch->remote_nentries) * 777 ch->msg_size); 778 msg->flags = 0; 779 } while (++put < ch->remote_GP.put); 780 } 781 782 static void 783 xpc_process_msg_IPI(struct xpc_partition *part, int ch_number) 784 { 785 struct xpc_channel *ch = &part->channels[ch_number]; 786 int nmsgs_sent; 787 788 ch->remote_GP = part->remote_GPs[ch_number]; 789 790 /* See what, if anything, has changed for each connected channel */ 791 792 xpc_msgqueue_ref(ch); 793 794 if (ch->w_remote_GP.get == ch->remote_GP.get && 795 ch->w_remote_GP.put == ch->remote_GP.put) { 796 /* nothing changed since GPs were last pulled */ 797 xpc_msgqueue_deref(ch); 798 return; 799 } 800 801 if (!(ch->flags & XPC_C_CONNECTED)) { 802 xpc_msgqueue_deref(ch); 803 return; 804 } 805 806 /* 807 * First check to see if messages recently sent by us have been 808 * received by the other side. (The remote GET value will have 809 * changed since we last looked at it.) 810 */ 811 812 if (ch->w_remote_GP.get != ch->remote_GP.get) { 813 814 /* 815 * We need to notify any senders that want to be notified 816 * that their sent messages have been received by their 817 * intended recipients. We need to do this before updating 818 * w_remote_GP.get so that we don't allocate the same message 819 * queue entries prematurely (see xpc_allocate_msg()). 820 */ 821 if (atomic_read(&ch->n_to_notify) > 0) { 822 /* 823 * Notify senders that messages sent have been 824 * received and delivered by the other side. 825 */ 826 xpc_notify_senders(ch, xpMsgDelivered, 827 ch->remote_GP.get); 828 } 829 830 /* 831 * Clear msg->flags in previously sent messages, so that 832 * they're ready for xpc_allocate_msg(). 833 */ 834 xpc_clear_local_msgqueue_flags(ch); 835 836 ch->w_remote_GP.get = ch->remote_GP.get; 837 838 dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, " 839 "channel=%d\n", ch->w_remote_GP.get, ch->partid, 840 ch->number); 841 842 /* 843 * If anyone was waiting for message queue entries to become 844 * available, wake them up. 845 */ 846 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) 847 wake_up(&ch->msg_allocate_wq); 848 } 849 850 /* 851 * Now check for newly sent messages by the other side. (The remote 852 * PUT value will have changed since we last looked at it.) 853 */ 854 855 if (ch->w_remote_GP.put != ch->remote_GP.put) { 856 /* 857 * Clear msg->flags in previously received messages, so that 858 * they're ready for xpc_get_deliverable_msg(). 859 */ 860 xpc_clear_remote_msgqueue_flags(ch); 861 862 ch->w_remote_GP.put = ch->remote_GP.put; 863 864 dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, " 865 "channel=%d\n", ch->w_remote_GP.put, ch->partid, 866 ch->number); 867 868 nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get; 869 if (nmsgs_sent > 0) { 870 dev_dbg(xpc_chan, "msgs waiting to be copied and " 871 "delivered=%d, partid=%d, channel=%d\n", 872 nmsgs_sent, ch->partid, ch->number); 873 874 if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) 875 xpc_activate_kthreads(ch, nmsgs_sent); 876 } 877 } 878 879 xpc_msgqueue_deref(ch); 880 } 881 882 void 883 xpc_process_channel_activity(struct xpc_partition *part) 884 { 885 unsigned long irq_flags; 886 u64 IPI_amo, IPI_flags; 887 struct xpc_channel *ch; 888 int ch_number; 889 u32 ch_flags; 890 891 IPI_amo = xpc_get_IPI_flags(part); 892 893 /* 894 * Initiate channel connections for registered channels. 895 * 896 * For each connected channel that has pending messages activate idle 897 * kthreads and/or create new kthreads as needed. 898 */ 899 900 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 901 ch = &part->channels[ch_number]; 902 903 /* 904 * Process any open or close related IPI flags, and then deal 905 * with connecting or disconnecting the channel as required. 906 */ 907 908 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number); 909 910 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) 911 xpc_process_openclose_IPI(part, ch_number, IPI_flags); 912 913 ch_flags = ch->flags; /* need an atomic snapshot of flags */ 914 915 if (ch_flags & XPC_C_DISCONNECTING) { 916 spin_lock_irqsave(&ch->lock, irq_flags); 917 xpc_process_disconnect(ch, &irq_flags); 918 spin_unlock_irqrestore(&ch->lock, irq_flags); 919 continue; 920 } 921 922 if (part->act_state == XPC_P_DEACTIVATING) 923 continue; 924 925 if (!(ch_flags & XPC_C_CONNECTED)) { 926 if (!(ch_flags & XPC_C_OPENREQUEST)) { 927 DBUG_ON(ch_flags & XPC_C_SETUP); 928 (void)xpc_connect_channel(ch); 929 } else { 930 spin_lock_irqsave(&ch->lock, irq_flags); 931 xpc_process_connect(ch, &irq_flags); 932 spin_unlock_irqrestore(&ch->lock, irq_flags); 933 } 934 continue; 935 } 936 937 /* 938 * Process any message related IPI flags, this may involve the 939 * activation of kthreads to deliver any pending messages sent 940 * from the other partition. 941 */ 942 943 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) 944 xpc_process_msg_IPI(part, ch_number); 945 } 946 } 947 948 /* 949 * XPC's heartbeat code calls this function to inform XPC that a partition is 950 * going down. XPC responds by tearing down the XPartition Communication 951 * infrastructure used for the just downed partition. 952 * 953 * XPC's heartbeat code will never call this function and xpc_partition_up() 954 * at the same time. Nor will it ever make multiple calls to either function 955 * at the same time. 956 */ 957 void 958 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason) 959 { 960 unsigned long irq_flags; 961 int ch_number; 962 struct xpc_channel *ch; 963 964 dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n", 965 XPC_PARTID(part), reason); 966 967 if (!xpc_part_ref(part)) { 968 /* infrastructure for this partition isn't currently set up */ 969 return; 970 } 971 972 /* disconnect channels associated with the partition going down */ 973 974 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 975 ch = &part->channels[ch_number]; 976 977 xpc_msgqueue_ref(ch); 978 spin_lock_irqsave(&ch->lock, irq_flags); 979 980 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 981 982 spin_unlock_irqrestore(&ch->lock, irq_flags); 983 xpc_msgqueue_deref(ch); 984 } 985 986 xpc_wakeup_channel_mgr(part); 987 988 xpc_part_deref(part); 989 } 990 991 /* 992 * Called by XP at the time of channel connection registration to cause 993 * XPC to establish connections to all currently active partitions. 994 */ 995 void 996 xpc_initiate_connect(int ch_number) 997 { 998 short partid; 999 struct xpc_partition *part; 1000 struct xpc_channel *ch; 1001 1002 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS); 1003 1004 for (partid = 0; partid < xp_max_npartitions; partid++) { 1005 part = &xpc_partitions[partid]; 1006 1007 if (xpc_part_ref(part)) { 1008 ch = &part->channels[ch_number]; 1009 1010 /* 1011 * Initiate the establishment of a connection on the 1012 * newly registered channel to the remote partition. 1013 */ 1014 xpc_wakeup_channel_mgr(part); 1015 xpc_part_deref(part); 1016 } 1017 } 1018 } 1019 1020 void 1021 xpc_connected_callout(struct xpc_channel *ch) 1022 { 1023 /* let the registerer know that a connection has been established */ 1024 1025 if (ch->func != NULL) { 1026 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, " 1027 "partid=%d, channel=%d\n", ch->partid, ch->number); 1028 1029 ch->func(xpConnected, ch->partid, ch->number, 1030 (void *)(u64)ch->local_nentries, ch->key); 1031 1032 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, " 1033 "partid=%d, channel=%d\n", ch->partid, ch->number); 1034 } 1035 } 1036 1037 /* 1038 * Called by XP at the time of channel connection unregistration to cause 1039 * XPC to teardown all current connections for the specified channel. 1040 * 1041 * Before returning xpc_initiate_disconnect() will wait until all connections 1042 * on the specified channel have been closed/torndown. So the caller can be 1043 * assured that they will not be receiving any more callouts from XPC to the 1044 * function they registered via xpc_connect(). 1045 * 1046 * Arguments: 1047 * 1048 * ch_number - channel # to unregister. 1049 */ 1050 void 1051 xpc_initiate_disconnect(int ch_number) 1052 { 1053 unsigned long irq_flags; 1054 short partid; 1055 struct xpc_partition *part; 1056 struct xpc_channel *ch; 1057 1058 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS); 1059 1060 /* initiate the channel disconnect for every active partition */ 1061 for (partid = 0; partid < xp_max_npartitions; partid++) { 1062 part = &xpc_partitions[partid]; 1063 1064 if (xpc_part_ref(part)) { 1065 ch = &part->channels[ch_number]; 1066 xpc_msgqueue_ref(ch); 1067 1068 spin_lock_irqsave(&ch->lock, irq_flags); 1069 1070 if (!(ch->flags & XPC_C_DISCONNECTED)) { 1071 ch->flags |= XPC_C_WDISCONNECT; 1072 1073 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering, 1074 &irq_flags); 1075 } 1076 1077 spin_unlock_irqrestore(&ch->lock, irq_flags); 1078 1079 xpc_msgqueue_deref(ch); 1080 xpc_part_deref(part); 1081 } 1082 } 1083 1084 xpc_disconnect_wait(ch_number); 1085 } 1086 1087 /* 1088 * To disconnect a channel, and reflect it back to all who may be waiting. 1089 * 1090 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by 1091 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by 1092 * xpc_disconnect_wait(). 1093 * 1094 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN. 1095 */ 1096 void 1097 xpc_disconnect_channel(const int line, struct xpc_channel *ch, 1098 enum xp_retval reason, unsigned long *irq_flags) 1099 { 1100 u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED); 1101 1102 DBUG_ON(!spin_is_locked(&ch->lock)); 1103 1104 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) 1105 return; 1106 1107 DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED))); 1108 1109 dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n", 1110 reason, line, ch->partid, ch->number); 1111 1112 XPC_SET_REASON(ch, reason, line); 1113 1114 ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); 1115 /* some of these may not have been set */ 1116 ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY | 1117 XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 1118 XPC_C_CONNECTING | XPC_C_CONNECTED); 1119 1120 xpc_IPI_send_closerequest(ch, irq_flags); 1121 1122 if (channel_was_connected) 1123 ch->flags |= XPC_C_WASCONNECTED; 1124 1125 spin_unlock_irqrestore(&ch->lock, *irq_flags); 1126 1127 /* wake all idle kthreads so they can exit */ 1128 if (atomic_read(&ch->kthreads_idle) > 0) { 1129 wake_up_all(&ch->idle_wq); 1130 1131 } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 1132 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) { 1133 /* start a kthread that will do the xpDisconnecting callout */ 1134 xpc_create_kthreads(ch, 1, 1); 1135 } 1136 1137 /* wake those waiting to allocate an entry from the local msg queue */ 1138 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) 1139 wake_up(&ch->msg_allocate_wq); 1140 1141 spin_lock_irqsave(&ch->lock, *irq_flags); 1142 } 1143 1144 void 1145 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason) 1146 { 1147 /* 1148 * Let the channel's registerer know that the channel is being 1149 * disconnected. We don't want to do this if the registerer was never 1150 * informed of a connection being made. 1151 */ 1152 1153 if (ch->func != NULL) { 1154 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, " 1155 "channel=%d\n", reason, ch->partid, ch->number); 1156 1157 ch->func(reason, ch->partid, ch->number, NULL, ch->key); 1158 1159 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, " 1160 "channel=%d\n", reason, ch->partid, ch->number); 1161 } 1162 } 1163 1164 /* 1165 * Wait for a message entry to become available for the specified channel, 1166 * but don't wait any longer than 1 jiffy. 1167 */ 1168 static enum xp_retval 1169 xpc_allocate_msg_wait(struct xpc_channel *ch) 1170 { 1171 enum xp_retval ret; 1172 1173 if (ch->flags & XPC_C_DISCONNECTING) { 1174 DBUG_ON(ch->reason == xpInterrupted); 1175 return ch->reason; 1176 } 1177 1178 atomic_inc(&ch->n_on_msg_allocate_wq); 1179 ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1); 1180 atomic_dec(&ch->n_on_msg_allocate_wq); 1181 1182 if (ch->flags & XPC_C_DISCONNECTING) { 1183 ret = ch->reason; 1184 DBUG_ON(ch->reason == xpInterrupted); 1185 } else if (ret == 0) { 1186 ret = xpTimeout; 1187 } else { 1188 ret = xpInterrupted; 1189 } 1190 1191 return ret; 1192 } 1193 1194 /* 1195 * Allocate an entry for a message from the message queue associated with the 1196 * specified channel. 1197 */ 1198 static enum xp_retval 1199 xpc_allocate_msg(struct xpc_channel *ch, u32 flags, 1200 struct xpc_msg **address_of_msg) 1201 { 1202 struct xpc_msg *msg; 1203 enum xp_retval ret; 1204 s64 put; 1205 1206 /* this reference will be dropped in xpc_send_msg() */ 1207 xpc_msgqueue_ref(ch); 1208 1209 if (ch->flags & XPC_C_DISCONNECTING) { 1210 xpc_msgqueue_deref(ch); 1211 return ch->reason; 1212 } 1213 if (!(ch->flags & XPC_C_CONNECTED)) { 1214 xpc_msgqueue_deref(ch); 1215 return xpNotConnected; 1216 } 1217 1218 /* 1219 * Get the next available message entry from the local message queue. 1220 * If none are available, we'll make sure that we grab the latest 1221 * GP values. 1222 */ 1223 ret = xpTimeout; 1224 1225 while (1) { 1226 1227 put = ch->w_local_GP.put; 1228 rmb(); /* guarantee that .put loads before .get */ 1229 if (put - ch->w_remote_GP.get < ch->local_nentries) { 1230 1231 /* There are available message entries. We need to try 1232 * to secure one for ourselves. We'll do this by trying 1233 * to increment w_local_GP.put as long as someone else 1234 * doesn't beat us to it. If they do, we'll have to 1235 * try again. 1236 */ 1237 if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) { 1238 /* we got the entry referenced by put */ 1239 break; 1240 } 1241 continue; /* try again */ 1242 } 1243 1244 /* 1245 * There aren't any available msg entries at this time. 1246 * 1247 * In waiting for a message entry to become available, 1248 * we set a timeout in case the other side is not 1249 * sending completion IPIs. This lets us fake an IPI 1250 * that will cause the IPI handler to fetch the latest 1251 * GP values as if an IPI was sent by the other side. 1252 */ 1253 if (ret == xpTimeout) 1254 xpc_IPI_send_local_msgrequest(ch); 1255 1256 if (flags & XPC_NOWAIT) { 1257 xpc_msgqueue_deref(ch); 1258 return xpNoWait; 1259 } 1260 1261 ret = xpc_allocate_msg_wait(ch); 1262 if (ret != xpInterrupted && ret != xpTimeout) { 1263 xpc_msgqueue_deref(ch); 1264 return ret; 1265 } 1266 } 1267 1268 /* get the message's address and initialize it */ 1269 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 1270 (put % ch->local_nentries) * ch->msg_size); 1271 1272 DBUG_ON(msg->flags != 0); 1273 msg->number = put; 1274 1275 dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, " 1276 "msg_number=%ld, partid=%d, channel=%d\n", put + 1, 1277 (void *)msg, msg->number, ch->partid, ch->number); 1278 1279 *address_of_msg = msg; 1280 1281 return xpSuccess; 1282 } 1283 1284 /* 1285 * Allocate an entry for a message from the message queue associated with the 1286 * specified channel. NOTE that this routine can sleep waiting for a message 1287 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag. 1288 * 1289 * Arguments: 1290 * 1291 * partid - ID of partition to which the channel is connected. 1292 * ch_number - channel #. 1293 * flags - see xpc.h for valid flags. 1294 * payload - address of the allocated payload area pointer (filled in on 1295 * return) in which the user-defined message is constructed. 1296 */ 1297 enum xp_retval 1298 xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload) 1299 { 1300 struct xpc_partition *part = &xpc_partitions[partid]; 1301 enum xp_retval ret = xpUnknownReason; 1302 struct xpc_msg *msg = NULL; 1303 1304 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1305 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1306 1307 *payload = NULL; 1308 1309 if (xpc_part_ref(part)) { 1310 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg); 1311 xpc_part_deref(part); 1312 1313 if (msg != NULL) 1314 *payload = &msg->payload; 1315 } 1316 1317 return ret; 1318 } 1319 1320 /* 1321 * Now we actually send the messages that are ready to be sent by advancing 1322 * the local message queue's Put value and then send an IPI to the recipient 1323 * partition. 1324 */ 1325 static void 1326 xpc_send_msgs(struct xpc_channel *ch, s64 initial_put) 1327 { 1328 struct xpc_msg *msg; 1329 s64 put = initial_put + 1; 1330 int send_IPI = 0; 1331 1332 while (1) { 1333 1334 while (1) { 1335 if (put == ch->w_local_GP.put) 1336 break; 1337 1338 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 1339 (put % ch->local_nentries) * 1340 ch->msg_size); 1341 1342 if (!(msg->flags & XPC_M_READY)) 1343 break; 1344 1345 put++; 1346 } 1347 1348 if (put == initial_put) { 1349 /* nothing's changed */ 1350 break; 1351 } 1352 1353 if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) != 1354 initial_put) { 1355 /* someone else beat us to it */ 1356 DBUG_ON(ch->local_GP->put < initial_put); 1357 break; 1358 } 1359 1360 /* we just set the new value of local_GP->put */ 1361 1362 dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, " 1363 "channel=%d\n", put, ch->partid, ch->number); 1364 1365 send_IPI = 1; 1366 1367 /* 1368 * We need to ensure that the message referenced by 1369 * local_GP->put is not XPC_M_READY or that local_GP->put 1370 * equals w_local_GP.put, so we'll go have a look. 1371 */ 1372 initial_put = put; 1373 } 1374 1375 if (send_IPI) 1376 xpc_IPI_send_msgrequest(ch); 1377 } 1378 1379 /* 1380 * Common code that does the actual sending of the message by advancing the 1381 * local message queue's Put value and sends an IPI to the partition the 1382 * message is being sent to. 1383 */ 1384 static enum xp_retval 1385 xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, 1386 xpc_notify_func func, void *key) 1387 { 1388 enum xp_retval ret = xpSuccess; 1389 struct xpc_notify *notify = notify; 1390 s64 put, msg_number = msg->number; 1391 1392 DBUG_ON(notify_type == XPC_N_CALL && func == NULL); 1393 DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) != 1394 msg_number % ch->local_nentries); 1395 DBUG_ON(msg->flags & XPC_M_READY); 1396 1397 if (ch->flags & XPC_C_DISCONNECTING) { 1398 /* drop the reference grabbed in xpc_allocate_msg() */ 1399 xpc_msgqueue_deref(ch); 1400 return ch->reason; 1401 } 1402 1403 if (notify_type != 0) { 1404 /* 1405 * Tell the remote side to send an ACK interrupt when the 1406 * message has been delivered. 1407 */ 1408 msg->flags |= XPC_M_INTERRUPT; 1409 1410 atomic_inc(&ch->n_to_notify); 1411 1412 notify = &ch->notify_queue[msg_number % ch->local_nentries]; 1413 notify->func = func; 1414 notify->key = key; 1415 notify->type = notify_type; 1416 1417 /* >>> is a mb() needed here? */ 1418 1419 if (ch->flags & XPC_C_DISCONNECTING) { 1420 /* 1421 * An error occurred between our last error check and 1422 * this one. We will try to clear the type field from 1423 * the notify entry. If we succeed then 1424 * xpc_disconnect_channel() didn't already process 1425 * the notify entry. 1426 */ 1427 if (cmpxchg(¬ify->type, notify_type, 0) == 1428 notify_type) { 1429 atomic_dec(&ch->n_to_notify); 1430 ret = ch->reason; 1431 } 1432 1433 /* drop the reference grabbed in xpc_allocate_msg() */ 1434 xpc_msgqueue_deref(ch); 1435 return ret; 1436 } 1437 } 1438 1439 msg->flags |= XPC_M_READY; 1440 1441 /* 1442 * The preceding store of msg->flags must occur before the following 1443 * load of ch->local_GP->put. 1444 */ 1445 mb(); 1446 1447 /* see if the message is next in line to be sent, if so send it */ 1448 1449 put = ch->local_GP->put; 1450 if (put == msg_number) 1451 xpc_send_msgs(ch, put); 1452 1453 /* drop the reference grabbed in xpc_allocate_msg() */ 1454 xpc_msgqueue_deref(ch); 1455 return ret; 1456 } 1457 1458 /* 1459 * Send a message previously allocated using xpc_initiate_allocate() on the 1460 * specified channel connected to the specified partition. 1461 * 1462 * This routine will not wait for the message to be received, nor will 1463 * notification be given when it does happen. Once this routine has returned 1464 * the message entry allocated via xpc_initiate_allocate() is no longer 1465 * accessable to the caller. 1466 * 1467 * This routine, although called by users, does not call xpc_part_ref() to 1468 * ensure that the partition infrastructure is in place. It relies on the 1469 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 1470 * 1471 * Arguments: 1472 * 1473 * partid - ID of partition to which the channel is connected. 1474 * ch_number - channel # to send message on. 1475 * payload - pointer to the payload area allocated via 1476 * xpc_initiate_allocate(). 1477 */ 1478 enum xp_retval 1479 xpc_initiate_send(short partid, int ch_number, void *payload) 1480 { 1481 struct xpc_partition *part = &xpc_partitions[partid]; 1482 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 1483 enum xp_retval ret; 1484 1485 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg, 1486 partid, ch_number); 1487 1488 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1489 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1490 DBUG_ON(msg == NULL); 1491 1492 ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL); 1493 1494 return ret; 1495 } 1496 1497 /* 1498 * Send a message previously allocated using xpc_initiate_allocate on the 1499 * specified channel connected to the specified partition. 1500 * 1501 * This routine will not wait for the message to be sent. Once this routine 1502 * has returned the message entry allocated via xpc_initiate_allocate() is no 1503 * longer accessable to the caller. 1504 * 1505 * Once the remote end of the channel has received the message, the function 1506 * passed as an argument to xpc_initiate_send_notify() will be called. This 1507 * allows the sender to free up or re-use any buffers referenced by the 1508 * message, but does NOT mean the message has been processed at the remote 1509 * end by a receiver. 1510 * 1511 * If this routine returns an error, the caller's function will NOT be called. 1512 * 1513 * This routine, although called by users, does not call xpc_part_ref() to 1514 * ensure that the partition infrastructure is in place. It relies on the 1515 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 1516 * 1517 * Arguments: 1518 * 1519 * partid - ID of partition to which the channel is connected. 1520 * ch_number - channel # to send message on. 1521 * payload - pointer to the payload area allocated via 1522 * xpc_initiate_allocate(). 1523 * func - function to call with asynchronous notification of message 1524 * receipt. THIS FUNCTION MUST BE NON-BLOCKING. 1525 * key - user-defined key to be passed to the function when it's called. 1526 */ 1527 enum xp_retval 1528 xpc_initiate_send_notify(short partid, int ch_number, void *payload, 1529 xpc_notify_func func, void *key) 1530 { 1531 struct xpc_partition *part = &xpc_partitions[partid]; 1532 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 1533 enum xp_retval ret; 1534 1535 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg, 1536 partid, ch_number); 1537 1538 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1539 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1540 DBUG_ON(msg == NULL); 1541 DBUG_ON(func == NULL); 1542 1543 ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL, 1544 func, key); 1545 return ret; 1546 } 1547 1548 /* 1549 * Deliver a message to its intended recipient. 1550 */ 1551 void 1552 xpc_deliver_msg(struct xpc_channel *ch) 1553 { 1554 struct xpc_msg *msg; 1555 1556 msg = xpc_get_deliverable_msg(ch); 1557 if (msg != NULL) { 1558 1559 /* 1560 * This ref is taken to protect the payload itself from being 1561 * freed before the user is finished with it, which the user 1562 * indicates by calling xpc_initiate_received(). 1563 */ 1564 xpc_msgqueue_ref(ch); 1565 1566 atomic_inc(&ch->kthreads_active); 1567 1568 if (ch->func != NULL) { 1569 dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, " 1570 "msg_number=%ld, partid=%d, channel=%d\n", 1571 (void *)msg, msg->number, ch->partid, 1572 ch->number); 1573 1574 /* deliver the message to its intended recipient */ 1575 ch->func(xpMsgReceived, ch->partid, ch->number, 1576 &msg->payload, ch->key); 1577 1578 dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, " 1579 "msg_number=%ld, partid=%d, channel=%d\n", 1580 (void *)msg, msg->number, ch->partid, 1581 ch->number); 1582 } 1583 1584 atomic_dec(&ch->kthreads_active); 1585 } 1586 } 1587 1588 /* 1589 * Now we actually acknowledge the messages that have been delivered and ack'd 1590 * by advancing the cached remote message queue's Get value and if requested 1591 * send an IPI to the message sender's partition. 1592 */ 1593 static void 1594 xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags) 1595 { 1596 struct xpc_msg *msg; 1597 s64 get = initial_get + 1; 1598 int send_IPI = 0; 1599 1600 while (1) { 1601 1602 while (1) { 1603 if (get == ch->w_local_GP.get) 1604 break; 1605 1606 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + 1607 (get % ch->remote_nentries) * 1608 ch->msg_size); 1609 1610 if (!(msg->flags & XPC_M_DONE)) 1611 break; 1612 1613 msg_flags |= msg->flags; 1614 get++; 1615 } 1616 1617 if (get == initial_get) { 1618 /* nothing's changed */ 1619 break; 1620 } 1621 1622 if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) != 1623 initial_get) { 1624 /* someone else beat us to it */ 1625 DBUG_ON(ch->local_GP->get <= initial_get); 1626 break; 1627 } 1628 1629 /* we just set the new value of local_GP->get */ 1630 1631 dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, " 1632 "channel=%d\n", get, ch->partid, ch->number); 1633 1634 send_IPI = (msg_flags & XPC_M_INTERRUPT); 1635 1636 /* 1637 * We need to ensure that the message referenced by 1638 * local_GP->get is not XPC_M_DONE or that local_GP->get 1639 * equals w_local_GP.get, so we'll go have a look. 1640 */ 1641 initial_get = get; 1642 } 1643 1644 if (send_IPI) 1645 xpc_IPI_send_msgrequest(ch); 1646 } 1647 1648 /* 1649 * Acknowledge receipt of a delivered message. 1650 * 1651 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition 1652 * that sent the message. 1653 * 1654 * This function, although called by users, does not call xpc_part_ref() to 1655 * ensure that the partition infrastructure is in place. It relies on the 1656 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg(). 1657 * 1658 * Arguments: 1659 * 1660 * partid - ID of partition to which the channel is connected. 1661 * ch_number - channel # message received on. 1662 * payload - pointer to the payload area allocated via 1663 * xpc_initiate_allocate(). 1664 */ 1665 void 1666 xpc_initiate_received(short partid, int ch_number, void *payload) 1667 { 1668 struct xpc_partition *part = &xpc_partitions[partid]; 1669 struct xpc_channel *ch; 1670 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 1671 s64 get, msg_number = msg->number; 1672 1673 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1674 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1675 1676 ch = &part->channels[ch_number]; 1677 1678 dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n", 1679 (void *)msg, msg_number, ch->partid, ch->number); 1680 1681 DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) != 1682 msg_number % ch->remote_nentries); 1683 DBUG_ON(msg->flags & XPC_M_DONE); 1684 1685 msg->flags |= XPC_M_DONE; 1686 1687 /* 1688 * The preceding store of msg->flags must occur before the following 1689 * load of ch->local_GP->get. 1690 */ 1691 mb(); 1692 1693 /* 1694 * See if this message is next in line to be acknowledged as having 1695 * been delivered. 1696 */ 1697 get = ch->local_GP->get; 1698 if (get == msg_number) 1699 xpc_acknowledge_msgs(ch, get, msg->flags); 1700 1701 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */ 1702 xpc_msgqueue_deref(ch); 1703 } 1704