#
# BitBake Process based server.
#
# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
#
# SPDX-License-Identifier: GPL-2.0-only
#

"""
    This module implements a multiprocessing.Process based server for bitbake.
"""

import bb
import bb.event
import logging
import multiprocessing
import threading
import array
import os
import sys
import time
import select
import socket
import subprocess
import errno
import re
import datetime
import pickle
import traceback
import gc
import stat
import bb.server.xmlrpcserver
from bb import daemonize
from multiprocessing import queues

logger = logging.getLogger('BitBake')

class ProcessTimeout(SystemExit):
    """Raised by ServerCommunicator.runCommand when the server sends no reply in time."""
    pass

def currenttime():
    """Return the current local time formatted as HH:MM:SS.microseconds."""
    return datetime.datetime.now().strftime('%H:%M:%S.%f')

def serverlog(msg):
    """Print msg to stdout prefixed with our pid and the current time, then flush."""
    print(str(os.getpid()) + " " + currenttime() + " " + msg)
    sys.stdout.flush()

#
# When we have lockfile issues, try and find information about which process is
# using the lockfile
#
def get_lockfile_process_msg(lockfile):
    """
    Return the decoded output of lsof (or fuser if lsof could not be used) for
    lockfile, or None if neither tool produced any output.

    OSErrors other than ENOENT (tool not installed) are re-raised.
    """
    # Some systems may not have lsof available, fall back to fuser in that case
    for cmd in (["lsof", '-w', lockfile], ["fuser", '-v', lockfile]):
        try:
            procs = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            # File was deleted?
            continue
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            continue
        # The tool ran successfully; its output (possibly empty) is the answer
        # and the remaining tools are not tried.
        if procs:
            return procs.decode("utf-8")
        return None
    return None

class idleFinish():
    """
    Return value for idle functions: the function is removed and msg is passed
    to cooker.command.finishAsyncCommand() (see ProcessServer.idle_thread).
    """
    def __init__(self, msg):
        self.msg = msg

class ProcessServer():
    """
    The server side main loop.

    Accepts UI clients on the control socket, reads commands from the connected
    client and runs them through self.cooker.command, and runs the registered
    idle functions in a separate thread. The 'cooker' attribute is not set by
    __init__; it is assigned by the creator (see execServer) before run().
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.hadanyui = False
        self.haveui = False
        self.maxuiwait = 30
        self.xmlrpc = False

        self.idle = None
        # Need a lock for _idlefuns changes
        self._idlefuns = {}
        self._idlefuncsLock = threading.Lock()
        self.idle_cond = threading.Condition(self._idlefuncsLock)

        self.bitbake_lock = lock
        self.bitbake_lock_name = lockname
        self.sock = sock
        self.sockname = sockname
        # It is possible the directory may be renamed. Cache the inode of the socket file
        # so we can tell if things changed.
        self.sockinode = os.stat(self.sockname)[stat.ST_INO]

        self.server_timeout = server_timeout
        self.timeout = self.server_timeout
        self.xmlrpcinterface = xmlrpcinterface

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert hasattr(function, '__call__')
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self._idlefuns[function] = data
        serverlog("Registering idle function %s" % str(function))

    def run(self):
        """
        Start the optional XMLRPC server, record our pid (and XMLRPC address)
        in the lock file and run main(), under the profiler if the cooker
        configuration requests it. Returns whatever main() returns.
        """

        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            # Best effort only, the server still runs without the pid being recorded
            serverlog("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = profile.Profile.runcall(prof, self.main)

            prof.dump_stats(self.profile_filename)
            bb.utils.process_profilelog(self.profile_filename)
            serverlog("Raw profiling information saved to %s and processed statistics to %s" % (self.profile_filename, self.profile_processed_filename))

        else:
            ret = self.main()

        return ret

    def _idle_check(self):
        """True when no idle functions are registered and no async command is set."""
        return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None

    def wait_for_idle(self, timeout=30):
        """Wait up to timeout seconds for _idle_check() to hold; returns False on timeout."""
        # Wait for the idle loop to have cleared
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.idle_cond.wait_for(self._idle_check, timeout) is not False

    def set_async_cmd(self, cmd):
        """
        Wait up to 30s for the server to become idle, then record cmd as the
        current async command. Returns False if the wait timed out, else True.
        """
        with bb.utils.lock_timeout(self._idlefuncsLock):
            ret = self.idle_cond.wait_for(self._idle_check, 30)
            if ret is False:
                return False
            self.cooker.command.currentAsyncCommand = cmd
            return True

    def clear_async_cmd(self):
        """Clear the current async command and wake anything waiting on idle_cond."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self.cooker.command.currentAsyncCommand = None
            self.idle_cond.notify_all()

    def get_async_cmd(self):
        """Return the current async command (read under the idle functions lock)."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.cooker.command.currentAsyncCommand

    def main(self):
        """
        Serve clients until self.quit is set, then shut down.

        Shutdown joins the idle thread, removes the socket file (only if it is
        still the one we created), shuts the cooker down and finally waits
        until the bitbake lock file can be taken so it can be removed.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        seendata = False
        serverlog("Entering server connection loop")
        serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))

        def disconnect_client(self, fds):
            serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                # Wait for the idle loop to have cleared (30s max)
                if not self.wait_for_idle(30):
                    serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
                    self.quit = True
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            # Note this 'ready' is local to disconnect_client, it does not
            # change the list used by the main loop below
            ready = select.select(fds,[],[],0)[0]
            if newconnections and not self.quit:
                serverlog("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif not self.timeout and not ready:
                serverlog("No timeout, exiting.")
                self.quit = True

        self.lastui = time.time()
        while not self.quit:
            if self.sock in ready:
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    serverlog("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    serverlog("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True
                    self.hadanyui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                serverlog("Server timeout, exiting.")
                self.quit = True

            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
            # one. We have had issue with processes hanging indefinitely so timing out UI-less
            # servers is useful.
            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    serverlog("Running command %s" % command)
                    reply = self.cooker.command.runCommand(command, self)
                    serverlog("Sending reply %s" % repr(reply))
                    self.command_channel_reply.send(reply)
                    serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
                except Exception as e:
                    stack = traceback.format_exc()
                    serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            # Read the heartbeat/timeout settings once, as soon as the cooker has a datastore
            if not seendata and hasattr(self.cooker, "data"):
                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
                if heartbeat_event:
                    try:
                        self.heartbeat_seconds = float(heartbeat_event)
                    except (TypeError, ValueError):
                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
                try:
                    if self.timeout:
                        self.timeout = float(self.timeout)
                except (TypeError, ValueError):
                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
                seendata = True

            ready = self.idle_commands(.1, fds)

        if self.idle:
            self.idle.join()

        serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
        # Remove the socket file so we don't get any more connections to avoid races
        # The build directory could have been renamed so if the file isn't the one we created
        # we shouldn't delete it.
        try:
            sockinode = os.stat(self.sockname)[stat.ST_INO]
            if sockinode == self.sockinode:
                os.unlink(self.sockname)
            else:
                serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
        except Exception as err:
            serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
        self.sock.close()

        try:
            self.cooker.shutdown(True, idle=False)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except:
            # Deliberately best effort: nothing here may stop us from
            # reaching post_serve() and releasing the lock below.
            pass

        self.cooker.post_serve()

        if len(threading.enumerate()) != 1:
            serverlog("More than one thread left?: " + str(threading.enumerate()))

        # Flush logs before we release the lock
        sys.stdout.flush()
        sys.stderr.flush()

        # Finally release the lockfile but warn about other processes holding it open
        lock = self.bitbake_lock
        lockfile = self.bitbake_lock_name

        def get_lock_contents(lockfile):
            try:
                with open(lockfile, "r") as f:
                    return f.readlines()
            except FileNotFoundError:
                return None

        lock.close()
        lock = None

        # Lock file contents written by run() start with "<pid>\n" or "<pid> host:port"
        ourprefixes = (f"{os.getpid()}\n", f"{os.getpid()} ")

        while not lock:
            i = 0
            lock = None
            # Check the directory holding the lockfile, independent of our cwd
            if not os.path.exists(os.path.dirname(lockfile) or "."):
                serverlog("Lockfile directory gone, exiting.")
                return

            while not lock and i < 30:
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
                if not lock:
                    newlockcontents = get_lock_contents(lockfile)
                    # A missing or empty lockfile no longer carries our pid either.
                    # str.startswith() needs a tuple of prefixes, a list raises TypeError.
                    if not newlockcontents or not newlockcontents[0].startswith(ourprefixes):
                        # A new server was started, the lockfile contents changed, we can exit
                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
                        return
                    time.sleep(0.1)
                    i += 1
            if lock:
                # We hold the lock so we can remove the file (hide stale pid data)
                # via unlockfile.
                bb.utils.unlockfile(lock)
                serverlog("Exiting as we could obtain the lock")
                return

            if not lock:
                procs = get_lockfile_process_msg(lockfile)
                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
                if procs:
                    msg.append(":\n%s" % procs)
                serverlog("".join(msg))

    def idle_thread(self):
        """
        Body of the idle thread: until self.quit is set, process inotify
        updates, call each registered idle function and fire heartbeat events.

        An idle function may return an idleFinish (remove it and finish the
        async command), False (remove it), True (call again without sleeping),
        a float (upper bound for the next sleep) or a list of file descriptors
        to wait on.
        """
        def remove_idle_func(function):
            with bb.utils.lock_timeout(self._idlefuncsLock):
                # The function may already have been removed before an
                # exception was raised, so don't fail if it is gone.
                self._idlefuns.pop(function, None)
                self.idle_cond.notify_all()

        while not self.quit:
            nextsleep = 0.1
            fds = []

            try:
                self.cooker.process_inotify_updates()
            except Exception as exc:
                serverlog("Exception %s in inofify updates broke the idle_thread, exiting" % traceback.format_exc())
                self.quit = True

            # Work on a snapshot so functions can be (un)registered while we iterate
            with bb.utils.lock_timeout(self._idlefuncsLock):
                items = list(self._idlefuns.items())

            for function, data in items:
                try:
                    retval = function(self, data, False)
                    if isinstance(retval, idleFinish):
                        serverlog("Removing idle function %s at idleFinish" % str(function))
                        remove_idle_func(function)
                        self.cooker.command.finishAsyncCommand(retval.msg)
                        nextsleep = None
                    elif retval is False:
                        serverlog("Removing idle function %s" % str(function))
                        remove_idle_func(function)
                        nextsleep = None
                    elif retval is True:
                        nextsleep = None
                    elif isinstance(retval, float):
                        # Handle floats in their own branch: with nextsleep at 0.0
                        # a float must not fall through to the fd list case below.
                        if nextsleep is not None and retval < nextsleep:
                            nextsleep = retval
                    elif nextsleep is None:
                        continue
                    else:
                        fds = fds + retval
                except SystemExit:
                    raise
                except Exception as exc:
                    if not isinstance(exc, bb.BBHandledException):
                        logger.exception('Running idle function')
                    remove_idle_func(function)
                    serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
                    self.quit = True

            # Create new heartbeat event?
            now = time.time()
            if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
                # We might have missed heartbeats. Just trigger once in
                # that case and continue after the usual delay.
                self.next_heartbeat += self.heartbeat_seconds
                if self.next_heartbeat <= now:
                    self.next_heartbeat = now + self.heartbeat_seconds
                if hasattr(self.cooker, "data"):
                    heartbeat = bb.event.HeartbeatEvent(now)
                    try:
                        bb.event.fire(heartbeat, self.cooker.data)
                    except Exception as exc:
                        if not isinstance(exc, bb.BBHandledException):
                            logger.exception('Running heartbeat function')
                        serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
                        self.quit = True
            if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
                # Shorten timeout so that we wake up in time for
                # the heartbeat.
                nextsleep = self.next_heartbeat - now

            if nextsleep is not None:
                select.select(fds,[],[],nextsleep)[0]

    def idle_commands(self, delay, fds=None):
        """
        Make sure the idle thread is running (requesting quit if it died), then
        wait up to delay seconds on fds and return the readable ones.
        """
        nextsleep = delay
        if not fds:
            fds = []

        if not self.idle:
            self.idle = threading.Thread(target=self.idle_thread)
            self.idle.start()
        elif self.idle and not self.idle.is_alive():
            serverlog("Idle thread terminated, main thread exiting too")
            bb.error("Idle thread terminated, main thread exiting too")
            self.quit = True

        if nextsleep is not None:
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                return select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                return []
        else:
            return select.select(fds,[],[],0)[0]


class ServerCommunicator():
    """Client side helper which sends commands to the server and reads the replies."""
    def __init__(self, connection, recv):
        self.connection = connection
        self.recv = recv

    def runCommand(self, command):
        """
        Send command and return the (ret, exc) reply.

        Raises ProcessTimeout if there is no reply within 60s and
        bb.BBHandledException if the reply reports a BBHandledException or
        SystemExit.
        """
        self.connection.send(command)
        if not self.recv.poll(30):
            logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
            if not self.recv.poll(30):
                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
        ret, exc = self.recv.get()
        # Should probably turn all exceptions in exc back into exceptions?
        # For now, at least handle BBHandledException
        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
            raise bb.BBHandledException()
        return ret, exc

    def updateFeatureSet(self, featureset):
        """Send the setFeatures command; raises BaseException if the server reports an error."""
        _, error = self.runCommand(["setFeatures", featureset])
        if error:
            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
            raise BaseException(error)

    def getEventHandle(self):
        """Return the UI handler number from the server; raises BaseException on error."""
        handle, error = self.runCommand(["getUIHandlerNum"])
        if error:
            logger.error("Unable to get UI Handler Number: %s" % error)
            raise BaseException(error)

        return handle

    def terminateServer(self):
        """Send the terminateServer command without waiting for a reply."""
        self.connection.send(['terminateServer'])
        return

class BitBakeProcessServerConnection(object):
    """Bundles the command channel, event queue and socket of one client connection."""
    def __init__(self, ui_channel, recv, eq, sock):
        self.connection = ServerCommunicator(ui_channel, recv)
        self.events = eq
        # Save sock so it doesn't get gc'd for the life of our connection
        self.socket_connection = sock

    def terminate(self):
        """Close the event queue, the socket and both command channels."""
        self.events.close()
        self.socket_connection.close()
        self.connection.connection.close()
        self.connection.recv.close()
        return

start_log_format = '--- Starting bitbake server pid %s at %s ---'
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'

class BitBakeServer(object):
    """
    Starts a bitbake server as a daemon and waits for it to report readiness.

    The constructor raises SystemExit(1), after reporting the relevant part of
    the server log, if the server does not signal that it started.
    """

    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):

        self.server_timeout = server_timeout
        self.xmlrpcinterface = xmlrpcinterface
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        self.profile = profile
        self.readypipe, self.readypipein = os.pipe()

        # Place the log in the builddirectory alongside the lock file
        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
        self.logfile = logfile

        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        r = ready.poll(5)
        if not r:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            r = ready.poll(90)
        if r:
            try:
                r = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                r = None
        if not r or r[0] != "r":
            ready.close()
            bb.error("Unable to start bitbake server (%s)" % str(r))
            if os.path.exists(logfile):
                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
                started = False
                lines = []
                lastlines = []
                # 'lines' collects the log of this session (from the first start
                # marker not older than our start time), 'lastlines' keeps the
                # last 60 lines seen before that as a fallback.
                with open(logfile, "r") as f:
                    for line in f:
                        if started:
                            lines.append(line)
                        else:
                            lastlines.append(line)
                            res = logstart_re.search(line.rstrip())
                            if res:
                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
                                if ldatetime >= startdatetime:
                                    started = True
                                    lines.append(line)
                        if len(lastlines) > 60:
                            lastlines = lastlines[-60:]
                if lines:
                    if len(lines) > 60:
                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
                    else:
                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
                elif lastlines:
                    bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
            else:
                bb.error("%s doesn't exist" % logfile)

            raise SystemExit(1)

        ready.close()

    def _startServer(self):
        """
        Passed to bb.daemonize.createDaemon: exec the bitbake-server script,
        handing over the lock and ready pipe file descriptors (made
        inheritable) and the remaining settings as command line arguments.
        """
        os.close(self.readypipe)
        os.set_inheritable(self.bitbake_lock.fileno(), True)
        os.set_inheritable(self.readypipein, True)
        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
        os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))

def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
    """
    Server process entry point: create the control socket and the cooker,
    signal readiness by sending "r" over readypipeinfd and run the
    ProcessServer. Returns None early if creating the cooker raises
    bb.BBHandledException.
    """

    import bb.cookerdata
    import bb.cooker

    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))

    try:
        bitbake_lock = os.fdopen(lockfd, "w")

        # Create server control socket
        if os.path.exists(sockname):
            serverlog("WARNING: removing existing socket file '%s'" % sockname)
            os.unlink(sockname)

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        cwd = os.getcwd()
        try:
            os.chdir(os.path.dirname(sockname))
            sock.bind(os.path.basename(sockname))
        finally:
            os.chdir(cwd)
        sock.listen(1)

        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
        writer = ConnectionWriter(readypipeinfd)
        try:
            featureset = []
            cooker = bb.cooker.BBCooker(featureset, server)
            cooker.configuration.profile = profile
        except bb.BBHandledException:
            return None
        writer.send("r")
        writer.close()
        server.cooker = cooker
        serverlog("Started bitbake server pid %d" % os.getpid())

        server.run()
    finally:
        # Flush any messages/errors to the logfile before exit
        sys.stdout.flush()
        sys.stderr.flush()

def connectProcessServer(sockname, featureset):
    """
    Connect to the server listening on sockname, pass it the three pipe ends
    used for events, commands and command replies, and set featureset.

    Returns a BitBakeProcessServerConnection. On failure the channels, pipe
    ends and socket are closed and the exception is re-raised.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
    eq = command_chan_recv = command_chan = None

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            finished = False
            while not finished:
                try:
                    sock.connect(os.path.basename(sockname))
                    finished = True
                except IOError as e:
                    # NOTE(review): the EWOULDBLOCK branch has no effect since
                    # the raise below is unconditional, so the loop never
                    # retries. Left as is because callers may depend on the
                    # exception being raised - confirm the intent.
                    if e.errno == errno.EWOULDBLOCK:
                        pass
                    raise
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write commands results to
        readfd2, writefd2 = os.pipe()
        command_chan_recv = ConnectionReader(readfd2)

        sendfds(sock, [writefd, readfd1, writefd2])

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use
        for i in [writefd, readfd1, writefd2]:
            os.close(i)

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit) as e:
        if command_chan_recv:
            command_chan_recv.close()
        if command_chan:
            command_chan.close()
        for i in [writefd, readfd1, writefd2]:
            try:
                if i:
                    os.close(i)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection

def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fds = array.array('i', fds)
    # The single data byte carries the fd count modulo 256, checked by recvfds()
    msg = bytes([len(fds) % 256])
    sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])

def recvfds(sock, size):
    '''
    Receive an array of fds over an AF_UNIX socket.

    Raises EOFError if nothing was received and RuntimeError if the received
    data is not a single, well formed SCM_RIGHTS message.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
            cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            assert len(a) % 256 == msg[0]
            return list(a)
    except (ValueError, IndexError):
        pass
    raise RuntimeError('Invalid data received')

class BBUIEventQueue:
    """
    Queue of events read from readfd by a background thread, which is started
    by the constructor and runs until the reader is closed or fails.
    """
    def __init__(self, readfd):

        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        self.t = threading.Thread()
        self.t.run = self.startCallbackHandler
        self.t.start()

    def getEvent(self):
        """Return the oldest queued event or None if the queue is empty."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            if len(self.eventQueue) == 0:
                return None

            item = self.eventQueue.pop(0)
            if len(self.eventQueue) == 0:
                self.eventQueueNotify.clear()

        return item

    def waitEvent(self, delay):
        """Wait up to delay seconds for an event, then behave like getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append event to the queue and wake up any waiter."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle event and queue it."""
        # NOTE(review): pickle.loads() must only ever see data from a trusted
        # peer - confirm against the callers of send_event.
        self.queue_event(pickle.loads(event))

    def startCallbackHandler(self):
        """Thread body: queue events read from the reader until it is closed or fails."""
        bb.utils.set_process_name("UIEventQueue")
        while True:
            try:
                ready = self.reader.wait(0.25)
                if ready:
                    event = self.reader.get()
                    self.queue_event(event)
            except (EOFError, OSError, TypeError):
                # Easiest way to exit is to close the file descriptor to cause an exit
                break

    def close(self):
        """Close the reader, which makes the thread exit, and join the thread."""
        self.reader.close()
        self.t.join()

class ConnectionReader(object):
    """Read-only multiprocessing Connection wrapped around a file descriptor."""

    def __init__(self, fd):
        self.reader = multiprocessing.connection.Connection(fd, writable=False)
        self.rlock = multiprocessing.Lock()

    def wait(self, timeout=None):
        """Wait up to timeout seconds for the reader to become ready."""
        return multiprocessing.connection.wait([self.reader], timeout)

    def poll(self, timeout=None):
        """Return whether data is available to read within timeout seconds."""
        return self.reader.poll(timeout)

    def get(self):
        """Read one message under the read lock and return the unpickled object."""
        with bb.utils.lock_timeout(self.rlock):
            res = self.reader.recv_bytes()
        return multiprocessing.reduction.ForkingPickler.loads(res)

    def fileno(self):
        return self.reader.fileno()

    def close(self):
        return self.reader.close()


class ConnectionWriter(object):
    """Write-only multiprocessing Connection wrapped around a file descriptor."""

    def __init__(self, fd):
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        self.wlock = multiprocessing.Lock()
        # Why bb.event needs this I have no idea
        self.event = self

    def _send(self, obj):
        """Write the already pickled obj under the write lock with gc disabled."""
        gc.disable()
        try:
            with bb.utils.lock_timeout(self.wlock):
                self.writer.send_bytes(obj)
        finally:
            # Re-enable the collector even if the write failed
            gc.enable()

    def send(self, obj):
        """
        Pickle obj and write it to the connection. If the current process
        supports queueing signals, signals received while writing are queued
        and handled once the write has completed.
        """
        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
        # See notes/code in CookerParser
        # We must not terminate holding this lock else processes will hang.
        # For SIGTERM, raising afterwards avoids this.
        # For SIGINT, we don't want to have written partial data to the pipe.
        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
        process = multiprocessing.current_process()
        if process and hasattr(process, "queue_signals"):
            with bb.utils.lock_timeout(process.signal_threadlock):
                process.queue_signals = True
                try:
                    self._send(obj)
                finally:
                    # Never leave signals queued forever if the write failed
                    process.queue_signals = False

                while len(process.signal_received) > 0:
                    sig = process.signal_received.pop()
                    process.handle_sig(sig, None)
        else:
            self._send(obj)

    def fileno(self):
        return self.writer.fileno()

    def close(self):
        return self.writer.close()