1# 2# BitBake Process based server. 3# 4# Copyright (C) 2010 Bob Foerster <robert@erafx.com> 5# 6# SPDX-License-Identifier: GPL-2.0-only 7# 8 9""" 10 This module implements a multiprocessing.Process based server for bitbake. 11""" 12 13import bb 14import bb.event 15import logging 16import multiprocessing 17import threading 18import array 19import os 20import sys 21import time 22import select 23import socket 24import subprocess 25import errno 26import re 27import datetime 28import pickle 29import traceback 30import gc 31import stat 32import bb.server.xmlrpcserver 33from bb import daemonize 34from multiprocessing import queues 35 36logger = logging.getLogger('BitBake') 37 38class ProcessTimeout(SystemExit): 39 pass 40 41def currenttime(): 42 return datetime.datetime.now().strftime('%H:%M:%S.%f') 43 44def serverlog(msg): 45 print(str(os.getpid()) + " " + currenttime() + " " + msg) 46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server 47 #sys.stdout.flush() 48 49# 50# When we have lockfile issues, try and find infomation about which process is 51# using the lockfile 52# 53def get_lockfile_process_msg(lockfile): 54 # Some systems may not have lsof available 55 procs = None 56 try: 57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 58 except subprocess.CalledProcessError: 59 # File was deleted? 60 pass 61 except OSError as e: 62 if e.errno != errno.ENOENT: 63 raise 64 if procs is None: 65 # Fall back to fuser if lsof is unavailable 66 try: 67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 68 except subprocess.CalledProcessError: 69 # File was deleted? 70 pass 71 except OSError as e: 72 if e.errno != errno.ENOENT: 73 raise 74 if procs: 75 return procs.decode("utf-8") 76 return None 77 78class idleFinish(): 79 def __init__(self, msg): 80 self.msg = msg 81 82class ProcessServer(): 83 profile_filename = "profile.log" 84 profile_processed_filename = "profile.log.processed" 85 86 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 87 self.command_channel = False 88 self.command_channel_reply = False 89 self.quit = False 90 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 91 self.next_heartbeat = time.time() 92 93 self.event_handle = None 94 self.hadanyui = False 95 self.haveui = False 96 self.maxuiwait = 30 97 self.xmlrpc = False 98 99 self.idle = None 100 # Need a lock for _idlefuns changes 101 self._idlefuns = {} 102 self._idlefuncsLock = threading.Lock() 103 self.idle_cond = threading.Condition(self._idlefuncsLock) 104 105 self.bitbake_lock = lock 106 self.bitbake_lock_name = lockname 107 self.sock = sock 108 self.sockname = sockname 109 # It is possible the directory may be renamed. Cache the inode of the socket file 110 # so we can tell if things changed. 111 self.sockinode = os.stat(self.sockname)[stat.ST_INO] 112 113 self.server_timeout = server_timeout 114 self.timeout = self.server_timeout 115 self.xmlrpcinterface = xmlrpcinterface 116 117 def register_idle_function(self, function, data): 118 """Register a function to be called while the server is idle""" 119 assert hasattr(function, '__call__') 120 with bb.utils.lock_timeout(self._idlefuncsLock): 121 self._idlefuns[function] = data 122 serverlog("Registering idle function %s" % str(function)) 123 124 def run(self): 125 126 if self.xmlrpcinterface[0]: 127 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) 128 129 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) 130 131 try: 132 self.bitbake_lock.seek(0) 133 self.bitbake_lock.truncate() 134 if self.xmlrpc: 135 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port)) 136 else: 137 self.bitbake_lock.write("%s\n" % (os.getpid())) 138 self.bitbake_lock.flush() 139 except Exception as e: 140 serverlog("Error writing to lock file: %s" % str(e)) 141 pass 142 143 if self.cooker.configuration.profile: 144 try: 145 import cProfile as profile 146 except: 147 import profile 148 prof = profile.Profile() 149 150 ret = profile.Profile.runcall(prof, self.main) 151 152 prof.dump_stats("profile.log") 153 bb.utils.process_profilelog("profile.log") 154 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 155 156 else: 157 ret = self.main() 158 159 return ret 160 161 def _idle_check(self): 162 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None 163 164 def wait_for_idle(self, timeout=30): 165 # Wait for the idle loop to have cleared 166 with bb.utils.lock_timeout(self._idlefuncsLock): 167 return self.idle_cond.wait_for(self._idle_check, timeout) is not False 168 169 def set_async_cmd(self, cmd): 170 with bb.utils.lock_timeout(self._idlefuncsLock): 171 ret = self.idle_cond.wait_for(self._idle_check, 30) 172 if ret is False: 173 return False 174 self.cooker.command.currentAsyncCommand = cmd 175 return True 176 177 def clear_async_cmd(self): 178 with bb.utils.lock_timeout(self._idlefuncsLock): 179 self.cooker.command.currentAsyncCommand = None 180 self.idle_cond.notify_all() 181 182 def get_async_cmd(self): 183 with bb.utils.lock_timeout(self._idlefuncsLock): 184 return self.cooker.command.currentAsyncCommand 185 186 def main(self): 187 self.cooker.pre_serve() 188 189 bb.utils.set_process_name("Cooker") 190 191 ready = [] 192 newconnections = [] 193 194 self.controllersock = False 195 fds = [self.sock] 196 if self.xmlrpc: 197 fds.append(self.xmlrpc) 198 seendata = False 199 serverlog("Entering server connection loop") 200 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) 201 202 def disconnect_client(self, fds): 203 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) 204 if self.controllersock: 205 fds.remove(self.controllersock) 206 self.controllersock.close() 207 self.controllersock = False 208 if self.haveui: 209 # Wait for the idle loop to have cleared (30s max) 210 if not self.wait_for_idle(30): 211 serverlog("Idle loop didn't finish queued commands after 30s, exiting.") 212 self.quit = True 213 fds.remove(self.command_channel) 214 bb.event.unregister_UIHhandler(self.event_handle, True) 215 self.command_channel_reply.writer.close() 216 self.event_writer.writer.close() 217 self.command_channel.close() 218 self.command_channel = False 219 del self.event_writer 220 self.lastui = time.time() 221 self.cooker.clientComplete() 222 self.haveui = False 223 ready = select.select(fds,[],[],0)[0] 224 if newconnections and not self.quit: 225 serverlog("Starting new client") 226 conn = newconnections.pop(-1) 227 fds.append(conn) 228 self.controllersock = conn 229 elif not self.timeout and not ready: 230 serverlog("No timeout, exiting.") 231 self.quit = True 232 233 self.lastui = time.time() 234 while not self.quit: 235 if self.sock in ready: 236 while select.select([self.sock],[],[],0)[0]: 237 controllersock, address = self.sock.accept() 238 if self.controllersock: 239 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) 240 newconnections.append(controllersock) 241 else: 242 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) 243 self.controllersock = controllersock 244 fds.append(controllersock) 245 if self.controllersock in ready: 246 try: 247 serverlog("Processing Client") 248 ui_fds = recvfds(self.controllersock, 3) 249 serverlog("Connecting Client") 250 251 # Where to write events to 252 writer = ConnectionWriter(ui_fds[0]) 253 self.event_handle = bb.event.register_UIHhandler(writer, True) 254 self.event_writer = writer 255 256 # Where to read commands from 257 reader = ConnectionReader(ui_fds[1]) 258 fds.append(reader) 259 self.command_channel = reader 260 261 # Where to send command return values to 262 writer = ConnectionWriter(ui_fds[2]) 263 self.command_channel_reply = writer 264 265 self.haveui = True 266 self.hadanyui = True 267 268 except (EOFError, OSError): 269 disconnect_client(self, fds) 270 271 if not self.timeout == -1.0 and not self.haveui and self.timeout and \ 272 (self.lastui + self.timeout) < time.time(): 273 serverlog("Server timeout, exiting.") 274 self.quit = True 275 276 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see 277 # one. We have had issue with processes hanging indefinitely so timing out UI-less 278 # servers is useful. 279 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): 280 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") 281 self.quit = True 282 283 if self.command_channel in ready: 284 try: 285 command = self.command_channel.get() 286 except EOFError: 287 # Client connection shutting down 288 ready = [] 289 disconnect_client(self, fds) 290 continue 291 if command[0] == "terminateServer": 292 self.quit = True 293 continue 294 try: 295 serverlog("Running command %s" % command) 296 reply = self.cooker.command.runCommand(command, self) 297 serverlog("Sending reply %s" % repr(reply)) 298 self.command_channel_reply.send(reply) 299 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) 300 except Exception as e: 301 stack = traceback.format_exc() 302 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) 303 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) 304 305 if self.xmlrpc in ready: 306 self.xmlrpc.handle_requests() 307 308 if not seendata and hasattr(self.cooker, "data"): 309 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 310 if heartbeat_event: 311 try: 312 self.heartbeat_seconds = float(heartbeat_event) 313 except: 314 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 315 316 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') 317 try: 318 if self.timeout: 319 self.timeout = float(self.timeout) 320 except: 321 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 322 seendata = True 323 324 ready = self.idle_commands(.1, fds) 325 326 if self.idle: 327 self.idle.join() 328 329 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) 330 # Remove the socket file so we don't get any more connections to avoid races 331 # The build directory could have been renamed so if the file isn't the one we created 332 # we shouldn't delete it. 333 try: 334 sockinode = os.stat(self.sockname)[stat.ST_INO] 335 if sockinode == self.sockinode: 336 os.unlink(self.sockname) 337 else: 338 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) 339 except Exception as err: 340 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) 341 self.sock.close() 342 343 try: 344 self.cooker.shutdown(True, idle=False) 345 self.cooker.notifier.stop() 346 self.cooker.confignotifier.stop() 347 except: 348 pass 349 350 self.cooker.post_serve() 351 352 if len(threading.enumerate()) != 1: 353 serverlog("More than one thread left?: " + str(threading.enumerate())) 354 355 # Flush logs before we release the lock 356 sys.stdout.flush() 357 sys.stderr.flush() 358 359 # Finally release the lockfile but warn about other processes holding it open 360 lock = self.bitbake_lock 361 lockfile = self.bitbake_lock_name 362 363 def get_lock_contents(lockfile): 364 try: 365 with open(lockfile, "r") as f: 366 return f.readlines() 367 except FileNotFoundError: 368 return None 369 370 lock.close() 371 lock = None 372 373 while not lock: 374 i = 0 375 lock = None 376 if not os.path.exists(os.path.basename(lockfile)): 377 serverlog("Lockfile directory gone, exiting.") 378 return 379 380 while not lock and i < 30: 381 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 382 if not lock: 383 newlockcontents = get_lock_contents(lockfile) 384 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): 385 # A new server was started, the lockfile contents changed, we can exit 386 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 387 return 388 time.sleep(0.1) 389 i += 1 390 if lock: 391 # We hold the lock so we can remove the file (hide stale pid data) 392 # via unlockfile. 393 bb.utils.unlockfile(lock) 394 serverlog("Exiting as we could obtain the lock") 395 return 396 397 if not lock: 398 procs = get_lockfile_process_msg(lockfile) 399 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] 400 if procs: 401 msg.append(":\n%s" % procs) 402 serverlog("".join(msg)) 403 404 def idle_thread(self): 405 if self.cooker.configuration.profile: 406 try: 407 import cProfile as profile 408 except: 409 import profile 410 prof = profile.Profile() 411 412 ret = profile.Profile.runcall(prof, self.idle_thread_internal) 413 414 prof.dump_stats("profile-mainloop.log") 415 bb.utils.process_profilelog("profile-mainloop.log") 416 serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") 417 else: 418 self.idle_thread_internal() 419 420 def idle_thread_internal(self): 421 def remove_idle_func(function): 422 with bb.utils.lock_timeout(self._idlefuncsLock): 423 del self._idlefuns[function] 424 self.idle_cond.notify_all() 425 426 while not self.quit: 427 nextsleep = 0.1 428 fds = [] 429 430 with bb.utils.lock_timeout(self._idlefuncsLock): 431 items = list(self._idlefuns.items()) 432 433 for function, data in items: 434 try: 435 retval = function(self, data, False) 436 if isinstance(retval, idleFinish): 437 serverlog("Removing idle function %s at idleFinish" % str(function)) 438 remove_idle_func(function) 439 self.cooker.command.finishAsyncCommand(retval.msg) 440 nextsleep = None 441 elif retval is False: 442 serverlog("Removing idle function %s" % str(function)) 443 remove_idle_func(function) 444 nextsleep = None 445 elif retval is True: 446 nextsleep = None 447 elif isinstance(retval, float) and nextsleep: 448 if (retval < nextsleep): 449 nextsleep = retval 450 elif nextsleep is None: 451 continue 452 else: 453 fds = fds + retval 454 except SystemExit: 455 raise 456 except Exception as exc: 457 if not isinstance(exc, bb.BBHandledException): 458 logger.exception('Running idle function') 459 remove_idle_func(function) 460 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) 461 self.quit = True 462 463 # Create new heartbeat event? 464 now = time.time() 465 if bb.event._heartbeat_enabled and now >= self.next_heartbeat: 466 # We might have missed heartbeats. Just trigger once in 467 # that case and continue after the usual delay. 468 self.next_heartbeat += self.heartbeat_seconds 469 if self.next_heartbeat <= now: 470 self.next_heartbeat = now + self.heartbeat_seconds 471 if hasattr(self.cooker, "data"): 472 heartbeat = bb.event.HeartbeatEvent(now) 473 try: 474 bb.event.fire(heartbeat, self.cooker.data) 475 except Exception as exc: 476 if not isinstance(exc, bb.BBHandledException): 477 logger.exception('Running heartbeat function') 478 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) 479 self.quit = True 480 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: 481 # Shorten timeout so that we we wake up in time for 482 # the heartbeat. 483 nextsleep = self.next_heartbeat - now 484 485 if nextsleep is not None: 486 select.select(fds,[],[],nextsleep)[0] 487 488 def idle_commands(self, delay, fds=None): 489 nextsleep = delay 490 if not fds: 491 fds = [] 492 493 if not self.idle: 494 self.idle = threading.Thread(target=self.idle_thread) 495 self.idle.start() 496 elif self.idle and not self.idle.is_alive(): 497 serverlog("Idle thread terminated, main thread exiting too") 498 bb.error("Idle thread terminated, main thread exiting too") 499 self.quit = True 500 501 if nextsleep is not None: 502 if self.xmlrpc: 503 nextsleep = self.xmlrpc.get_timeout(nextsleep) 504 try: 505 return select.select(fds,[],[],nextsleep)[0] 506 except InterruptedError: 507 # Ignore EINTR 508 return [] 509 else: 510 return select.select(fds,[],[],0)[0] 511 512 513class ServerCommunicator(): 514 def __init__(self, connection, recv): 515 self.connection = connection 516 self.recv = recv 517 518 def runCommand(self, command): 519 try: 520 self.connection.send(command) 521 except BrokenPipeError as e: 522 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e 523 if not self.recv.poll(30): 524 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) 525 if not self.recv.poll(30): 526 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) 527 try: 528 ret, exc = self.recv.get() 529 except EOFError as e: 530 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e 531 # Should probably turn all exceptions in exc back into exceptions? 532 # For now, at least handle BBHandledException 533 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 534 raise bb.BBHandledException() 535 return ret, exc 536 537 def updateFeatureSet(self, featureset): 538 _, error = self.runCommand(["setFeatures", featureset]) 539 if error: 540 logger.error("Unable to set the cooker to the correct featureset: %s" % error) 541 raise BaseException(error) 542 543 def getEventHandle(self): 544 handle, error = self.runCommand(["getUIHandlerNum"]) 545 if error: 546 logger.error("Unable to get UI Handler Number: %s" % error) 547 raise BaseException(error) 548 549 return handle 550 551 def terminateServer(self): 552 self.connection.send(['terminateServer']) 553 return 554 555class BitBakeProcessServerConnection(object): 556 def __init__(self, ui_channel, recv, eq, sock): 557 self.connection = ServerCommunicator(ui_channel, recv) 558 self.events = eq 559 # Save sock so it doesn't get gc'd for the life of our connection 560 self.socket_connection = sock 561 562 def terminate(self): 563 self.events.close() 564 self.socket_connection.close() 565 self.connection.connection.close() 566 self.connection.recv.close() 567 return 568 569start_log_format = '--- Starting bitbake server pid %s at %s ---' 570start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' 571 572class BitBakeServer(object): 573 574 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): 575 576 self.server_timeout = server_timeout 577 self.xmlrpcinterface = xmlrpcinterface 578 self.featureset = featureset 579 self.sockname = sockname 580 self.bitbake_lock = lock 581 self.profile = profile 582 self.readypipe, self.readypipein = os.pipe() 583 584 # Place the log in the builddirectory alongside the lock file 585 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") 586 self.logfile = logfile 587 588 startdatetime = datetime.datetime.now() 589 bb.daemonize.createDaemon(self._startServer, logfile) 590 self.bitbake_lock.close() 591 os.close(self.readypipein) 592 593 ready = ConnectionReader(self.readypipe) 594 r = ready.poll(5) 595 if not r: 596 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90") 597 r = ready.poll(90) 598 if r: 599 try: 600 r = ready.get() 601 except EOFError: 602 # Trap the child exiting/closing the pipe and error out 603 r = None 604 if not r or r[0] != "r": 605 ready.close() 606 bb.error("Unable to start bitbake server (%s)" % str(r)) 607 if os.path.exists(logfile): 608 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) 609 started = False 610 lines = [] 611 lastlines = [] 612 with open(logfile, "r") as f: 613 for line in f: 614 if started: 615 lines.append(line) 616 else: 617 lastlines.append(line) 618 res = logstart_re.search(line.rstrip()) 619 if res: 620 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) 621 if ldatetime >= startdatetime: 622 started = True 623 lines.append(line) 624 if len(lastlines) > 60: 625 lastlines = lastlines[-60:] 626 if lines: 627 if len(lines) > 60: 628 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:]))) 629 else: 630 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines))) 631 elif lastlines: 632 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines))) 633 else: 634 bb.error("%s doesn't exist" % logfile) 635 636 raise SystemExit(1) 637 638 ready.close() 639 640 def _startServer(self): 641 os.close(self.readypipe) 642 os.set_inheritable(self.bitbake_lock.fileno(), True) 643 os.set_inheritable(self.readypipein, True) 644 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 645 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 646 647def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): 648 649 import bb.cookerdata 650 import bb.cooker 651 652 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) 653 654 try: 655 bitbake_lock = os.fdopen(lockfd, "w") 656 657 # Create server control socket 658 if os.path.exists(sockname): 659 serverlog("WARNING: removing existing socket file '%s'" % sockname) 660 os.unlink(sockname) 661 662 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 663 # AF_UNIX has path length issues so chdir here to workaround 664 cwd = os.getcwd() 665 try: 666 os.chdir(os.path.dirname(sockname)) 667 sock.bind(os.path.basename(sockname)) 668 finally: 669 os.chdir(cwd) 670 sock.listen(1) 671 672 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) 673 writer = ConnectionWriter(readypipeinfd) 674 try: 675 featureset = [] 676 cooker = bb.cooker.BBCooker(featureset, server) 677 cooker.configuration.profile = profile 678 except bb.BBHandledException: 679 return None 680 writer.send("r") 681 writer.close() 682 server.cooker = cooker 683 serverlog("Started bitbake server pid %d" % os.getpid()) 684 685 server.run() 686 finally: 687 # Flush any messages/errors to the logfile before exit 688 sys.stdout.flush() 689 sys.stderr.flush() 690 691def connectProcessServer(sockname, featureset): 692 # Connect to socket 693 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 694 # AF_UNIX has path length issues so chdir here to workaround 695 cwd = os.getcwd() 696 697 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None 698 eq = command_chan_recv = command_chan = None 699 700 sock.settimeout(10) 701 702 try: 703 try: 704 os.chdir(os.path.dirname(sockname)) 705 finished = False 706 while not finished: 707 try: 708 sock.connect(os.path.basename(sockname)) 709 finished = True 710 except IOError as e: 711 if e.errno == errno.EWOULDBLOCK: 712 pass 713 raise 714 finally: 715 os.chdir(cwd) 716 717 # Send an fd for the remote to write events to 718 readfd, writefd = os.pipe() 719 eq = BBUIEventQueue(readfd) 720 # Send an fd for the remote to recieve commands from 721 readfd1, writefd1 = os.pipe() 722 command_chan = ConnectionWriter(writefd1) 723 # Send an fd for the remote to write commands results to 724 readfd2, writefd2 = os.pipe() 725 command_chan_recv = ConnectionReader(readfd2) 726 727 sendfds(sock, [writefd, readfd1, writefd2]) 728 729 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock) 730 731 # Close the ends of the pipes we won't use 732 for i in [writefd, readfd1, writefd2]: 733 os.close(i) 734 735 server_connection.connection.updateFeatureSet(featureset) 736 737 except (Exception, SystemExit) as e: 738 if command_chan_recv: 739 command_chan_recv.close() 740 if command_chan: 741 command_chan.close() 742 for i in [writefd, readfd1, writefd2]: 743 try: 744 if i: 745 os.close(i) 746 except OSError: 747 pass 748 sock.close() 749 raise 750 751 return server_connection 752 753def sendfds(sock, fds): 754 '''Send an array of fds over an AF_UNIX socket.''' 755 fds = array.array('i', fds) 756 msg = bytes([len(fds) % 256]) 757 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 758 759def recvfds(sock, size): 760 '''Receive an array of fds over an AF_UNIX socket.''' 761 a = array.array('i') 762 bytes_size = a.itemsize * size 763 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) 764 if not msg and not ancdata: 765 raise EOFError 766 try: 767 if len(ancdata) != 1: 768 raise RuntimeError('received %d items of ancdata' % 769 len(ancdata)) 770 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 771 if (cmsg_level == socket.SOL_SOCKET and 772 cmsg_type == socket.SCM_RIGHTS): 773 if len(cmsg_data) % a.itemsize != 0: 774 raise ValueError 775 a.frombytes(cmsg_data) 776 assert len(a) % 256 == msg[0] 777 return list(a) 778 except (ValueError, IndexError): 779 pass 780 raise RuntimeError('Invalid data received') 781 782class BBUIEventQueue: 783 def __init__(self, readfd): 784 785 self.eventQueue = [] 786 self.eventQueueLock = threading.Lock() 787 self.eventQueueNotify = threading.Event() 788 789 self.reader = ConnectionReader(readfd) 790 791 self.t = threading.Thread() 792 self.t.run = self.startCallbackHandler 793 self.t.start() 794 795 def getEvent(self): 796 with bb.utils.lock_timeout(self.eventQueueLock): 797 if len(self.eventQueue) == 0: 798 return None 799 800 item = self.eventQueue.pop(0) 801 if len(self.eventQueue) == 0: 802 self.eventQueueNotify.clear() 803 804 return item 805 806 def waitEvent(self, delay): 807 self.eventQueueNotify.wait(delay) 808 return self.getEvent() 809 810 def queue_event(self, event): 811 with bb.utils.lock_timeout(self.eventQueueLock): 812 self.eventQueue.append(event) 813 self.eventQueueNotify.set() 814 815 def send_event(self, event): 816 self.queue_event(pickle.loads(event)) 817 818 def startCallbackHandler(self): 819 bb.utils.set_process_name("UIEventQueue") 820 while True: 821 try: 822 ready = self.reader.wait(0.25) 823 if ready: 824 event = self.reader.get() 825 self.queue_event(event) 826 except (EOFError, OSError, TypeError): 827 # Easiest way to exit is to close the file descriptor to cause an exit 828 break 829 830 def close(self): 831 self.reader.close() 832 self.t.join() 833 834class ConnectionReader(object): 835 836 def __init__(self, fd): 837 self.reader = multiprocessing.connection.Connection(fd, writable=False) 838 self.rlock = multiprocessing.Lock() 839 840 def wait(self, timeout=None): 841 return multiprocessing.connection.wait([self.reader], timeout) 842 843 def poll(self, timeout=None): 844 return self.reader.poll(timeout) 845 846 def get(self): 847 with bb.utils.lock_timeout(self.rlock): 848 res = self.reader.recv_bytes() 849 return multiprocessing.reduction.ForkingPickler.loads(res) 850 851 def fileno(self): 852 return self.reader.fileno() 853 854 def close(self): 855 return self.reader.close() 856 857 858class ConnectionWriter(object): 859 860 def __init__(self, fd): 861 self.writer = multiprocessing.connection.Connection(fd, readable=False) 862 self.wlock = multiprocessing.Lock() 863 # Why bb.event needs this I have no idea 864 self.event = self 865 866 def _send(self, obj): 867 gc.disable() 868 with bb.utils.lock_timeout(self.wlock): 869 self.writer.send_bytes(obj) 870 gc.enable() 871 872 def send(self, obj): 873 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 874 # See notes/code in CookerParser 875 # We must not terminate holding this lock else processes will hang. 876 # For SIGTERM, raising afterwards avoids this. 877 # For SIGINT, we don't want to have written partial data to the pipe. 878 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 879 process = multiprocessing.current_process() 880 if process and hasattr(process, "queue_signals"): 881 with bb.utils.lock_timeout(process.signal_threadlock): 882 process.queue_signals = True 883 self._send(obj) 884 process.queue_signals = False 885 886 while len(process.signal_received) > 0: 887 sig = process.signal_received.pop() 888 process.handle_sig(sig, None) 889 else: 890 self._send(obj) 891 892 def fileno(self): 893 return self.writer.fileno() 894 895 def close(self): 896 return self.writer.close() 897