1# 2# BitBake Process based server. 3# 4# Copyright (C) 2010 Bob Foerster <robert@erafx.com> 5# 6# SPDX-License-Identifier: GPL-2.0-only 7# 8 9""" 10 This module implements a multiprocessing.Process based server for bitbake. 11""" 12 13import bb 14import bb.event 15import logging 16import multiprocessing 17import threading 18import array 19import os 20import sys 21import time 22import select 23import socket 24import subprocess 25import errno 26import re 27import datetime 28import pickle 29import traceback 30import gc 31import stat 32import bb.server.xmlrpcserver 33from bb import daemonize 34from multiprocessing import queues 35 36logger = logging.getLogger('BitBake') 37 38class ProcessTimeout(SystemExit): 39 pass 40 41def serverlog(msg): 42 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 43 sys.stdout.flush() 44 45# 46# When we have lockfile issues, try and find infomation about which process is 47# using the lockfile 48# 49def get_lockfile_process_msg(lockfile): 50 # Some systems may not have lsof available 51 procs = None 52 try: 53 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 54 except subprocess.CalledProcessError: 55 # File was deleted? 56 pass 57 except OSError as e: 58 if e.errno != errno.ENOENT: 59 raise 60 if procs is None: 61 # Fall back to fuser if lsof is unavailable 62 try: 63 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 64 except subprocess.CalledProcessError: 65 # File was deleted? 66 pass 67 except OSError as e: 68 if e.errno != errno.ENOENT: 69 raise 70 if procs: 71 return procs.decode("utf-8") 72 return None 73 74class idleFinish(): 75 def __init__(self, msg): 76 self.msg = msg 77 78class ProcessServer(): 79 profile_filename = "profile.log" 80 profile_processed_filename = "profile.log.processed" 81 82 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 83 self.command_channel = False 84 self.command_channel_reply = False 85 self.quit = False 86 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 87 self.next_heartbeat = time.time() 88 89 self.event_handle = None 90 self.hadanyui = False 91 self.haveui = False 92 self.maxuiwait = 30 93 self.xmlrpc = False 94 95 self.idle = None 96 # Need a lock for _idlefuns changes 97 self._idlefuns = {} 98 self._idlefuncsLock = threading.Lock() 99 self.idle_cond = threading.Condition(self._idlefuncsLock) 100 101 self.bitbake_lock = lock 102 self.bitbake_lock_name = lockname 103 self.sock = sock 104 self.sockname = sockname 105 # It is possible the directory may be renamed. Cache the inode of the socket file 106 # so we can tell if things changed. 107 self.sockinode = os.stat(self.sockname)[stat.ST_INO] 108 109 self.server_timeout = server_timeout 110 self.timeout = self.server_timeout 111 self.xmlrpcinterface = xmlrpcinterface 112 113 def register_idle_function(self, function, data): 114 """Register a function to be called while the server is idle""" 115 assert hasattr(function, '__call__') 116 with bb.utils.lock_timeout(self._idlefuncsLock): 117 self._idlefuns[function] = data 118 serverlog("Registering idle function %s" % str(function)) 119 120 def run(self): 121 122 if self.xmlrpcinterface[0]: 123 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) 124 125 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) 126 127 try: 128 self.bitbake_lock.seek(0) 129 self.bitbake_lock.truncate() 130 if self.xmlrpc: 131 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port)) 132 else: 133 self.bitbake_lock.write("%s\n" % (os.getpid())) 134 self.bitbake_lock.flush() 135 except Exception as e: 136 serverlog("Error writing to lock file: %s" % str(e)) 137 pass 138 139 if self.cooker.configuration.profile: 140 try: 141 import cProfile as profile 142 except: 143 import profile 144 prof = profile.Profile() 145 146 ret = profile.Profile.runcall(prof, self.main) 147 148 prof.dump_stats("profile.log") 149 bb.utils.process_profilelog("profile.log") 150 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 151 152 else: 153 ret = self.main() 154 155 return ret 156 157 def _idle_check(self): 158 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None 159 160 def wait_for_idle(self, timeout=30): 161 # Wait for the idle loop to have cleared 162 with bb.utils.lock_timeout(self._idlefuncsLock): 163 return self.idle_cond.wait_for(self._idle_check, timeout) is not False 164 165 def set_async_cmd(self, cmd): 166 with bb.utils.lock_timeout(self._idlefuncsLock): 167 ret = self.idle_cond.wait_for(self._idle_check, 30) 168 if ret is False: 169 return False 170 self.cooker.command.currentAsyncCommand = cmd 171 return True 172 173 def clear_async_cmd(self): 174 with bb.utils.lock_timeout(self._idlefuncsLock): 175 self.cooker.command.currentAsyncCommand = None 176 self.idle_cond.notify_all() 177 178 def get_async_cmd(self): 179 with bb.utils.lock_timeout(self._idlefuncsLock): 180 return self.cooker.command.currentAsyncCommand 181 182 def main(self): 183 self.cooker.pre_serve() 184 185 bb.utils.set_process_name("Cooker") 186 187 ready = [] 188 newconnections = [] 189 190 self.controllersock = False 191 fds = [self.sock] 192 if self.xmlrpc: 193 fds.append(self.xmlrpc) 194 seendata = False 195 serverlog("Entering server connection loop") 196 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) 197 198 def disconnect_client(self, fds): 199 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) 200 if self.controllersock: 201 fds.remove(self.controllersock) 202 self.controllersock.close() 203 self.controllersock = False 204 if self.haveui: 205 # Wait for the idle loop to have cleared (30s max) 206 if not self.wait_for_idle(30): 207 serverlog("Idle loop didn't finish queued commands after 30s, exiting.") 208 self.quit = True 209 fds.remove(self.command_channel) 210 bb.event.unregister_UIHhandler(self.event_handle, True) 211 self.command_channel_reply.writer.close() 212 self.event_writer.writer.close() 213 self.command_channel.close() 214 self.command_channel = False 215 del self.event_writer 216 self.lastui = time.time() 217 self.cooker.clientComplete() 218 self.haveui = False 219 ready = select.select(fds,[],[],0)[0] 220 if newconnections and not self.quit: 221 serverlog("Starting new client") 222 conn = newconnections.pop(-1) 223 fds.append(conn) 224 self.controllersock = conn 225 elif not self.timeout and not ready: 226 serverlog("No timeout, exiting.") 227 self.quit = True 228 229 self.lastui = time.time() 230 while not self.quit: 231 if self.sock in ready: 232 while select.select([self.sock],[],[],0)[0]: 233 controllersock, address = self.sock.accept() 234 if self.controllersock: 235 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) 236 newconnections.append(controllersock) 237 else: 238 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) 239 self.controllersock = controllersock 240 fds.append(controllersock) 241 if self.controllersock in ready: 242 try: 243 serverlog("Processing Client") 244 ui_fds = recvfds(self.controllersock, 3) 245 serverlog("Connecting Client") 246 247 # Where to write events to 248 writer = ConnectionWriter(ui_fds[0]) 249 self.event_handle = bb.event.register_UIHhandler(writer, True) 250 self.event_writer = writer 251 252 # Where to read commands from 253 reader = ConnectionReader(ui_fds[1]) 254 fds.append(reader) 255 self.command_channel = reader 256 257 # Where to send command return values to 258 writer = ConnectionWriter(ui_fds[2]) 259 self.command_channel_reply = writer 260 261 self.haveui = True 262 self.hadanyui = True 263 264 except (EOFError, OSError): 265 disconnect_client(self, fds) 266 267 if not self.timeout == -1.0 and not self.haveui and self.timeout and \ 268 (self.lastui + self.timeout) < time.time(): 269 serverlog("Server timeout, exiting.") 270 self.quit = True 271 272 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see 273 # one. We have had issue with processes hanging indefinitely so timing out UI-less 274 # servers is useful. 275 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): 276 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") 277 self.quit = True 278 279 if self.command_channel in ready: 280 try: 281 command = self.command_channel.get() 282 except EOFError: 283 # Client connection shutting down 284 ready = [] 285 disconnect_client(self, fds) 286 continue 287 if command[0] == "terminateServer": 288 self.quit = True 289 continue 290 try: 291 serverlog("Running command %s" % command) 292 self.command_channel_reply.send(self.cooker.command.runCommand(command, self)) 293 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) 294 except Exception as e: 295 stack = traceback.format_exc() 296 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) 297 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) 298 299 if self.xmlrpc in ready: 300 self.xmlrpc.handle_requests() 301 302 if not seendata and hasattr(self.cooker, "data"): 303 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 304 if heartbeat_event: 305 try: 306 self.heartbeat_seconds = float(heartbeat_event) 307 except: 308 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 309 310 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') 311 try: 312 if self.timeout: 313 self.timeout = float(self.timeout) 314 except: 315 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 316 seendata = True 317 318 ready = self.idle_commands(.1, fds) 319 320 if self.idle: 321 self.idle.join() 322 323 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) 324 # Remove the socket file so we don't get any more connections to avoid races 325 # The build directory could have been renamed so if the file isn't the one we created 326 # we shouldn't delete it. 327 try: 328 sockinode = os.stat(self.sockname)[stat.ST_INO] 329 if sockinode == self.sockinode: 330 os.unlink(self.sockname) 331 else: 332 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) 333 except Exception as err: 334 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) 335 self.sock.close() 336 337 try: 338 self.cooker.shutdown(True, idle=False) 339 self.cooker.notifier.stop() 340 self.cooker.confignotifier.stop() 341 except: 342 pass 343 344 self.cooker.post_serve() 345 346 if len(threading.enumerate()) != 1: 347 serverlog("More than one thread left?: " + str(threading.enumerate())) 348 349 # Flush logs before we release the lock 350 sys.stdout.flush() 351 sys.stderr.flush() 352 353 # Finally release the lockfile but warn about other processes holding it open 354 lock = self.bitbake_lock 355 lockfile = self.bitbake_lock_name 356 357 def get_lock_contents(lockfile): 358 try: 359 with open(lockfile, "r") as f: 360 return f.readlines() 361 except FileNotFoundError: 362 return None 363 364 lock.close() 365 lock = None 366 367 while not lock: 368 i = 0 369 lock = None 370 if not os.path.exists(os.path.basename(lockfile)): 371 serverlog("Lockfile directory gone, exiting.") 372 return 373 374 while not lock and i < 30: 375 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 376 if not lock: 377 newlockcontents = get_lock_contents(lockfile) 378 if not newlockcontents[0].startswith([os.getpid() + "\n", os.getpid() + " "]): 379 # A new server was started, the lockfile contents changed, we can exit 380 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 381 return 382 time.sleep(0.1) 383 i += 1 384 if lock: 385 # We hold the lock so we can remove the file (hide stale pid data) 386 # via unlockfile. 387 bb.utils.unlockfile(lock) 388 serverlog("Exiting as we could obtain the lock") 389 return 390 391 if not lock: 392 procs = get_lockfile_process_msg(lockfile) 393 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] 394 if procs: 395 msg.append(":\n%s" % procs) 396 serverlog("".join(msg)) 397 398 def idle_thread(self): 399 def remove_idle_func(function): 400 with bb.utils.lock_timeout(self._idlefuncsLock): 401 del self._idlefuns[function] 402 self.idle_cond.notify_all() 403 404 while not self.quit: 405 nextsleep = 0.1 406 fds = [] 407 408 self.cooker.process_inotify_updates() 409 410 with bb.utils.lock_timeout(self._idlefuncsLock): 411 items = list(self._idlefuns.items()) 412 413 for function, data in items: 414 try: 415 retval = function(self, data, False) 416 if isinstance(retval, idleFinish): 417 serverlog("Removing idle function %s at idleFinish" % str(function)) 418 remove_idle_func(function) 419 self.cooker.command.finishAsyncCommand(retval.msg) 420 nextsleep = None 421 elif retval is False: 422 serverlog("Removing idle function %s" % str(function)) 423 remove_idle_func(function) 424 nextsleep = None 425 elif retval is True: 426 nextsleep = None 427 elif isinstance(retval, float) and nextsleep: 428 if (retval < nextsleep): 429 nextsleep = retval 430 elif nextsleep is None: 431 continue 432 else: 433 fds = fds + retval 434 except SystemExit: 435 raise 436 except Exception as exc: 437 if not isinstance(exc, bb.BBHandledException): 438 logger.exception('Running idle function') 439 remove_idle_func(function) 440 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) 441 self.quit = True 442 443 # Create new heartbeat event? 444 now = time.time() 445 if bb.event._heartbeat_enabled and now >= self.next_heartbeat: 446 # We might have missed heartbeats. Just trigger once in 447 # that case and continue after the usual delay. 448 self.next_heartbeat += self.heartbeat_seconds 449 if self.next_heartbeat <= now: 450 self.next_heartbeat = now + self.heartbeat_seconds 451 if hasattr(self.cooker, "data"): 452 heartbeat = bb.event.HeartbeatEvent(now) 453 try: 454 bb.event.fire(heartbeat, self.cooker.data) 455 except Exception as exc: 456 if not isinstance(exc, bb.BBHandledException): 457 logger.exception('Running heartbeat function') 458 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) 459 self.quit = True 460 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: 461 # Shorten timeout so that we we wake up in time for 462 # the heartbeat. 463 nextsleep = self.next_heartbeat - now 464 465 if nextsleep is not None: 466 select.select(fds,[],[],nextsleep)[0] 467 468 def idle_commands(self, delay, fds=None): 469 nextsleep = delay 470 if not fds: 471 fds = [] 472 473 if not self.idle: 474 self.idle = threading.Thread(target=self.idle_thread) 475 self.idle.start() 476 477 if nextsleep is not None: 478 if self.xmlrpc: 479 nextsleep = self.xmlrpc.get_timeout(nextsleep) 480 try: 481 return select.select(fds,[],[],nextsleep)[0] 482 except InterruptedError: 483 # Ignore EINTR 484 return [] 485 else: 486 return select.select(fds,[],[],0)[0] 487 488 489class ServerCommunicator(): 490 def __init__(self, connection, recv): 491 self.connection = connection 492 self.recv = recv 493 494 def runCommand(self, command): 495 self.connection.send(command) 496 if not self.recv.poll(30): 497 logger.info("No reply from server in 30s") 498 if not self.recv.poll(30): 499 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 500 ret, exc = self.recv.get() 501 # Should probably turn all exceptions in exc back into exceptions? 502 # For now, at least handle BBHandledException 503 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 504 raise bb.BBHandledException() 505 return ret, exc 506 507 def updateFeatureSet(self, featureset): 508 _, error = self.runCommand(["setFeatures", featureset]) 509 if error: 510 logger.error("Unable to set the cooker to the correct featureset: %s" % error) 511 raise BaseException(error) 512 513 def getEventHandle(self): 514 handle, error = self.runCommand(["getUIHandlerNum"]) 515 if error: 516 logger.error("Unable to get UI Handler Number: %s" % error) 517 raise BaseException(error) 518 519 return handle 520 521 def terminateServer(self): 522 self.connection.send(['terminateServer']) 523 return 524 525class BitBakeProcessServerConnection(object): 526 def __init__(self, ui_channel, recv, eq, sock): 527 self.connection = ServerCommunicator(ui_channel, recv) 528 self.events = eq 529 # Save sock so it doesn't get gc'd for the life of our connection 530 self.socket_connection = sock 531 532 def terminate(self): 533 self.events.close() 534 self.socket_connection.close() 535 self.connection.connection.close() 536 self.connection.recv.close() 537 return 538 539start_log_format = '--- Starting bitbake server pid %s at %s ---' 540start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' 541 542class BitBakeServer(object): 543 544 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): 545 546 self.server_timeout = server_timeout 547 self.xmlrpcinterface = xmlrpcinterface 548 self.featureset = featureset 549 self.sockname = sockname 550 self.bitbake_lock = lock 551 self.profile = profile 552 self.readypipe, self.readypipein = os.pipe() 553 554 # Place the log in the builddirectory alongside the lock file 555 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") 556 self.logfile = logfile 557 558 startdatetime = datetime.datetime.now() 559 bb.daemonize.createDaemon(self._startServer, logfile) 560 self.bitbake_lock.close() 561 os.close(self.readypipein) 562 563 ready = ConnectionReader(self.readypipe) 564 r = ready.poll(5) 565 if not r: 566 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90") 567 r = ready.poll(90) 568 if r: 569 try: 570 r = ready.get() 571 except EOFError: 572 # Trap the child exiting/closing the pipe and error out 573 r = None 574 if not r or r[0] != "r": 575 ready.close() 576 bb.error("Unable to start bitbake server (%s)" % str(r)) 577 if os.path.exists(logfile): 578 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) 579 started = False 580 lines = [] 581 lastlines = [] 582 with open(logfile, "r") as f: 583 for line in f: 584 if started: 585 lines.append(line) 586 else: 587 lastlines.append(line) 588 res = logstart_re.search(line.rstrip()) 589 if res: 590 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) 591 if ldatetime >= startdatetime: 592 started = True 593 lines.append(line) 594 if len(lastlines) > 60: 595 lastlines = lastlines[-60:] 596 if lines: 597 if len(lines) > 60: 598 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:]))) 599 else: 600 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines))) 601 elif lastlines: 602 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines))) 603 else: 604 bb.error("%s doesn't exist" % logfile) 605 606 raise SystemExit(1) 607 608 ready.close() 609 610 def _startServer(self): 611 os.close(self.readypipe) 612 os.set_inheritable(self.bitbake_lock.fileno(), True) 613 os.set_inheritable(self.readypipein, True) 614 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 615 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 616 617def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): 618 619 import bb.cookerdata 620 import bb.cooker 621 622 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) 623 624 try: 625 bitbake_lock = os.fdopen(lockfd, "w") 626 627 # Create server control socket 628 if os.path.exists(sockname): 629 serverlog("WARNING: removing existing socket file '%s'" % sockname) 630 os.unlink(sockname) 631 632 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 633 # AF_UNIX has path length issues so chdir here to workaround 634 cwd = os.getcwd() 635 try: 636 os.chdir(os.path.dirname(sockname)) 637 sock.bind(os.path.basename(sockname)) 638 finally: 639 os.chdir(cwd) 640 sock.listen(1) 641 642 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) 643 writer = ConnectionWriter(readypipeinfd) 644 try: 645 featureset = [] 646 cooker = bb.cooker.BBCooker(featureset, server) 647 cooker.configuration.profile = profile 648 except bb.BBHandledException: 649 return None 650 writer.send("r") 651 writer.close() 652 server.cooker = cooker 653 serverlog("Started bitbake server pid %d" % os.getpid()) 654 655 server.run() 656 finally: 657 # Flush any messages/errors to the logfile before exit 658 sys.stdout.flush() 659 sys.stderr.flush() 660 661def connectProcessServer(sockname, featureset): 662 # Connect to socket 663 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 664 # AF_UNIX has path length issues so chdir here to workaround 665 cwd = os.getcwd() 666 667 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None 668 eq = command_chan_recv = command_chan = None 669 670 sock.settimeout(10) 671 672 try: 673 try: 674 os.chdir(os.path.dirname(sockname)) 675 finished = False 676 while not finished: 677 try: 678 sock.connect(os.path.basename(sockname)) 679 finished = True 680 except IOError as e: 681 if e.errno == errno.EWOULDBLOCK: 682 pass 683 raise 684 finally: 685 os.chdir(cwd) 686 687 # Send an fd for the remote to write events to 688 readfd, writefd = os.pipe() 689 eq = BBUIEventQueue(readfd) 690 # Send an fd for the remote to recieve commands from 691 readfd1, writefd1 = os.pipe() 692 command_chan = ConnectionWriter(writefd1) 693 # Send an fd for the remote to write commands results to 694 readfd2, writefd2 = os.pipe() 695 command_chan_recv = ConnectionReader(readfd2) 696 697 sendfds(sock, [writefd, readfd1, writefd2]) 698 699 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock) 700 701 # Close the ends of the pipes we won't use 702 for i in [writefd, readfd1, writefd2]: 703 os.close(i) 704 705 server_connection.connection.updateFeatureSet(featureset) 706 707 except (Exception, SystemExit) as e: 708 if command_chan_recv: 709 command_chan_recv.close() 710 if command_chan: 711 command_chan.close() 712 for i in [writefd, readfd1, writefd2]: 713 try: 714 if i: 715 os.close(i) 716 except OSError: 717 pass 718 sock.close() 719 raise 720 721 return server_connection 722 723def sendfds(sock, fds): 724 '''Send an array of fds over an AF_UNIX socket.''' 725 fds = array.array('i', fds) 726 msg = bytes([len(fds) % 256]) 727 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 728 729def recvfds(sock, size): 730 '''Receive an array of fds over an AF_UNIX socket.''' 731 a = array.array('i') 732 bytes_size = a.itemsize * size 733 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) 734 if not msg and not ancdata: 735 raise EOFError 736 try: 737 if len(ancdata) != 1: 738 raise RuntimeError('received %d items of ancdata' % 739 len(ancdata)) 740 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 741 if (cmsg_level == socket.SOL_SOCKET and 742 cmsg_type == socket.SCM_RIGHTS): 743 if len(cmsg_data) % a.itemsize != 0: 744 raise ValueError 745 a.frombytes(cmsg_data) 746 assert len(a) % 256 == msg[0] 747 return list(a) 748 except (ValueError, IndexError): 749 pass 750 raise RuntimeError('Invalid data received') 751 752class BBUIEventQueue: 753 def __init__(self, readfd): 754 755 self.eventQueue = [] 756 self.eventQueueLock = threading.Lock() 757 self.eventQueueNotify = threading.Event() 758 759 self.reader = ConnectionReader(readfd) 760 761 self.t = threading.Thread() 762 self.t.run = self.startCallbackHandler 763 self.t.start() 764 765 def getEvent(self): 766 with bb.utils.lock_timeout(self.eventQueueLock): 767 if len(self.eventQueue) == 0: 768 return None 769 770 item = self.eventQueue.pop(0) 771 if len(self.eventQueue) == 0: 772 self.eventQueueNotify.clear() 773 774 return item 775 776 def waitEvent(self, delay): 777 self.eventQueueNotify.wait(delay) 778 return self.getEvent() 779 780 def queue_event(self, event): 781 with bb.utils.lock_timeout(self.eventQueueLock): 782 self.eventQueue.append(event) 783 self.eventQueueNotify.set() 784 785 def send_event(self, event): 786 self.queue_event(pickle.loads(event)) 787 788 def startCallbackHandler(self): 789 bb.utils.set_process_name("UIEventQueue") 790 while True: 791 try: 792 ready = self.reader.wait(0.25) 793 if ready: 794 event = self.reader.get() 795 self.queue_event(event) 796 except (EOFError, OSError, TypeError): 797 # Easiest way to exit is to close the file descriptor to cause an exit 798 break 799 800 def close(self): 801 self.reader.close() 802 self.t.join() 803 804class ConnectionReader(object): 805 806 def __init__(self, fd): 807 self.reader = multiprocessing.connection.Connection(fd, writable=False) 808 self.rlock = multiprocessing.Lock() 809 810 def wait(self, timeout=None): 811 return multiprocessing.connection.wait([self.reader], timeout) 812 813 def poll(self, timeout=None): 814 return self.reader.poll(timeout) 815 816 def get(self): 817 with bb.utils.lock_timeout(self.rlock): 818 res = self.reader.recv_bytes() 819 return multiprocessing.reduction.ForkingPickler.loads(res) 820 821 def fileno(self): 822 return self.reader.fileno() 823 824 def close(self): 825 return self.reader.close() 826 827 828class ConnectionWriter(object): 829 830 def __init__(self, fd): 831 self.writer = multiprocessing.connection.Connection(fd, readable=False) 832 self.wlock = multiprocessing.Lock() 833 # Why bb.event needs this I have no idea 834 self.event = self 835 836 def _send(self, obj): 837 gc.disable() 838 with bb.utils.lock_timeout(self.wlock): 839 self.writer.send_bytes(obj) 840 gc.enable() 841 842 def send(self, obj): 843 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 844 # See notes/code in CookerParser 845 # We must not terminate holding this lock else processes will hang. 846 # For SIGTERM, raising afterwards avoids this. 847 # For SIGINT, we don't want to have written partial data to the pipe. 848 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 849 process = multiprocessing.current_process() 850 if process and hasattr(process, "queue_signals"): 851 with bb.utils.lock_timeout(process.signal_threadlock): 852 process.queue_signals = True 853 self._send(obj) 854 process.queue_signals = False 855 try: 856 for sig in process.signal_received.pop(): 857 process.handle_sig(sig, None) 858 except IndexError: 859 pass 860 else: 861 self._send(obj) 862 863 def fileno(self): 864 return self.writer.fileno() 865 866 def close(self): 867 return self.writer.close() 868