1 /* 2 * This file is subject to the terms and conditions of the GNU General Public 3 * License. See the file "COPYING" in the main directory of this archive 4 * for more details. 5 * 6 * Copyright (c) 2004-2008 Silicon Graphics, Inc. All Rights Reserved. 7 */ 8 9 /* 10 * Cross Partition Communication (XPC) channel support. 11 * 12 * This is the part of XPC that manages the channels and 13 * sends/receives messages across them to/from other partitions. 14 * 15 */ 16 17 #include <linux/kernel.h> 18 #include <linux/init.h> 19 #include <linux/sched.h> 20 #include <linux/cache.h> 21 #include <linux/interrupt.h> 22 #include <linux/mutex.h> 23 #include <linux/completion.h> 24 #include <asm/sn/sn_sal.h> 25 #include "xpc.h" 26 27 /* 28 * Guarantee that the kzalloc'd memory is cacheline aligned. 29 */ 30 static void * 31 xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base) 32 { 33 /* see if kzalloc will give us cachline aligned memory by default */ 34 *base = kzalloc(size, flags); 35 if (*base == NULL) 36 return NULL; 37 38 if ((u64)*base == L1_CACHE_ALIGN((u64)*base)) 39 return *base; 40 41 kfree(*base); 42 43 /* nope, we'll have to do it ourselves */ 44 *base = kzalloc(size + L1_CACHE_BYTES, flags); 45 if (*base == NULL) 46 return NULL; 47 48 return (void *)L1_CACHE_ALIGN((u64)*base); 49 } 50 51 /* 52 * Set up the initial values for the XPartition Communication channels. 53 */ 54 static void 55 xpc_initialize_channels(struct xpc_partition *part, short partid) 56 { 57 int ch_number; 58 struct xpc_channel *ch; 59 60 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 61 ch = &part->channels[ch_number]; 62 63 ch->partid = partid; 64 ch->number = ch_number; 65 ch->flags = XPC_C_DISCONNECTED; 66 67 ch->local_GP = &part->local_GPs[ch_number]; 68 ch->local_openclose_args = 69 &part->local_openclose_args[ch_number]; 70 71 atomic_set(&ch->kthreads_assigned, 0); 72 atomic_set(&ch->kthreads_idle, 0); 73 atomic_set(&ch->kthreads_active, 0); 74 75 atomic_set(&ch->references, 0); 76 atomic_set(&ch->n_to_notify, 0); 77 78 spin_lock_init(&ch->lock); 79 mutex_init(&ch->msg_to_pull_mutex); 80 init_completion(&ch->wdisconnect_wait); 81 82 atomic_set(&ch->n_on_msg_allocate_wq, 0); 83 init_waitqueue_head(&ch->msg_allocate_wq); 84 init_waitqueue_head(&ch->idle_wq); 85 } 86 } 87 88 /* 89 * Setup the infrastructure necessary to support XPartition Communication 90 * between the specified remote partition and the local one. 91 */ 92 enum xp_retval 93 xpc_setup_infrastructure(struct xpc_partition *part) 94 { 95 int ret, cpuid; 96 struct timer_list *timer; 97 short partid = XPC_PARTID(part); 98 99 /* 100 * Zero out MOST of the entry for this partition. Only the fields 101 * starting with `nchannels' will be zeroed. The preceding fields must 102 * remain `viable' across partition ups and downs, since they may be 103 * referenced during this memset() operation. 104 */ 105 memset(&part->nchannels, 0, sizeof(struct xpc_partition) - 106 offsetof(struct xpc_partition, nchannels)); 107 108 /* 109 * Allocate all of the channel structures as a contiguous chunk of 110 * memory. 111 */ 112 part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_MAX_NCHANNELS, 113 GFP_KERNEL); 114 if (part->channels == NULL) { 115 dev_err(xpc_chan, "can't get memory for channels\n"); 116 return xpNoMemory; 117 } 118 119 part->nchannels = XPC_MAX_NCHANNELS; 120 121 /* allocate all the required GET/PUT values */ 122 123 part->local_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE, 124 GFP_KERNEL, 125 &part->local_GPs_base); 126 if (part->local_GPs == NULL) { 127 kfree(part->channels); 128 part->channels = NULL; 129 dev_err(xpc_chan, "can't get memory for local get/put " 130 "values\n"); 131 return xpNoMemory; 132 } 133 134 part->remote_GPs = xpc_kzalloc_cacheline_aligned(XPC_GP_SIZE, 135 GFP_KERNEL, 136 &part-> 137 remote_GPs_base); 138 if (part->remote_GPs == NULL) { 139 dev_err(xpc_chan, "can't get memory for remote get/put " 140 "values\n"); 141 kfree(part->local_GPs_base); 142 part->local_GPs = NULL; 143 kfree(part->channels); 144 part->channels = NULL; 145 return xpNoMemory; 146 } 147 148 /* allocate all the required open and close args */ 149 150 part->local_openclose_args = 151 xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL, 152 &part->local_openclose_args_base); 153 if (part->local_openclose_args == NULL) { 154 dev_err(xpc_chan, "can't get memory for local connect args\n"); 155 kfree(part->remote_GPs_base); 156 part->remote_GPs = NULL; 157 kfree(part->local_GPs_base); 158 part->local_GPs = NULL; 159 kfree(part->channels); 160 part->channels = NULL; 161 return xpNoMemory; 162 } 163 164 part->remote_openclose_args = 165 xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL, 166 &part->remote_openclose_args_base); 167 if (part->remote_openclose_args == NULL) { 168 dev_err(xpc_chan, "can't get memory for remote connect args\n"); 169 kfree(part->local_openclose_args_base); 170 part->local_openclose_args = NULL; 171 kfree(part->remote_GPs_base); 172 part->remote_GPs = NULL; 173 kfree(part->local_GPs_base); 174 part->local_GPs = NULL; 175 kfree(part->channels); 176 part->channels = NULL; 177 return xpNoMemory; 178 } 179 180 xpc_initialize_channels(part, partid); 181 182 atomic_set(&part->nchannels_active, 0); 183 atomic_set(&part->nchannels_engaged, 0); 184 185 /* local_IPI_amo were set to 0 by an earlier memset() */ 186 187 /* Initialize this partitions AMO_t structure */ 188 part->local_IPI_amo_va = xpc_IPI_init(partid); 189 190 spin_lock_init(&part->IPI_lock); 191 192 atomic_set(&part->channel_mgr_requests, 1); 193 init_waitqueue_head(&part->channel_mgr_wq); 194 195 sprintf(part->IPI_owner, "xpc%02d", partid); 196 ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, IRQF_SHARED, 197 part->IPI_owner, (void *)(u64)partid); 198 if (ret != 0) { 199 dev_err(xpc_chan, "can't register NOTIFY IRQ handler, " 200 "errno=%d\n", -ret); 201 kfree(part->remote_openclose_args_base); 202 part->remote_openclose_args = NULL; 203 kfree(part->local_openclose_args_base); 204 part->local_openclose_args = NULL; 205 kfree(part->remote_GPs_base); 206 part->remote_GPs = NULL; 207 kfree(part->local_GPs_base); 208 part->local_GPs = NULL; 209 kfree(part->channels); 210 part->channels = NULL; 211 return xpLackOfResources; 212 } 213 214 /* Setup a timer to check for dropped IPIs */ 215 timer = &part->dropped_IPI_timer; 216 init_timer(timer); 217 timer->function = (void (*)(unsigned long))xpc_dropped_IPI_check; 218 timer->data = (unsigned long)part; 219 timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT; 220 add_timer(timer); 221 222 /* 223 * With the setting of the partition setup_state to XPC_P_SETUP, we're 224 * declaring that this partition is ready to go. 225 */ 226 part->setup_state = XPC_P_SETUP; 227 228 /* 229 * Setup the per partition specific variables required by the 230 * remote partition to establish channel connections with us. 231 * 232 * The setting of the magic # indicates that these per partition 233 * specific variables are ready to be used. 234 */ 235 xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs); 236 xpc_vars_part[partid].openclose_args_pa = 237 __pa(part->local_openclose_args); 238 xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va); 239 cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */ 240 xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid); 241 xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid); 242 xpc_vars_part[partid].nchannels = part->nchannels; 243 xpc_vars_part[partid].magic = XPC_VP_MAGIC1; 244 245 return xpSuccess; 246 } 247 248 /* 249 * Create a wrapper that hides the underlying mechanism for pulling a cacheline 250 * (or multiple cachelines) from a remote partition. 251 * 252 * src must be a cacheline aligned physical address on the remote partition. 253 * dst must be a cacheline aligned virtual address on this partition. 254 * cnt must be cacheline sized 255 */ 256 static enum xp_retval 257 xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst, 258 const void *src, size_t cnt) 259 { 260 enum xp_retval ret; 261 262 DBUG_ON((u64)src != L1_CACHE_ALIGN((u64)src)); 263 DBUG_ON((u64)dst != L1_CACHE_ALIGN((u64)dst)); 264 DBUG_ON(cnt != L1_CACHE_ALIGN(cnt)); 265 266 if (part->act_state == XPC_P_DEACTIVATING) 267 return part->reason; 268 269 ret = xp_remote_memcpy(dst, src, cnt); 270 if (ret != xpSuccess) { 271 dev_dbg(xpc_chan, "xp_remote_memcpy() from partition %d failed," 272 " ret=%d\n", XPC_PARTID(part), ret); 273 } 274 return ret; 275 } 276 277 /* 278 * Pull the remote per partition specific variables from the specified 279 * partition. 280 */ 281 enum xp_retval 282 xpc_pull_remote_vars_part(struct xpc_partition *part) 283 { 284 u8 buffer[L1_CACHE_BYTES * 2]; 285 struct xpc_vars_part *pulled_entry_cacheline = 286 (struct xpc_vars_part *)L1_CACHE_ALIGN((u64)buffer); 287 struct xpc_vars_part *pulled_entry; 288 u64 remote_entry_cacheline_pa, remote_entry_pa; 289 short partid = XPC_PARTID(part); 290 enum xp_retval ret; 291 292 /* pull the cacheline that contains the variables we're interested in */ 293 294 DBUG_ON(part->remote_vars_part_pa != 295 L1_CACHE_ALIGN(part->remote_vars_part_pa)); 296 DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2); 297 298 remote_entry_pa = part->remote_vars_part_pa + 299 sn_partition_id * sizeof(struct xpc_vars_part); 300 301 remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1)); 302 303 pulled_entry = (struct xpc_vars_part *)((u64)pulled_entry_cacheline + 304 (remote_entry_pa & 305 (L1_CACHE_BYTES - 1))); 306 307 ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline, 308 (void *)remote_entry_cacheline_pa, 309 L1_CACHE_BYTES); 310 if (ret != xpSuccess) { 311 dev_dbg(xpc_chan, "failed to pull XPC vars_part from " 312 "partition %d, ret=%d\n", partid, ret); 313 return ret; 314 } 315 316 /* see if they've been set up yet */ 317 318 if (pulled_entry->magic != XPC_VP_MAGIC1 && 319 pulled_entry->magic != XPC_VP_MAGIC2) { 320 321 if (pulled_entry->magic != 0) { 322 dev_dbg(xpc_chan, "partition %d's XPC vars_part for " 323 "partition %d has bad magic value (=0x%lx)\n", 324 partid, sn_partition_id, pulled_entry->magic); 325 return xpBadMagic; 326 } 327 328 /* they've not been initialized yet */ 329 return xpRetry; 330 } 331 332 if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) { 333 334 /* validate the variables */ 335 336 if (pulled_entry->GPs_pa == 0 || 337 pulled_entry->openclose_args_pa == 0 || 338 pulled_entry->IPI_amo_pa == 0) { 339 340 dev_err(xpc_chan, "partition %d's XPC vars_part for " 341 "partition %d are not valid\n", partid, 342 sn_partition_id); 343 return xpInvalidAddress; 344 } 345 346 /* the variables we imported look to be valid */ 347 348 part->remote_GPs_pa = pulled_entry->GPs_pa; 349 part->remote_openclose_args_pa = 350 pulled_entry->openclose_args_pa; 351 part->remote_IPI_amo_va = 352 (AMO_t *)__va(pulled_entry->IPI_amo_pa); 353 part->remote_IPI_nasid = pulled_entry->IPI_nasid; 354 part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid; 355 356 if (part->nchannels > pulled_entry->nchannels) 357 part->nchannels = pulled_entry->nchannels; 358 359 /* let the other side know that we've pulled their variables */ 360 361 xpc_vars_part[partid].magic = XPC_VP_MAGIC2; 362 } 363 364 if (pulled_entry->magic == XPC_VP_MAGIC1) 365 return xpRetry; 366 367 return xpSuccess; 368 } 369 370 /* 371 * Get the IPI flags and pull the openclose args and/or remote GPs as needed. 372 */ 373 static u64 374 xpc_get_IPI_flags(struct xpc_partition *part) 375 { 376 unsigned long irq_flags; 377 u64 IPI_amo; 378 enum xp_retval ret; 379 380 /* 381 * See if there are any IPI flags to be handled. 382 */ 383 384 spin_lock_irqsave(&part->IPI_lock, irq_flags); 385 IPI_amo = part->local_IPI_amo; 386 if (IPI_amo != 0) 387 part->local_IPI_amo = 0; 388 389 spin_unlock_irqrestore(&part->IPI_lock, irq_flags); 390 391 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) { 392 ret = xpc_pull_remote_cachelines(part, 393 part->remote_openclose_args, 394 (void *)part-> 395 remote_openclose_args_pa, 396 XPC_OPENCLOSE_ARGS_SIZE); 397 if (ret != xpSuccess) { 398 XPC_DEACTIVATE_PARTITION(part, ret); 399 400 dev_dbg(xpc_chan, "failed to pull openclose args from " 401 "partition %d, ret=%d\n", XPC_PARTID(part), 402 ret); 403 404 /* don't bother processing IPIs anymore */ 405 IPI_amo = 0; 406 } 407 } 408 409 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) { 410 ret = xpc_pull_remote_cachelines(part, part->remote_GPs, 411 (void *)part->remote_GPs_pa, 412 XPC_GP_SIZE); 413 if (ret != xpSuccess) { 414 XPC_DEACTIVATE_PARTITION(part, ret); 415 416 dev_dbg(xpc_chan, "failed to pull GPs from partition " 417 "%d, ret=%d\n", XPC_PARTID(part), ret); 418 419 /* don't bother processing IPIs anymore */ 420 IPI_amo = 0; 421 } 422 } 423 424 return IPI_amo; 425 } 426 427 /* 428 * Allocate the local message queue and the notify queue. 429 */ 430 static enum xp_retval 431 xpc_allocate_local_msgqueue(struct xpc_channel *ch) 432 { 433 unsigned long irq_flags; 434 int nentries; 435 size_t nbytes; 436 437 for (nentries = ch->local_nentries; nentries > 0; nentries--) { 438 439 nbytes = nentries * ch->msg_size; 440 ch->local_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 441 GFP_KERNEL, 442 &ch->local_msgqueue_base); 443 if (ch->local_msgqueue == NULL) 444 continue; 445 446 nbytes = nentries * sizeof(struct xpc_notify); 447 ch->notify_queue = kzalloc(nbytes, GFP_KERNEL); 448 if (ch->notify_queue == NULL) { 449 kfree(ch->local_msgqueue_base); 450 ch->local_msgqueue = NULL; 451 continue; 452 } 453 454 spin_lock_irqsave(&ch->lock, irq_flags); 455 if (nentries < ch->local_nentries) { 456 dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, " 457 "partid=%d, channel=%d\n", nentries, 458 ch->local_nentries, ch->partid, ch->number); 459 460 ch->local_nentries = nentries; 461 } 462 spin_unlock_irqrestore(&ch->lock, irq_flags); 463 return xpSuccess; 464 } 465 466 dev_dbg(xpc_chan, "can't get memory for local message queue and notify " 467 "queue, partid=%d, channel=%d\n", ch->partid, ch->number); 468 return xpNoMemory; 469 } 470 471 /* 472 * Allocate the cached remote message queue. 473 */ 474 static enum xp_retval 475 xpc_allocate_remote_msgqueue(struct xpc_channel *ch) 476 { 477 unsigned long irq_flags; 478 int nentries; 479 size_t nbytes; 480 481 DBUG_ON(ch->remote_nentries <= 0); 482 483 for (nentries = ch->remote_nentries; nentries > 0; nentries--) { 484 485 nbytes = nentries * ch->msg_size; 486 ch->remote_msgqueue = xpc_kzalloc_cacheline_aligned(nbytes, 487 GFP_KERNEL, 488 &ch->remote_msgqueue_base); 489 if (ch->remote_msgqueue == NULL) 490 continue; 491 492 spin_lock_irqsave(&ch->lock, irq_flags); 493 if (nentries < ch->remote_nentries) { 494 dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, " 495 "partid=%d, channel=%d\n", nentries, 496 ch->remote_nentries, ch->partid, ch->number); 497 498 ch->remote_nentries = nentries; 499 } 500 spin_unlock_irqrestore(&ch->lock, irq_flags); 501 return xpSuccess; 502 } 503 504 dev_dbg(xpc_chan, "can't get memory for cached remote message queue, " 505 "partid=%d, channel=%d\n", ch->partid, ch->number); 506 return xpNoMemory; 507 } 508 509 /* 510 * Allocate message queues and other stuff associated with a channel. 511 * 512 * Note: Assumes all of the channel sizes are filled in. 513 */ 514 static enum xp_retval 515 xpc_allocate_msgqueues(struct xpc_channel *ch) 516 { 517 unsigned long irq_flags; 518 enum xp_retval ret; 519 520 DBUG_ON(ch->flags & XPC_C_SETUP); 521 522 ret = xpc_allocate_local_msgqueue(ch); 523 if (ret != xpSuccess) 524 return ret; 525 526 ret = xpc_allocate_remote_msgqueue(ch); 527 if (ret != xpSuccess) { 528 kfree(ch->local_msgqueue_base); 529 ch->local_msgqueue = NULL; 530 kfree(ch->notify_queue); 531 ch->notify_queue = NULL; 532 return ret; 533 } 534 535 spin_lock_irqsave(&ch->lock, irq_flags); 536 ch->flags |= XPC_C_SETUP; 537 spin_unlock_irqrestore(&ch->lock, irq_flags); 538 539 return xpSuccess; 540 } 541 542 /* 543 * Process a connect message from a remote partition. 544 * 545 * Note: xpc_process_connect() is expecting to be called with the 546 * spin_lock_irqsave held and will leave it locked upon return. 547 */ 548 static void 549 xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags) 550 { 551 enum xp_retval ret; 552 553 DBUG_ON(!spin_is_locked(&ch->lock)); 554 555 if (!(ch->flags & XPC_C_OPENREQUEST) || 556 !(ch->flags & XPC_C_ROPENREQUEST)) { 557 /* nothing more to do for now */ 558 return; 559 } 560 DBUG_ON(!(ch->flags & XPC_C_CONNECTING)); 561 562 if (!(ch->flags & XPC_C_SETUP)) { 563 spin_unlock_irqrestore(&ch->lock, *irq_flags); 564 ret = xpc_allocate_msgqueues(ch); 565 spin_lock_irqsave(&ch->lock, *irq_flags); 566 567 if (ret != xpSuccess) 568 XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags); 569 570 if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING)) 571 return; 572 573 DBUG_ON(!(ch->flags & XPC_C_SETUP)); 574 DBUG_ON(ch->local_msgqueue == NULL); 575 DBUG_ON(ch->remote_msgqueue == NULL); 576 } 577 578 if (!(ch->flags & XPC_C_OPENREPLY)) { 579 ch->flags |= XPC_C_OPENREPLY; 580 xpc_IPI_send_openreply(ch, irq_flags); 581 } 582 583 if (!(ch->flags & XPC_C_ROPENREPLY)) 584 return; 585 586 DBUG_ON(ch->remote_msgqueue_pa == 0); 587 588 ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP); /* clear all else */ 589 590 dev_info(xpc_chan, "channel %d to partition %d connected\n", 591 ch->number, ch->partid); 592 593 spin_unlock_irqrestore(&ch->lock, *irq_flags); 594 xpc_create_kthreads(ch, 1, 0); 595 spin_lock_irqsave(&ch->lock, *irq_flags); 596 } 597 598 /* 599 * Notify those who wanted to be notified upon delivery of their message. 600 */ 601 static void 602 xpc_notify_senders(struct xpc_channel *ch, enum xp_retval reason, s64 put) 603 { 604 struct xpc_notify *notify; 605 u8 notify_type; 606 s64 get = ch->w_remote_GP.get - 1; 607 608 while (++get < put && atomic_read(&ch->n_to_notify) > 0) { 609 610 notify = &ch->notify_queue[get % ch->local_nentries]; 611 612 /* 613 * See if the notify entry indicates it was associated with 614 * a message who's sender wants to be notified. It is possible 615 * that it is, but someone else is doing or has done the 616 * notification. 617 */ 618 notify_type = notify->type; 619 if (notify_type == 0 || 620 cmpxchg(¬ify->type, notify_type, 0) != notify_type) { 621 continue; 622 } 623 624 DBUG_ON(notify_type != XPC_N_CALL); 625 626 atomic_dec(&ch->n_to_notify); 627 628 if (notify->func != NULL) { 629 dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " 630 "msg_number=%ld, partid=%d, channel=%d\n", 631 (void *)notify, get, ch->partid, ch->number); 632 633 notify->func(reason, ch->partid, ch->number, 634 notify->key); 635 636 dev_dbg(xpc_chan, "notify->func() returned, " 637 "notify=0x%p, msg_number=%ld, partid=%d, " 638 "channel=%d\n", (void *)notify, get, 639 ch->partid, ch->number); 640 } 641 } 642 } 643 644 /* 645 * Free up message queues and other stuff that were allocated for the specified 646 * channel. 647 * 648 * Note: ch->reason and ch->reason_line are left set for debugging purposes, 649 * they're cleared when XPC_C_DISCONNECTED is cleared. 650 */ 651 static void 652 xpc_free_msgqueues(struct xpc_channel *ch) 653 { 654 DBUG_ON(!spin_is_locked(&ch->lock)); 655 DBUG_ON(atomic_read(&ch->n_to_notify) != 0); 656 657 ch->remote_msgqueue_pa = 0; 658 ch->func = NULL; 659 ch->key = NULL; 660 ch->msg_size = 0; 661 ch->local_nentries = 0; 662 ch->remote_nentries = 0; 663 ch->kthreads_assigned_limit = 0; 664 ch->kthreads_idle_limit = 0; 665 666 ch->local_GP->get = 0; 667 ch->local_GP->put = 0; 668 ch->remote_GP.get = 0; 669 ch->remote_GP.put = 0; 670 ch->w_local_GP.get = 0; 671 ch->w_local_GP.put = 0; 672 ch->w_remote_GP.get = 0; 673 ch->w_remote_GP.put = 0; 674 ch->next_msg_to_pull = 0; 675 676 if (ch->flags & XPC_C_SETUP) { 677 ch->flags &= ~XPC_C_SETUP; 678 679 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n", 680 ch->flags, ch->partid, ch->number); 681 682 kfree(ch->local_msgqueue_base); 683 ch->local_msgqueue = NULL; 684 kfree(ch->remote_msgqueue_base); 685 ch->remote_msgqueue = NULL; 686 kfree(ch->notify_queue); 687 ch->notify_queue = NULL; 688 } 689 } 690 691 /* 692 * spin_lock_irqsave() is expected to be held on entry. 693 */ 694 static void 695 xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) 696 { 697 struct xpc_partition *part = &xpc_partitions[ch->partid]; 698 u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED); 699 700 DBUG_ON(!spin_is_locked(&ch->lock)); 701 702 if (!(ch->flags & XPC_C_DISCONNECTING)) 703 return; 704 705 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 706 707 /* make sure all activity has settled down first */ 708 709 if (atomic_read(&ch->kthreads_assigned) > 0 || 710 atomic_read(&ch->references) > 0) { 711 return; 712 } 713 DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 714 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE)); 715 716 if (part->act_state == XPC_P_DEACTIVATING) { 717 /* can't proceed until the other side disengages from us */ 718 if (xpc_partition_engaged(1UL << ch->partid)) 719 return; 720 721 } else { 722 723 /* as long as the other side is up do the full protocol */ 724 725 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) 726 return; 727 728 if (!(ch->flags & XPC_C_CLOSEREPLY)) { 729 ch->flags |= XPC_C_CLOSEREPLY; 730 xpc_IPI_send_closereply(ch, irq_flags); 731 } 732 733 if (!(ch->flags & XPC_C_RCLOSEREPLY)) 734 return; 735 } 736 737 /* wake those waiting for notify completion */ 738 if (atomic_read(&ch->n_to_notify) > 0) { 739 /* >>> we do callout while holding ch->lock */ 740 xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put); 741 } 742 743 /* both sides are disconnected now */ 744 745 if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) { 746 spin_unlock_irqrestore(&ch->lock, *irq_flags); 747 xpc_disconnect_callout(ch, xpDisconnected); 748 spin_lock_irqsave(&ch->lock, *irq_flags); 749 } 750 751 /* it's now safe to free the channel's message queues */ 752 xpc_free_msgqueues(ch); 753 754 /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */ 755 ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT)); 756 757 atomic_dec(&part->nchannels_active); 758 759 if (channel_was_connected) { 760 dev_info(xpc_chan, "channel %d to partition %d disconnected, " 761 "reason=%d\n", ch->number, ch->partid, ch->reason); 762 } 763 764 if (ch->flags & XPC_C_WDISCONNECT) { 765 /* we won't lose the CPU since we're holding ch->lock */ 766 complete(&ch->wdisconnect_wait); 767 } else if (ch->delayed_IPI_flags) { 768 if (part->act_state != XPC_P_DEACTIVATING) { 769 /* time to take action on any delayed IPI flags */ 770 spin_lock(&part->IPI_lock); 771 XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number, 772 ch->delayed_IPI_flags); 773 spin_unlock(&part->IPI_lock); 774 } 775 ch->delayed_IPI_flags = 0; 776 } 777 } 778 779 /* 780 * Process a change in the channel's remote connection state. 781 */ 782 static void 783 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, 784 u8 IPI_flags) 785 { 786 unsigned long irq_flags; 787 struct xpc_openclose_args *args = 788 &part->remote_openclose_args[ch_number]; 789 struct xpc_channel *ch = &part->channels[ch_number]; 790 enum xp_retval reason; 791 792 spin_lock_irqsave(&ch->lock, irq_flags); 793 794 again: 795 796 if ((ch->flags & XPC_C_DISCONNECTED) && 797 (ch->flags & XPC_C_WDISCONNECT)) { 798 /* 799 * Delay processing IPI flags until thread waiting disconnect 800 * has had a chance to see that the channel is disconnected. 801 */ 802 ch->delayed_IPI_flags |= IPI_flags; 803 spin_unlock_irqrestore(&ch->lock, irq_flags); 804 return; 805 } 806 807 if (IPI_flags & XPC_IPI_CLOSEREQUEST) { 808 809 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received " 810 "from partid=%d, channel=%d\n", args->reason, 811 ch->partid, ch->number); 812 813 /* 814 * If RCLOSEREQUEST is set, we're probably waiting for 815 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed 816 * with this RCLOSEREQUEST in the IPI_flags. 817 */ 818 819 if (ch->flags & XPC_C_RCLOSEREQUEST) { 820 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING)); 821 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 822 DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY)); 823 DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY); 824 825 DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY)); 826 IPI_flags &= ~XPC_IPI_CLOSEREPLY; 827 ch->flags |= XPC_C_RCLOSEREPLY; 828 829 /* both sides have finished disconnecting */ 830 xpc_process_disconnect(ch, &irq_flags); 831 DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); 832 goto again; 833 } 834 835 if (ch->flags & XPC_C_DISCONNECTED) { 836 if (!(IPI_flags & XPC_IPI_OPENREQUEST)) { 837 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, 838 ch_number) & 839 XPC_IPI_OPENREQUEST)) { 840 841 DBUG_ON(ch->delayed_IPI_flags != 0); 842 spin_lock(&part->IPI_lock); 843 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 844 ch_number, 845 XPC_IPI_CLOSEREQUEST); 846 spin_unlock(&part->IPI_lock); 847 } 848 spin_unlock_irqrestore(&ch->lock, irq_flags); 849 return; 850 } 851 852 XPC_SET_REASON(ch, 0, 0); 853 ch->flags &= ~XPC_C_DISCONNECTED; 854 855 atomic_inc(&part->nchannels_active); 856 ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST); 857 } 858 859 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY); 860 861 /* 862 * The meaningful CLOSEREQUEST connection state fields are: 863 * reason = reason connection is to be closed 864 */ 865 866 ch->flags |= XPC_C_RCLOSEREQUEST; 867 868 if (!(ch->flags & XPC_C_DISCONNECTING)) { 869 reason = args->reason; 870 if (reason <= xpSuccess || reason > xpUnknownReason) 871 reason = xpUnknownReason; 872 else if (reason == xpUnregistering) 873 reason = xpOtherUnregistering; 874 875 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 876 877 DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY); 878 spin_unlock_irqrestore(&ch->lock, irq_flags); 879 return; 880 } 881 882 xpc_process_disconnect(ch, &irq_flags); 883 } 884 885 if (IPI_flags & XPC_IPI_CLOSEREPLY) { 886 887 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d," 888 " channel=%d\n", ch->partid, ch->number); 889 890 if (ch->flags & XPC_C_DISCONNECTED) { 891 DBUG_ON(part->act_state != XPC_P_DEACTIVATING); 892 spin_unlock_irqrestore(&ch->lock, irq_flags); 893 return; 894 } 895 896 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); 897 898 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { 899 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) 900 & XPC_IPI_CLOSEREQUEST)) { 901 902 DBUG_ON(ch->delayed_IPI_flags != 0); 903 spin_lock(&part->IPI_lock); 904 XPC_SET_IPI_FLAGS(part->local_IPI_amo, 905 ch_number, 906 XPC_IPI_CLOSEREPLY); 907 spin_unlock(&part->IPI_lock); 908 } 909 spin_unlock_irqrestore(&ch->lock, irq_flags); 910 return; 911 } 912 913 ch->flags |= XPC_C_RCLOSEREPLY; 914 915 if (ch->flags & XPC_C_CLOSEREPLY) { 916 /* both sides have finished disconnecting */ 917 xpc_process_disconnect(ch, &irq_flags); 918 } 919 } 920 921 if (IPI_flags & XPC_IPI_OPENREQUEST) { 922 923 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, " 924 "local_nentries=%d) received from partid=%d, " 925 "channel=%d\n", args->msg_size, args->local_nentries, 926 ch->partid, ch->number); 927 928 if (part->act_state == XPC_P_DEACTIVATING || 929 (ch->flags & XPC_C_ROPENREQUEST)) { 930 spin_unlock_irqrestore(&ch->lock, irq_flags); 931 return; 932 } 933 934 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { 935 ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; 936 spin_unlock_irqrestore(&ch->lock, irq_flags); 937 return; 938 } 939 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED | 940 XPC_C_OPENREQUEST))); 941 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 942 XPC_C_OPENREPLY | XPC_C_CONNECTED)); 943 944 /* 945 * The meaningful OPENREQUEST connection state fields are: 946 * msg_size = size of channel's messages in bytes 947 * local_nentries = remote partition's local_nentries 948 */ 949 if (args->msg_size == 0 || args->local_nentries == 0) { 950 /* assume OPENREQUEST was delayed by mistake */ 951 spin_unlock_irqrestore(&ch->lock, irq_flags); 952 return; 953 } 954 955 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); 956 ch->remote_nentries = args->local_nentries; 957 958 if (ch->flags & XPC_C_OPENREQUEST) { 959 if (args->msg_size != ch->msg_size) { 960 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes, 961 &irq_flags); 962 spin_unlock_irqrestore(&ch->lock, irq_flags); 963 return; 964 } 965 } else { 966 ch->msg_size = args->msg_size; 967 968 XPC_SET_REASON(ch, 0, 0); 969 ch->flags &= ~XPC_C_DISCONNECTED; 970 971 atomic_inc(&part->nchannels_active); 972 } 973 974 xpc_process_connect(ch, &irq_flags); 975 } 976 977 if (IPI_flags & XPC_IPI_OPENREPLY) { 978 979 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, " 980 "local_nentries=%d, remote_nentries=%d) received from " 981 "partid=%d, channel=%d\n", args->local_msgqueue_pa, 982 args->local_nentries, args->remote_nentries, 983 ch->partid, ch->number); 984 985 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) { 986 spin_unlock_irqrestore(&ch->lock, irq_flags); 987 return; 988 } 989 if (!(ch->flags & XPC_C_OPENREQUEST)) { 990 XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError, 991 &irq_flags); 992 spin_unlock_irqrestore(&ch->lock, irq_flags); 993 return; 994 } 995 996 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); 997 DBUG_ON(ch->flags & XPC_C_CONNECTED); 998 999 /* 1000 * The meaningful OPENREPLY connection state fields are: 1001 * local_msgqueue_pa = physical address of remote 1002 * partition's local_msgqueue 1003 * local_nentries = remote partition's local_nentries 1004 * remote_nentries = remote partition's remote_nentries 1005 */ 1006 DBUG_ON(args->local_msgqueue_pa == 0); 1007 DBUG_ON(args->local_nentries == 0); 1008 DBUG_ON(args->remote_nentries == 0); 1009 1010 ch->flags |= XPC_C_ROPENREPLY; 1011 ch->remote_msgqueue_pa = args->local_msgqueue_pa; 1012 1013 if (args->local_nentries < ch->remote_nentries) { 1014 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 1015 "remote_nentries=%d, old remote_nentries=%d, " 1016 "partid=%d, channel=%d\n", 1017 args->local_nentries, ch->remote_nentries, 1018 ch->partid, ch->number); 1019 1020 ch->remote_nentries = args->local_nentries; 1021 } 1022 if (args->remote_nentries < ch->local_nentries) { 1023 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new " 1024 "local_nentries=%d, old local_nentries=%d, " 1025 "partid=%d, channel=%d\n", 1026 args->remote_nentries, ch->local_nentries, 1027 ch->partid, ch->number); 1028 1029 ch->local_nentries = args->remote_nentries; 1030 } 1031 1032 xpc_process_connect(ch, &irq_flags); 1033 } 1034 1035 spin_unlock_irqrestore(&ch->lock, irq_flags); 1036 } 1037 1038 /* 1039 * Attempt to establish a channel connection to a remote partition. 1040 */ 1041 static enum xp_retval 1042 xpc_connect_channel(struct xpc_channel *ch) 1043 { 1044 unsigned long irq_flags; 1045 struct xpc_registration *registration = &xpc_registrations[ch->number]; 1046 1047 if (mutex_trylock(®istration->mutex) == 0) 1048 return xpRetry; 1049 1050 if (!XPC_CHANNEL_REGISTERED(ch->number)) { 1051 mutex_unlock(®istration->mutex); 1052 return xpUnregistered; 1053 } 1054 1055 spin_lock_irqsave(&ch->lock, irq_flags); 1056 1057 DBUG_ON(ch->flags & XPC_C_CONNECTED); 1058 DBUG_ON(ch->flags & XPC_C_OPENREQUEST); 1059 1060 if (ch->flags & XPC_C_DISCONNECTING) { 1061 spin_unlock_irqrestore(&ch->lock, irq_flags); 1062 mutex_unlock(®istration->mutex); 1063 return ch->reason; 1064 } 1065 1066 /* add info from the channel connect registration to the channel */ 1067 1068 ch->kthreads_assigned_limit = registration->assigned_limit; 1069 ch->kthreads_idle_limit = registration->idle_limit; 1070 DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); 1071 DBUG_ON(atomic_read(&ch->kthreads_idle) != 0); 1072 DBUG_ON(atomic_read(&ch->kthreads_active) != 0); 1073 1074 ch->func = registration->func; 1075 DBUG_ON(registration->func == NULL); 1076 ch->key = registration->key; 1077 1078 ch->local_nentries = registration->nentries; 1079 1080 if (ch->flags & XPC_C_ROPENREQUEST) { 1081 if (registration->msg_size != ch->msg_size) { 1082 /* the local and remote sides aren't the same */ 1083 1084 /* 1085 * Because XPC_DISCONNECT_CHANNEL() can block we're 1086 * forced to up the registration sema before we unlock 1087 * the channel lock. But that's okay here because we're 1088 * done with the part that required the registration 1089 * sema. XPC_DISCONNECT_CHANNEL() requires that the 1090 * channel lock be locked and will unlock and relock 1091 * the channel lock as needed. 1092 */ 1093 mutex_unlock(®istration->mutex); 1094 XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes, 1095 &irq_flags); 1096 spin_unlock_irqrestore(&ch->lock, irq_flags); 1097 return xpUnequalMsgSizes; 1098 } 1099 } else { 1100 ch->msg_size = registration->msg_size; 1101 1102 XPC_SET_REASON(ch, 0, 0); 1103 ch->flags &= ~XPC_C_DISCONNECTED; 1104 1105 atomic_inc(&xpc_partitions[ch->partid].nchannels_active); 1106 } 1107 1108 mutex_unlock(®istration->mutex); 1109 1110 /* initiate the connection */ 1111 1112 ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING); 1113 xpc_IPI_send_openrequest(ch, &irq_flags); 1114 1115 xpc_process_connect(ch, &irq_flags); 1116 1117 spin_unlock_irqrestore(&ch->lock, irq_flags); 1118 1119 return xpSuccess; 1120 } 1121 1122 /* 1123 * Clear some of the msg flags in the local message queue. 1124 */ 1125 static inline void 1126 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch) 1127 { 1128 struct xpc_msg *msg; 1129 s64 get; 1130 1131 get = ch->w_remote_GP.get; 1132 do { 1133 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 1134 (get % ch->local_nentries) * 1135 ch->msg_size); 1136 msg->flags = 0; 1137 } while (++get < ch->remote_GP.get); 1138 } 1139 1140 /* 1141 * Clear some of the msg flags in the remote message queue. 1142 */ 1143 static inline void 1144 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch) 1145 { 1146 struct xpc_msg *msg; 1147 s64 put; 1148 1149 put = ch->w_remote_GP.put; 1150 do { 1151 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + 1152 (put % ch->remote_nentries) * 1153 ch->msg_size); 1154 msg->flags = 0; 1155 } while (++put < ch->remote_GP.put); 1156 } 1157 1158 static void 1159 xpc_process_msg_IPI(struct xpc_partition *part, int ch_number) 1160 { 1161 struct xpc_channel *ch = &part->channels[ch_number]; 1162 int nmsgs_sent; 1163 1164 ch->remote_GP = part->remote_GPs[ch_number]; 1165 1166 /* See what, if anything, has changed for each connected channel */ 1167 1168 xpc_msgqueue_ref(ch); 1169 1170 if (ch->w_remote_GP.get == ch->remote_GP.get && 1171 ch->w_remote_GP.put == ch->remote_GP.put) { 1172 /* nothing changed since GPs were last pulled */ 1173 xpc_msgqueue_deref(ch); 1174 return; 1175 } 1176 1177 if (!(ch->flags & XPC_C_CONNECTED)) { 1178 xpc_msgqueue_deref(ch); 1179 return; 1180 } 1181 1182 /* 1183 * First check to see if messages recently sent by us have been 1184 * received by the other side. (The remote GET value will have 1185 * changed since we last looked at it.) 1186 */ 1187 1188 if (ch->w_remote_GP.get != ch->remote_GP.get) { 1189 1190 /* 1191 * We need to notify any senders that want to be notified 1192 * that their sent messages have been received by their 1193 * intended recipients. We need to do this before updating 1194 * w_remote_GP.get so that we don't allocate the same message 1195 * queue entries prematurely (see xpc_allocate_msg()). 1196 */ 1197 if (atomic_read(&ch->n_to_notify) > 0) { 1198 /* 1199 * Notify senders that messages sent have been 1200 * received and delivered by the other side. 1201 */ 1202 xpc_notify_senders(ch, xpMsgDelivered, 1203 ch->remote_GP.get); 1204 } 1205 1206 /* 1207 * Clear msg->flags in previously sent messages, so that 1208 * they're ready for xpc_allocate_msg(). 1209 */ 1210 xpc_clear_local_msgqueue_flags(ch); 1211 1212 ch->w_remote_GP.get = ch->remote_GP.get; 1213 1214 dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, " 1215 "channel=%d\n", ch->w_remote_GP.get, ch->partid, 1216 ch->number); 1217 1218 /* 1219 * If anyone was waiting for message queue entries to become 1220 * available, wake them up. 1221 */ 1222 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) 1223 wake_up(&ch->msg_allocate_wq); 1224 } 1225 1226 /* 1227 * Now check for newly sent messages by the other side. (The remote 1228 * PUT value will have changed since we last looked at it.) 1229 */ 1230 1231 if (ch->w_remote_GP.put != ch->remote_GP.put) { 1232 /* 1233 * Clear msg->flags in previously received messages, so that 1234 * they're ready for xpc_get_deliverable_msg(). 1235 */ 1236 xpc_clear_remote_msgqueue_flags(ch); 1237 1238 ch->w_remote_GP.put = ch->remote_GP.put; 1239 1240 dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, " 1241 "channel=%d\n", ch->w_remote_GP.put, ch->partid, 1242 ch->number); 1243 1244 nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get; 1245 if (nmsgs_sent > 0) { 1246 dev_dbg(xpc_chan, "msgs waiting to be copied and " 1247 "delivered=%d, partid=%d, channel=%d\n", 1248 nmsgs_sent, ch->partid, ch->number); 1249 1250 if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) 1251 xpc_activate_kthreads(ch, nmsgs_sent); 1252 } 1253 } 1254 1255 xpc_msgqueue_deref(ch); 1256 } 1257 1258 void 1259 xpc_process_channel_activity(struct xpc_partition *part) 1260 { 1261 unsigned long irq_flags; 1262 u64 IPI_amo, IPI_flags; 1263 struct xpc_channel *ch; 1264 int ch_number; 1265 u32 ch_flags; 1266 1267 IPI_amo = xpc_get_IPI_flags(part); 1268 1269 /* 1270 * Initiate channel connections for registered channels. 1271 * 1272 * For each connected channel that has pending messages activate idle 1273 * kthreads and/or create new kthreads as needed. 1274 */ 1275 1276 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 1277 ch = &part->channels[ch_number]; 1278 1279 /* 1280 * Process any open or close related IPI flags, and then deal 1281 * with connecting or disconnecting the channel as required. 1282 */ 1283 1284 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number); 1285 1286 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) 1287 xpc_process_openclose_IPI(part, ch_number, IPI_flags); 1288 1289 ch_flags = ch->flags; /* need an atomic snapshot of flags */ 1290 1291 if (ch_flags & XPC_C_DISCONNECTING) { 1292 spin_lock_irqsave(&ch->lock, irq_flags); 1293 xpc_process_disconnect(ch, &irq_flags); 1294 spin_unlock_irqrestore(&ch->lock, irq_flags); 1295 continue; 1296 } 1297 1298 if (part->act_state == XPC_P_DEACTIVATING) 1299 continue; 1300 1301 if (!(ch_flags & XPC_C_CONNECTED)) { 1302 if (!(ch_flags & XPC_C_OPENREQUEST)) { 1303 DBUG_ON(ch_flags & XPC_C_SETUP); 1304 (void)xpc_connect_channel(ch); 1305 } else { 1306 spin_lock_irqsave(&ch->lock, irq_flags); 1307 xpc_process_connect(ch, &irq_flags); 1308 spin_unlock_irqrestore(&ch->lock, irq_flags); 1309 } 1310 continue; 1311 } 1312 1313 /* 1314 * Process any message related IPI flags, this may involve the 1315 * activation of kthreads to deliver any pending messages sent 1316 * from the other partition. 1317 */ 1318 1319 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) 1320 xpc_process_msg_IPI(part, ch_number); 1321 } 1322 } 1323 1324 /* 1325 * XPC's heartbeat code calls this function to inform XPC that a partition is 1326 * going down. XPC responds by tearing down the XPartition Communication 1327 * infrastructure used for the just downed partition. 1328 * 1329 * XPC's heartbeat code will never call this function and xpc_partition_up() 1330 * at the same time. Nor will it ever make multiple calls to either function 1331 * at the same time. 1332 */ 1333 void 1334 xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason) 1335 { 1336 unsigned long irq_flags; 1337 int ch_number; 1338 struct xpc_channel *ch; 1339 1340 dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n", 1341 XPC_PARTID(part), reason); 1342 1343 if (!xpc_part_ref(part)) { 1344 /* infrastructure for this partition isn't currently set up */ 1345 return; 1346 } 1347 1348 /* disconnect channels associated with the partition going down */ 1349 1350 for (ch_number = 0; ch_number < part->nchannels; ch_number++) { 1351 ch = &part->channels[ch_number]; 1352 1353 xpc_msgqueue_ref(ch); 1354 spin_lock_irqsave(&ch->lock, irq_flags); 1355 1356 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); 1357 1358 spin_unlock_irqrestore(&ch->lock, irq_flags); 1359 xpc_msgqueue_deref(ch); 1360 } 1361 1362 xpc_wakeup_channel_mgr(part); 1363 1364 xpc_part_deref(part); 1365 } 1366 1367 /* 1368 * Teardown the infrastructure necessary to support XPartition Communication 1369 * between the specified remote partition and the local one. 1370 */ 1371 void 1372 xpc_teardown_infrastructure(struct xpc_partition *part) 1373 { 1374 short partid = XPC_PARTID(part); 1375 1376 /* 1377 * We start off by making this partition inaccessible to local 1378 * processes by marking it as no longer setup. Then we make it 1379 * inaccessible to remote processes by clearing the XPC per partition 1380 * specific variable's magic # (which indicates that these variables 1381 * are no longer valid) and by ignoring all XPC notify IPIs sent to 1382 * this partition. 1383 */ 1384 1385 DBUG_ON(atomic_read(&part->nchannels_engaged) != 0); 1386 DBUG_ON(atomic_read(&part->nchannels_active) != 0); 1387 DBUG_ON(part->setup_state != XPC_P_SETUP); 1388 part->setup_state = XPC_P_WTEARDOWN; 1389 1390 xpc_vars_part[partid].magic = 0; 1391 1392 free_irq(SGI_XPC_NOTIFY, (void *)(u64)partid); 1393 1394 /* 1395 * Before proceeding with the teardown we have to wait until all 1396 * existing references cease. 1397 */ 1398 wait_event(part->teardown_wq, (atomic_read(&part->references) == 0)); 1399 1400 /* now we can begin tearing down the infrastructure */ 1401 1402 part->setup_state = XPC_P_TORNDOWN; 1403 1404 /* in case we've still got outstanding timers registered... */ 1405 del_timer_sync(&part->dropped_IPI_timer); 1406 1407 kfree(part->remote_openclose_args_base); 1408 part->remote_openclose_args = NULL; 1409 kfree(part->local_openclose_args_base); 1410 part->local_openclose_args = NULL; 1411 kfree(part->remote_GPs_base); 1412 part->remote_GPs = NULL; 1413 kfree(part->local_GPs_base); 1414 part->local_GPs = NULL; 1415 kfree(part->channels); 1416 part->channels = NULL; 1417 part->local_IPI_amo_va = NULL; 1418 } 1419 1420 /* 1421 * Called by XP at the time of channel connection registration to cause 1422 * XPC to establish connections to all currently active partitions. 1423 */ 1424 void 1425 xpc_initiate_connect(int ch_number) 1426 { 1427 short partid; 1428 struct xpc_partition *part; 1429 struct xpc_channel *ch; 1430 1431 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS); 1432 1433 for (partid = 0; partid < xp_max_npartitions; partid++) { 1434 part = &xpc_partitions[partid]; 1435 1436 if (xpc_part_ref(part)) { 1437 ch = &part->channels[ch_number]; 1438 1439 /* 1440 * Initiate the establishment of a connection on the 1441 * newly registered channel to the remote partition. 1442 */ 1443 xpc_wakeup_channel_mgr(part); 1444 xpc_part_deref(part); 1445 } 1446 } 1447 } 1448 1449 void 1450 xpc_connected_callout(struct xpc_channel *ch) 1451 { 1452 /* let the registerer know that a connection has been established */ 1453 1454 if (ch->func != NULL) { 1455 dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, " 1456 "partid=%d, channel=%d\n", ch->partid, ch->number); 1457 1458 ch->func(xpConnected, ch->partid, ch->number, 1459 (void *)(u64)ch->local_nentries, ch->key); 1460 1461 dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, " 1462 "partid=%d, channel=%d\n", ch->partid, ch->number); 1463 } 1464 } 1465 1466 /* 1467 * Called by XP at the time of channel connection unregistration to cause 1468 * XPC to teardown all current connections for the specified channel. 1469 * 1470 * Before returning xpc_initiate_disconnect() will wait until all connections 1471 * on the specified channel have been closed/torndown. So the caller can be 1472 * assured that they will not be receiving any more callouts from XPC to the 1473 * function they registered via xpc_connect(). 1474 * 1475 * Arguments: 1476 * 1477 * ch_number - channel # to unregister. 1478 */ 1479 void 1480 xpc_initiate_disconnect(int ch_number) 1481 { 1482 unsigned long irq_flags; 1483 short partid; 1484 struct xpc_partition *part; 1485 struct xpc_channel *ch; 1486 1487 DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS); 1488 1489 /* initiate the channel disconnect for every active partition */ 1490 for (partid = 0; partid < xp_max_npartitions; partid++) { 1491 part = &xpc_partitions[partid]; 1492 1493 if (xpc_part_ref(part)) { 1494 ch = &part->channels[ch_number]; 1495 xpc_msgqueue_ref(ch); 1496 1497 spin_lock_irqsave(&ch->lock, irq_flags); 1498 1499 if (!(ch->flags & XPC_C_DISCONNECTED)) { 1500 ch->flags |= XPC_C_WDISCONNECT; 1501 1502 XPC_DISCONNECT_CHANNEL(ch, xpUnregistering, 1503 &irq_flags); 1504 } 1505 1506 spin_unlock_irqrestore(&ch->lock, irq_flags); 1507 1508 xpc_msgqueue_deref(ch); 1509 xpc_part_deref(part); 1510 } 1511 } 1512 1513 xpc_disconnect_wait(ch_number); 1514 } 1515 1516 /* 1517 * To disconnect a channel, and reflect it back to all who may be waiting. 1518 * 1519 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by 1520 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by 1521 * xpc_disconnect_wait(). 1522 * 1523 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN. 1524 */ 1525 void 1526 xpc_disconnect_channel(const int line, struct xpc_channel *ch, 1527 enum xp_retval reason, unsigned long *irq_flags) 1528 { 1529 u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED); 1530 1531 DBUG_ON(!spin_is_locked(&ch->lock)); 1532 1533 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) 1534 return; 1535 1536 DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED))); 1537 1538 dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n", 1539 reason, line, ch->partid, ch->number); 1540 1541 XPC_SET_REASON(ch, reason, line); 1542 1543 ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); 1544 /* some of these may not have been set */ 1545 ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY | 1546 XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | 1547 XPC_C_CONNECTING | XPC_C_CONNECTED); 1548 1549 xpc_IPI_send_closerequest(ch, irq_flags); 1550 1551 if (channel_was_connected) 1552 ch->flags |= XPC_C_WASCONNECTED; 1553 1554 spin_unlock_irqrestore(&ch->lock, *irq_flags); 1555 1556 /* wake all idle kthreads so they can exit */ 1557 if (atomic_read(&ch->kthreads_idle) > 0) { 1558 wake_up_all(&ch->idle_wq); 1559 1560 } else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) && 1561 !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) { 1562 /* start a kthread that will do the xpDisconnecting callout */ 1563 xpc_create_kthreads(ch, 1, 1); 1564 } 1565 1566 /* wake those waiting to allocate an entry from the local msg queue */ 1567 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) 1568 wake_up(&ch->msg_allocate_wq); 1569 1570 spin_lock_irqsave(&ch->lock, *irq_flags); 1571 } 1572 1573 void 1574 xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason) 1575 { 1576 /* 1577 * Let the channel's registerer know that the channel is being 1578 * disconnected. We don't want to do this if the registerer was never 1579 * informed of a connection being made. 1580 */ 1581 1582 if (ch->func != NULL) { 1583 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, " 1584 "channel=%d\n", reason, ch->partid, ch->number); 1585 1586 ch->func(reason, ch->partid, ch->number, NULL, ch->key); 1587 1588 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, " 1589 "channel=%d\n", reason, ch->partid, ch->number); 1590 } 1591 } 1592 1593 /* 1594 * Wait for a message entry to become available for the specified channel, 1595 * but don't wait any longer than 1 jiffy. 1596 */ 1597 static enum xp_retval 1598 xpc_allocate_msg_wait(struct xpc_channel *ch) 1599 { 1600 enum xp_retval ret; 1601 1602 if (ch->flags & XPC_C_DISCONNECTING) { 1603 DBUG_ON(ch->reason == xpInterrupted); 1604 return ch->reason; 1605 } 1606 1607 atomic_inc(&ch->n_on_msg_allocate_wq); 1608 ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1); 1609 atomic_dec(&ch->n_on_msg_allocate_wq); 1610 1611 if (ch->flags & XPC_C_DISCONNECTING) { 1612 ret = ch->reason; 1613 DBUG_ON(ch->reason == xpInterrupted); 1614 } else if (ret == 0) { 1615 ret = xpTimeout; 1616 } else { 1617 ret = xpInterrupted; 1618 } 1619 1620 return ret; 1621 } 1622 1623 /* 1624 * Allocate an entry for a message from the message queue associated with the 1625 * specified channel. 1626 */ 1627 static enum xp_retval 1628 xpc_allocate_msg(struct xpc_channel *ch, u32 flags, 1629 struct xpc_msg **address_of_msg) 1630 { 1631 struct xpc_msg *msg; 1632 enum xp_retval ret; 1633 s64 put; 1634 1635 /* this reference will be dropped in xpc_send_msg() */ 1636 xpc_msgqueue_ref(ch); 1637 1638 if (ch->flags & XPC_C_DISCONNECTING) { 1639 xpc_msgqueue_deref(ch); 1640 return ch->reason; 1641 } 1642 if (!(ch->flags & XPC_C_CONNECTED)) { 1643 xpc_msgqueue_deref(ch); 1644 return xpNotConnected; 1645 } 1646 1647 /* 1648 * Get the next available message entry from the local message queue. 1649 * If none are available, we'll make sure that we grab the latest 1650 * GP values. 1651 */ 1652 ret = xpTimeout; 1653 1654 while (1) { 1655 1656 put = ch->w_local_GP.put; 1657 rmb(); /* guarantee that .put loads before .get */ 1658 if (put - ch->w_remote_GP.get < ch->local_nentries) { 1659 1660 /* There are available message entries. We need to try 1661 * to secure one for ourselves. We'll do this by trying 1662 * to increment w_local_GP.put as long as someone else 1663 * doesn't beat us to it. If they do, we'll have to 1664 * try again. 1665 */ 1666 if (cmpxchg(&ch->w_local_GP.put, put, put + 1) == put) { 1667 /* we got the entry referenced by put */ 1668 break; 1669 } 1670 continue; /* try again */ 1671 } 1672 1673 /* 1674 * There aren't any available msg entries at this time. 1675 * 1676 * In waiting for a message entry to become available, 1677 * we set a timeout in case the other side is not 1678 * sending completion IPIs. This lets us fake an IPI 1679 * that will cause the IPI handler to fetch the latest 1680 * GP values as if an IPI was sent by the other side. 1681 */ 1682 if (ret == xpTimeout) 1683 xpc_IPI_send_local_msgrequest(ch); 1684 1685 if (flags & XPC_NOWAIT) { 1686 xpc_msgqueue_deref(ch); 1687 return xpNoWait; 1688 } 1689 1690 ret = xpc_allocate_msg_wait(ch); 1691 if (ret != xpInterrupted && ret != xpTimeout) { 1692 xpc_msgqueue_deref(ch); 1693 return ret; 1694 } 1695 } 1696 1697 /* get the message's address and initialize it */ 1698 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 1699 (put % ch->local_nentries) * ch->msg_size); 1700 1701 DBUG_ON(msg->flags != 0); 1702 msg->number = put; 1703 1704 dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, " 1705 "msg_number=%ld, partid=%d, channel=%d\n", put + 1, 1706 (void *)msg, msg->number, ch->partid, ch->number); 1707 1708 *address_of_msg = msg; 1709 1710 return xpSuccess; 1711 } 1712 1713 /* 1714 * Allocate an entry for a message from the message queue associated with the 1715 * specified channel. NOTE that this routine can sleep waiting for a message 1716 * entry to become available. To not sleep, pass in the XPC_NOWAIT flag. 1717 * 1718 * Arguments: 1719 * 1720 * partid - ID of partition to which the channel is connected. 1721 * ch_number - channel #. 1722 * flags - see xpc.h for valid flags. 1723 * payload - address of the allocated payload area pointer (filled in on 1724 * return) in which the user-defined message is constructed. 1725 */ 1726 enum xp_retval 1727 xpc_initiate_allocate(short partid, int ch_number, u32 flags, void **payload) 1728 { 1729 struct xpc_partition *part = &xpc_partitions[partid]; 1730 enum xp_retval ret = xpUnknownReason; 1731 struct xpc_msg *msg = NULL; 1732 1733 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1734 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1735 1736 *payload = NULL; 1737 1738 if (xpc_part_ref(part)) { 1739 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg); 1740 xpc_part_deref(part); 1741 1742 if (msg != NULL) 1743 *payload = &msg->payload; 1744 } 1745 1746 return ret; 1747 } 1748 1749 /* 1750 * Now we actually send the messages that are ready to be sent by advancing 1751 * the local message queue's Put value and then send an IPI to the recipient 1752 * partition. 1753 */ 1754 static void 1755 xpc_send_msgs(struct xpc_channel *ch, s64 initial_put) 1756 { 1757 struct xpc_msg *msg; 1758 s64 put = initial_put + 1; 1759 int send_IPI = 0; 1760 1761 while (1) { 1762 1763 while (1) { 1764 if (put == ch->w_local_GP.put) 1765 break; 1766 1767 msg = (struct xpc_msg *)((u64)ch->local_msgqueue + 1768 (put % ch->local_nentries) * 1769 ch->msg_size); 1770 1771 if (!(msg->flags & XPC_M_READY)) 1772 break; 1773 1774 put++; 1775 } 1776 1777 if (put == initial_put) { 1778 /* nothing's changed */ 1779 break; 1780 } 1781 1782 if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) != 1783 initial_put) { 1784 /* someone else beat us to it */ 1785 DBUG_ON(ch->local_GP->put < initial_put); 1786 break; 1787 } 1788 1789 /* we just set the new value of local_GP->put */ 1790 1791 dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, " 1792 "channel=%d\n", put, ch->partid, ch->number); 1793 1794 send_IPI = 1; 1795 1796 /* 1797 * We need to ensure that the message referenced by 1798 * local_GP->put is not XPC_M_READY or that local_GP->put 1799 * equals w_local_GP.put, so we'll go have a look. 1800 */ 1801 initial_put = put; 1802 } 1803 1804 if (send_IPI) 1805 xpc_IPI_send_msgrequest(ch); 1806 } 1807 1808 /* 1809 * Common code that does the actual sending of the message by advancing the 1810 * local message queue's Put value and sends an IPI to the partition the 1811 * message is being sent to. 1812 */ 1813 static enum xp_retval 1814 xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, 1815 xpc_notify_func func, void *key) 1816 { 1817 enum xp_retval ret = xpSuccess; 1818 struct xpc_notify *notify = notify; 1819 s64 put, msg_number = msg->number; 1820 1821 DBUG_ON(notify_type == XPC_N_CALL && func == NULL); 1822 DBUG_ON((((u64)msg - (u64)ch->local_msgqueue) / ch->msg_size) != 1823 msg_number % ch->local_nentries); 1824 DBUG_ON(msg->flags & XPC_M_READY); 1825 1826 if (ch->flags & XPC_C_DISCONNECTING) { 1827 /* drop the reference grabbed in xpc_allocate_msg() */ 1828 xpc_msgqueue_deref(ch); 1829 return ch->reason; 1830 } 1831 1832 if (notify_type != 0) { 1833 /* 1834 * Tell the remote side to send an ACK interrupt when the 1835 * message has been delivered. 1836 */ 1837 msg->flags |= XPC_M_INTERRUPT; 1838 1839 atomic_inc(&ch->n_to_notify); 1840 1841 notify = &ch->notify_queue[msg_number % ch->local_nentries]; 1842 notify->func = func; 1843 notify->key = key; 1844 notify->type = notify_type; 1845 1846 /* >>> is a mb() needed here? */ 1847 1848 if (ch->flags & XPC_C_DISCONNECTING) { 1849 /* 1850 * An error occurred between our last error check and 1851 * this one. We will try to clear the type field from 1852 * the notify entry. If we succeed then 1853 * xpc_disconnect_channel() didn't already process 1854 * the notify entry. 1855 */ 1856 if (cmpxchg(¬ify->type, notify_type, 0) == 1857 notify_type) { 1858 atomic_dec(&ch->n_to_notify); 1859 ret = ch->reason; 1860 } 1861 1862 /* drop the reference grabbed in xpc_allocate_msg() */ 1863 xpc_msgqueue_deref(ch); 1864 return ret; 1865 } 1866 } 1867 1868 msg->flags |= XPC_M_READY; 1869 1870 /* 1871 * The preceding store of msg->flags must occur before the following 1872 * load of ch->local_GP->put. 1873 */ 1874 mb(); 1875 1876 /* see if the message is next in line to be sent, if so send it */ 1877 1878 put = ch->local_GP->put; 1879 if (put == msg_number) 1880 xpc_send_msgs(ch, put); 1881 1882 /* drop the reference grabbed in xpc_allocate_msg() */ 1883 xpc_msgqueue_deref(ch); 1884 return ret; 1885 } 1886 1887 /* 1888 * Send a message previously allocated using xpc_initiate_allocate() on the 1889 * specified channel connected to the specified partition. 1890 * 1891 * This routine will not wait for the message to be received, nor will 1892 * notification be given when it does happen. Once this routine has returned 1893 * the message entry allocated via xpc_initiate_allocate() is no longer 1894 * accessable to the caller. 1895 * 1896 * This routine, although called by users, does not call xpc_part_ref() to 1897 * ensure that the partition infrastructure is in place. It relies on the 1898 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 1899 * 1900 * Arguments: 1901 * 1902 * partid - ID of partition to which the channel is connected. 1903 * ch_number - channel # to send message on. 1904 * payload - pointer to the payload area allocated via 1905 * xpc_initiate_allocate(). 1906 */ 1907 enum xp_retval 1908 xpc_initiate_send(short partid, int ch_number, void *payload) 1909 { 1910 struct xpc_partition *part = &xpc_partitions[partid]; 1911 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 1912 enum xp_retval ret; 1913 1914 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg, 1915 partid, ch_number); 1916 1917 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1918 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1919 DBUG_ON(msg == NULL); 1920 1921 ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL); 1922 1923 return ret; 1924 } 1925 1926 /* 1927 * Send a message previously allocated using xpc_initiate_allocate on the 1928 * specified channel connected to the specified partition. 1929 * 1930 * This routine will not wait for the message to be sent. Once this routine 1931 * has returned the message entry allocated via xpc_initiate_allocate() is no 1932 * longer accessable to the caller. 1933 * 1934 * Once the remote end of the channel has received the message, the function 1935 * passed as an argument to xpc_initiate_send_notify() will be called. This 1936 * allows the sender to free up or re-use any buffers referenced by the 1937 * message, but does NOT mean the message has been processed at the remote 1938 * end by a receiver. 1939 * 1940 * If this routine returns an error, the caller's function will NOT be called. 1941 * 1942 * This routine, although called by users, does not call xpc_part_ref() to 1943 * ensure that the partition infrastructure is in place. It relies on the 1944 * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg(). 1945 * 1946 * Arguments: 1947 * 1948 * partid - ID of partition to which the channel is connected. 1949 * ch_number - channel # to send message on. 1950 * payload - pointer to the payload area allocated via 1951 * xpc_initiate_allocate(). 1952 * func - function to call with asynchronous notification of message 1953 * receipt. THIS FUNCTION MUST BE NON-BLOCKING. 1954 * key - user-defined key to be passed to the function when it's called. 1955 */ 1956 enum xp_retval 1957 xpc_initiate_send_notify(short partid, int ch_number, void *payload, 1958 xpc_notify_func func, void *key) 1959 { 1960 struct xpc_partition *part = &xpc_partitions[partid]; 1961 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 1962 enum xp_retval ret; 1963 1964 dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *)msg, 1965 partid, ch_number); 1966 1967 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 1968 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 1969 DBUG_ON(msg == NULL); 1970 DBUG_ON(func == NULL); 1971 1972 ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL, 1973 func, key); 1974 return ret; 1975 } 1976 1977 static struct xpc_msg * 1978 xpc_pull_remote_msg(struct xpc_channel *ch, s64 get) 1979 { 1980 struct xpc_partition *part = &xpc_partitions[ch->partid]; 1981 struct xpc_msg *remote_msg, *msg; 1982 u32 msg_index, nmsgs; 1983 u64 msg_offset; 1984 enum xp_retval ret; 1985 1986 if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) { 1987 /* we were interrupted by a signal */ 1988 return NULL; 1989 } 1990 1991 while (get >= ch->next_msg_to_pull) { 1992 1993 /* pull as many messages as are ready and able to be pulled */ 1994 1995 msg_index = ch->next_msg_to_pull % ch->remote_nentries; 1996 1997 DBUG_ON(ch->next_msg_to_pull >= ch->w_remote_GP.put); 1998 nmsgs = ch->w_remote_GP.put - ch->next_msg_to_pull; 1999 if (msg_index + nmsgs > ch->remote_nentries) { 2000 /* ignore the ones that wrap the msg queue for now */ 2001 nmsgs = ch->remote_nentries - msg_index; 2002 } 2003 2004 msg_offset = msg_index * ch->msg_size; 2005 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset); 2006 remote_msg = (struct xpc_msg *)(ch->remote_msgqueue_pa + 2007 msg_offset); 2008 2009 ret = xpc_pull_remote_cachelines(part, msg, remote_msg, 2010 nmsgs * ch->msg_size); 2011 if (ret != xpSuccess) { 2012 2013 dev_dbg(xpc_chan, "failed to pull %d msgs starting with" 2014 " msg %ld from partition %d, channel=%d, " 2015 "ret=%d\n", nmsgs, ch->next_msg_to_pull, 2016 ch->partid, ch->number, ret); 2017 2018 XPC_DEACTIVATE_PARTITION(part, ret); 2019 2020 mutex_unlock(&ch->msg_to_pull_mutex); 2021 return NULL; 2022 } 2023 2024 ch->next_msg_to_pull += nmsgs; 2025 } 2026 2027 mutex_unlock(&ch->msg_to_pull_mutex); 2028 2029 /* return the message we were looking for */ 2030 msg_offset = (get % ch->remote_nentries) * ch->msg_size; 2031 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + msg_offset); 2032 2033 return msg; 2034 } 2035 2036 /* 2037 * Get a message to be delivered. 2038 */ 2039 static struct xpc_msg * 2040 xpc_get_deliverable_msg(struct xpc_channel *ch) 2041 { 2042 struct xpc_msg *msg = NULL; 2043 s64 get; 2044 2045 do { 2046 if (ch->flags & XPC_C_DISCONNECTING) 2047 break; 2048 2049 get = ch->w_local_GP.get; 2050 rmb(); /* guarantee that .get loads before .put */ 2051 if (get == ch->w_remote_GP.put) 2052 break; 2053 2054 /* There are messages waiting to be pulled and delivered. 2055 * We need to try to secure one for ourselves. We'll do this 2056 * by trying to increment w_local_GP.get and hope that no one 2057 * else beats us to it. If they do, we'll we'll simply have 2058 * to try again for the next one. 2059 */ 2060 2061 if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) { 2062 /* we got the entry referenced by get */ 2063 2064 dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, " 2065 "partid=%d, channel=%d\n", get + 1, 2066 ch->partid, ch->number); 2067 2068 /* pull the message from the remote partition */ 2069 2070 msg = xpc_pull_remote_msg(ch, get); 2071 2072 DBUG_ON(msg != NULL && msg->number != get); 2073 DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE)); 2074 DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY)); 2075 2076 break; 2077 } 2078 2079 } while (1); 2080 2081 return msg; 2082 } 2083 2084 /* 2085 * Deliver a message to its intended recipient. 2086 */ 2087 void 2088 xpc_deliver_msg(struct xpc_channel *ch) 2089 { 2090 struct xpc_msg *msg; 2091 2092 msg = xpc_get_deliverable_msg(ch); 2093 if (msg != NULL) { 2094 2095 /* 2096 * This ref is taken to protect the payload itself from being 2097 * freed before the user is finished with it, which the user 2098 * indicates by calling xpc_initiate_received(). 2099 */ 2100 xpc_msgqueue_ref(ch); 2101 2102 atomic_inc(&ch->kthreads_active); 2103 2104 if (ch->func != NULL) { 2105 dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, " 2106 "msg_number=%ld, partid=%d, channel=%d\n", 2107 (void *)msg, msg->number, ch->partid, 2108 ch->number); 2109 2110 /* deliver the message to its intended recipient */ 2111 ch->func(xpMsgReceived, ch->partid, ch->number, 2112 &msg->payload, ch->key); 2113 2114 dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, " 2115 "msg_number=%ld, partid=%d, channel=%d\n", 2116 (void *)msg, msg->number, ch->partid, 2117 ch->number); 2118 } 2119 2120 atomic_dec(&ch->kthreads_active); 2121 } 2122 } 2123 2124 /* 2125 * Now we actually acknowledge the messages that have been delivered and ack'd 2126 * by advancing the cached remote message queue's Get value and if requested 2127 * send an IPI to the message sender's partition. 2128 */ 2129 static void 2130 xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags) 2131 { 2132 struct xpc_msg *msg; 2133 s64 get = initial_get + 1; 2134 int send_IPI = 0; 2135 2136 while (1) { 2137 2138 while (1) { 2139 if (get == ch->w_local_GP.get) 2140 break; 2141 2142 msg = (struct xpc_msg *)((u64)ch->remote_msgqueue + 2143 (get % ch->remote_nentries) * 2144 ch->msg_size); 2145 2146 if (!(msg->flags & XPC_M_DONE)) 2147 break; 2148 2149 msg_flags |= msg->flags; 2150 get++; 2151 } 2152 2153 if (get == initial_get) { 2154 /* nothing's changed */ 2155 break; 2156 } 2157 2158 if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) != 2159 initial_get) { 2160 /* someone else beat us to it */ 2161 DBUG_ON(ch->local_GP->get <= initial_get); 2162 break; 2163 } 2164 2165 /* we just set the new value of local_GP->get */ 2166 2167 dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, " 2168 "channel=%d\n", get, ch->partid, ch->number); 2169 2170 send_IPI = (msg_flags & XPC_M_INTERRUPT); 2171 2172 /* 2173 * We need to ensure that the message referenced by 2174 * local_GP->get is not XPC_M_DONE or that local_GP->get 2175 * equals w_local_GP.get, so we'll go have a look. 2176 */ 2177 initial_get = get; 2178 } 2179 2180 if (send_IPI) 2181 xpc_IPI_send_msgrequest(ch); 2182 } 2183 2184 /* 2185 * Acknowledge receipt of a delivered message. 2186 * 2187 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition 2188 * that sent the message. 2189 * 2190 * This function, although called by users, does not call xpc_part_ref() to 2191 * ensure that the partition infrastructure is in place. It relies on the 2192 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg(). 2193 * 2194 * Arguments: 2195 * 2196 * partid - ID of partition to which the channel is connected. 2197 * ch_number - channel # message received on. 2198 * payload - pointer to the payload area allocated via 2199 * xpc_initiate_allocate(). 2200 */ 2201 void 2202 xpc_initiate_received(short partid, int ch_number, void *payload) 2203 { 2204 struct xpc_partition *part = &xpc_partitions[partid]; 2205 struct xpc_channel *ch; 2206 struct xpc_msg *msg = XPC_MSG_ADDRESS(payload); 2207 s64 get, msg_number = msg->number; 2208 2209 DBUG_ON(partid < 0 || partid >= xp_max_npartitions); 2210 DBUG_ON(ch_number < 0 || ch_number >= part->nchannels); 2211 2212 ch = &part->channels[ch_number]; 2213 2214 dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n", 2215 (void *)msg, msg_number, ch->partid, ch->number); 2216 2217 DBUG_ON((((u64)msg - (u64)ch->remote_msgqueue) / ch->msg_size) != 2218 msg_number % ch->remote_nentries); 2219 DBUG_ON(msg->flags & XPC_M_DONE); 2220 2221 msg->flags |= XPC_M_DONE; 2222 2223 /* 2224 * The preceding store of msg->flags must occur before the following 2225 * load of ch->local_GP->get. 2226 */ 2227 mb(); 2228 2229 /* 2230 * See if this message is next in line to be acknowledged as having 2231 * been delivered. 2232 */ 2233 get = ch->local_GP->get; 2234 if (get == msg_number) 2235 xpc_acknowledge_msgs(ch, get, msg->flags); 2236 2237 /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg() */ 2238 xpc_msgqueue_deref(ch); 2239 } 2240