1# 2# BitBake Process based server. 3# 4# Copyright (C) 2010 Bob Foerster <robert@erafx.com> 5# 6# SPDX-License-Identifier: GPL-2.0-only 7# 8 9""" 10 This module implements a multiprocessing.Process based server for bitbake. 11""" 12 13import bb 14import bb.event 15import logging 16import multiprocessing 17import threading 18import array 19import os 20import sys 21import time 22import select 23import socket 24import subprocess 25import errno 26import re 27import datetime 28import pickle 29import traceback 30import gc 31import stat 32import bb.server.xmlrpcserver 33from bb import daemonize 34from multiprocessing import queues 35 36logger = logging.getLogger('BitBake') 37 38class ProcessTimeout(SystemExit): 39 pass 40 41def currenttime(): 42 return datetime.datetime.now().strftime('%H:%M:%S.%f') 43 44def serverlog(msg): 45 print(str(os.getpid()) + " " + currenttime() + " " + msg) 46 #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server 47 #sys.stdout.flush() 48 49# 50# When we have lockfile issues, try and find infomation about which process is 51# using the lockfile 52# 53def get_lockfile_process_msg(lockfile): 54 # Some systems may not have lsof available 55 procs = None 56 try: 57 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 58 except subprocess.CalledProcessError: 59 # File was deleted? 60 pass 61 except OSError as e: 62 if e.errno != errno.ENOENT: 63 raise 64 if procs is None: 65 # Fall back to fuser if lsof is unavailable 66 try: 67 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 68 except subprocess.CalledProcessError: 69 # File was deleted? 70 pass 71 except OSError as e: 72 if e.errno != errno.ENOENT: 73 raise 74 if procs: 75 return procs.decode("utf-8") 76 return None 77 78class idleFinish(): 79 def __init__(self, msg): 80 self.msg = msg 81 82class ProcessServer(): 83 profile_filename = "profile.log" 84 profile_processed_filename = "profile.log.processed" 85 86 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 87 self.command_channel = False 88 self.command_channel_reply = False 89 self.quit = False 90 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 91 self.next_heartbeat = time.time() 92 93 self.event_handle = None 94 self.hadanyui = False 95 self.haveui = False 96 self.maxuiwait = 30 97 self.xmlrpc = False 98 99 self.idle = None 100 # Need a lock for _idlefuns changes 101 self._idlefuns = {} 102 self._idlefuncsLock = threading.Lock() 103 self.idle_cond = threading.Condition(self._idlefuncsLock) 104 105 self.bitbake_lock = lock 106 self.bitbake_lock_name = lockname 107 self.sock = sock 108 self.sockname = sockname 109 # It is possible the directory may be renamed. Cache the inode of the socket file 110 # so we can tell if things changed. 111 self.sockinode = os.stat(self.sockname)[stat.ST_INO] 112 113 self.server_timeout = server_timeout 114 self.timeout = self.server_timeout 115 self.xmlrpcinterface = xmlrpcinterface 116 117 def register_idle_function(self, function, data): 118 """Register a function to be called while the server is idle""" 119 assert hasattr(function, '__call__') 120 with bb.utils.lock_timeout(self._idlefuncsLock): 121 self._idlefuns[function] = data 122 serverlog("Registering idle function %s" % str(function)) 123 124 def run(self): 125 126 if self.xmlrpcinterface[0]: 127 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) 128 129 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) 130 131 try: 132 self.bitbake_lock.seek(0) 133 self.bitbake_lock.truncate() 134 if self.xmlrpc: 135 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port)) 136 else: 137 self.bitbake_lock.write("%s\n" % (os.getpid())) 138 self.bitbake_lock.flush() 139 except Exception as e: 140 serverlog("Error writing to lock file: %s" % str(e)) 141 pass 142 143 if self.cooker.configuration.profile: 144 try: 145 import cProfile as profile 146 except: 147 import profile 148 prof = profile.Profile() 149 150 ret = profile.Profile.runcall(prof, self.main) 151 152 prof.dump_stats("profile.log") 153 bb.utils.process_profilelog("profile.log") 154 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 155 156 else: 157 ret = self.main() 158 159 return ret 160 161 def _idle_check(self): 162 return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None 163 164 def wait_for_idle(self, timeout=30): 165 # Wait for the idle loop to have cleared 166 with bb.utils.lock_timeout(self._idlefuncsLock): 167 return self.idle_cond.wait_for(self._idle_check, timeout) is not False 168 169 def set_async_cmd(self, cmd): 170 with bb.utils.lock_timeout(self._idlefuncsLock): 171 ret = self.idle_cond.wait_for(self._idle_check, 30) 172 if ret is False: 173 return False 174 self.cooker.command.currentAsyncCommand = cmd 175 return True 176 177 def clear_async_cmd(self): 178 with bb.utils.lock_timeout(self._idlefuncsLock): 179 self.cooker.command.currentAsyncCommand = None 180 self.idle_cond.notify_all() 181 182 def get_async_cmd(self): 183 with bb.utils.lock_timeout(self._idlefuncsLock): 184 return self.cooker.command.currentAsyncCommand 185 186 def main(self): 187 self.cooker.pre_serve() 188 189 bb.utils.set_process_name("Cooker") 190 191 ready = [] 192 newconnections = [] 193 194 self.controllersock = False 195 fds = [self.sock] 196 if self.xmlrpc: 197 fds.append(self.xmlrpc) 198 seendata = False 199 serverlog("Entering server connection loop") 200 serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname))) 201 202 def disconnect_client(self, fds): 203 serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname)) 204 if self.controllersock: 205 fds.remove(self.controllersock) 206 self.controllersock.close() 207 self.controllersock = False 208 if self.haveui: 209 # Wait for the idle loop to have cleared (30s max) 210 if not self.wait_for_idle(30): 211 serverlog("Idle loop didn't finish queued commands after 30s, exiting.") 212 self.quit = True 213 fds.remove(self.command_channel) 214 bb.event.unregister_UIHhandler(self.event_handle, True) 215 self.command_channel_reply.writer.close() 216 self.event_writer.writer.close() 217 self.command_channel.close() 218 self.command_channel = False 219 del self.event_writer 220 self.lastui = time.time() 221 self.cooker.clientComplete() 222 self.haveui = False 223 ready = select.select(fds,[],[],0)[0] 224 if newconnections and not self.quit: 225 serverlog("Starting new client") 226 conn = newconnections.pop(-1) 227 fds.append(conn) 228 self.controllersock = conn 229 elif not self.timeout and not ready: 230 serverlog("No timeout, exiting.") 231 self.quit = True 232 233 self.lastui = time.time() 234 while not self.quit: 235 if self.sock in ready: 236 while select.select([self.sock],[],[],0)[0]: 237 controllersock, address = self.sock.accept() 238 if self.controllersock: 239 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) 240 newconnections.append(controllersock) 241 else: 242 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) 243 self.controllersock = controllersock 244 fds.append(controllersock) 245 if self.controllersock in ready: 246 try: 247 serverlog("Processing Client") 248 ui_fds = recvfds(self.controllersock, 3) 249 serverlog("Connecting Client") 250 251 # Where to write events to 252 writer = ConnectionWriter(ui_fds[0]) 253 self.event_handle = bb.event.register_UIHhandler(writer, True) 254 self.event_writer = writer 255 256 # Where to read commands from 257 reader = ConnectionReader(ui_fds[1]) 258 fds.append(reader) 259 self.command_channel = reader 260 261 # Where to send command return values to 262 writer = ConnectionWriter(ui_fds[2]) 263 self.command_channel_reply = writer 264 265 self.haveui = True 266 self.hadanyui = True 267 268 except (EOFError, OSError): 269 disconnect_client(self, fds) 270 271 if not self.timeout == -1.0 and not self.haveui and self.timeout and \ 272 (self.lastui + self.timeout) < time.time(): 273 serverlog("Server timeout, exiting.") 274 self.quit = True 275 276 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see 277 # one. We have had issue with processes hanging indefinitely so timing out UI-less 278 # servers is useful. 279 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): 280 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") 281 self.quit = True 282 283 if self.command_channel in ready: 284 try: 285 command = self.command_channel.get() 286 except EOFError: 287 # Client connection shutting down 288 ready = [] 289 disconnect_client(self, fds) 290 continue 291 if command[0] == "terminateServer": 292 self.quit = True 293 continue 294 try: 295 serverlog("Running command %s" % command) 296 reply = self.cooker.command.runCommand(command, self) 297 serverlog("Sending reply %s" % repr(reply)) 298 self.command_channel_reply.send(reply) 299 serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname)) 300 except Exception as e: 301 stack = traceback.format_exc() 302 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) 303 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) 304 305 if self.xmlrpc in ready: 306 self.xmlrpc.handle_requests() 307 308 if not seendata and hasattr(self.cooker, "data"): 309 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 310 if heartbeat_event: 311 try: 312 self.heartbeat_seconds = float(heartbeat_event) 313 except: 314 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 315 316 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') 317 try: 318 if self.timeout: 319 self.timeout = float(self.timeout) 320 except: 321 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 322 seendata = True 323 324 if not self.idle: 325 self.idle = threading.Thread(target=self.idle_thread) 326 self.idle.start() 327 elif self.idle and not self.idle.is_alive(): 328 serverlog("Idle thread terminated, main thread exiting too") 329 bb.error("Idle thread terminated, main thread exiting too") 330 self.quit = True 331 332 nextsleep = 1.0 333 if self.xmlrpc: 334 nextsleep = self.xmlrpc.get_timeout(nextsleep) 335 try: 336 ready = select.select(fds,[],[],nextsleep)[0] 337 except InterruptedError: 338 # Ignore EINTR 339 ready = [] 340 341 if self.idle: 342 self.idle.join() 343 344 serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname)) 345 # Remove the socket file so we don't get any more connections to avoid races 346 # The build directory could have been renamed so if the file isn't the one we created 347 # we shouldn't delete it. 348 try: 349 sockinode = os.stat(self.sockname)[stat.ST_INO] 350 if sockinode == self.sockinode: 351 os.unlink(self.sockname) 352 else: 353 serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode)) 354 except Exception as err: 355 serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err)) 356 self.sock.close() 357 358 try: 359 self.cooker.shutdown(True, idle=False) 360 self.cooker.notifier.stop() 361 self.cooker.confignotifier.stop() 362 except: 363 pass 364 365 self.cooker.post_serve() 366 367 if len(threading.enumerate()) != 1: 368 serverlog("More than one thread left?: " + str(threading.enumerate())) 369 370 # Flush logs before we release the lock 371 sys.stdout.flush() 372 sys.stderr.flush() 373 374 # Finally release the lockfile but warn about other processes holding it open 375 lock = self.bitbake_lock 376 lockfile = self.bitbake_lock_name 377 378 def get_lock_contents(lockfile): 379 try: 380 with open(lockfile, "r") as f: 381 return f.readlines() 382 except FileNotFoundError: 383 return None 384 385 lock.close() 386 lock = None 387 388 while not lock: 389 i = 0 390 lock = None 391 if not os.path.exists(os.path.basename(lockfile)): 392 serverlog("Lockfile directory gone, exiting.") 393 return 394 395 while not lock and i < 30: 396 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 397 if not lock: 398 newlockcontents = get_lock_contents(lockfile) 399 if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]): 400 # A new server was started, the lockfile contents changed, we can exit 401 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 402 return 403 time.sleep(0.1) 404 i += 1 405 if lock: 406 # We hold the lock so we can remove the file (hide stale pid data) 407 # via unlockfile. 408 bb.utils.unlockfile(lock) 409 serverlog("Exiting as we could obtain the lock") 410 return 411 412 if not lock: 413 procs = get_lockfile_process_msg(lockfile) 414 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] 415 if procs: 416 msg.append(":\n%s" % procs) 417 serverlog("".join(msg)) 418 419 def idle_thread(self): 420 if self.cooker.configuration.profile: 421 try: 422 import cProfile as profile 423 except: 424 import profile 425 prof = profile.Profile() 426 427 ret = profile.Profile.runcall(prof, self.idle_thread_internal) 428 429 prof.dump_stats("profile-mainloop.log") 430 bb.utils.process_profilelog("profile-mainloop.log") 431 serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed") 432 else: 433 self.idle_thread_internal() 434 435 def idle_thread_internal(self): 436 def remove_idle_func(function): 437 with bb.utils.lock_timeout(self._idlefuncsLock): 438 del self._idlefuns[function] 439 self.idle_cond.notify_all() 440 441 while not self.quit: 442 nextsleep = 1.0 443 fds = [] 444 445 with bb.utils.lock_timeout(self._idlefuncsLock): 446 items = list(self._idlefuns.items()) 447 448 for function, data in items: 449 try: 450 retval = function(self, data, False) 451 if isinstance(retval, idleFinish): 452 serverlog("Removing idle function %s at idleFinish" % str(function)) 453 remove_idle_func(function) 454 self.cooker.command.finishAsyncCommand(retval.msg) 455 nextsleep = None 456 elif retval is False: 457 serverlog("Removing idle function %s" % str(function)) 458 remove_idle_func(function) 459 nextsleep = None 460 elif retval is True: 461 nextsleep = None 462 elif isinstance(retval, float) and nextsleep: 463 if (retval < nextsleep): 464 nextsleep = retval 465 elif nextsleep is None: 466 continue 467 else: 468 fds = fds + retval 469 except SystemExit: 470 raise 471 except Exception as exc: 472 if not isinstance(exc, bb.BBHandledException): 473 logger.exception('Running idle function') 474 remove_idle_func(function) 475 serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc()) 476 self.quit = True 477 478 # Create new heartbeat event? 479 now = time.time() 480 if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat: 481 # We might have missed heartbeats. Just trigger once in 482 # that case and continue after the usual delay. 483 self.next_heartbeat += self.heartbeat_seconds 484 if self.next_heartbeat <= now: 485 self.next_heartbeat = now + self.heartbeat_seconds 486 if hasattr(self.cooker, "data"): 487 heartbeat = bb.event.HeartbeatEvent(now) 488 try: 489 bb.event.fire(heartbeat, self.cooker.data) 490 except Exception as exc: 491 if not isinstance(exc, bb.BBHandledException): 492 logger.exception('Running heartbeat function') 493 serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc()) 494 self.quit = True 495 if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat: 496 # Shorten timeout so that we we wake up in time for 497 # the heartbeat. 498 nextsleep = self.next_heartbeat - now 499 500 if nextsleep is not None: 501 select.select(fds,[],[],nextsleep)[0] 502 503class ServerCommunicator(): 504 def __init__(self, connection, recv): 505 self.connection = connection 506 self.recv = recv 507 508 def runCommand(self, command): 509 try: 510 self.connection.send(command) 511 except BrokenPipeError as e: 512 raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e 513 if not self.recv.poll(30): 514 logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime())) 515 if not self.recv.poll(30): 516 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime()) 517 try: 518 ret, exc = self.recv.get() 519 except EOFError as e: 520 raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e 521 # Should probably turn all exceptions in exc back into exceptions? 522 # For now, at least handle BBHandledException 523 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 524 raise bb.BBHandledException() 525 return ret, exc 526 527 def updateFeatureSet(self, featureset): 528 _, error = self.runCommand(["setFeatures", featureset]) 529 if error: 530 logger.error("Unable to set the cooker to the correct featureset: %s" % error) 531 raise BaseException(error) 532 533 def getEventHandle(self): 534 handle, error = self.runCommand(["getUIHandlerNum"]) 535 if error: 536 logger.error("Unable to get UI Handler Number: %s" % error) 537 raise BaseException(error) 538 539 return handle 540 541 def terminateServer(self): 542 self.connection.send(['terminateServer']) 543 return 544 545class BitBakeProcessServerConnection(object): 546 def __init__(self, ui_channel, recv, eq, sock): 547 self.connection = ServerCommunicator(ui_channel, recv) 548 self.events = eq 549 # Save sock so it doesn't get gc'd for the life of our connection 550 self.socket_connection = sock 551 552 def terminate(self): 553 self.events.close() 554 self.socket_connection.close() 555 self.connection.connection.close() 556 self.connection.recv.close() 557 return 558 559start_log_format = '--- Starting bitbake server pid %s at %s ---' 560start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' 561 562class BitBakeServer(object): 563 564 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile): 565 566 self.server_timeout = server_timeout 567 self.xmlrpcinterface = xmlrpcinterface 568 self.featureset = featureset 569 self.sockname = sockname 570 self.bitbake_lock = lock 571 self.profile = profile 572 self.readypipe, self.readypipein = os.pipe() 573 574 # Place the log in the builddirectory alongside the lock file 575 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") 576 self.logfile = logfile 577 578 startdatetime = datetime.datetime.now() 579 bb.daemonize.createDaemon(self._startServer, logfile) 580 self.bitbake_lock.close() 581 os.close(self.readypipein) 582 583 ready = ConnectionReader(self.readypipe) 584 r = ready.poll(5) 585 if not r: 586 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90") 587 r = ready.poll(90) 588 if r: 589 try: 590 r = ready.get() 591 except EOFError: 592 # Trap the child exiting/closing the pipe and error out 593 r = None 594 if not r or r[0] != "r": 595 ready.close() 596 bb.error("Unable to start bitbake server (%s)" % str(r)) 597 if os.path.exists(logfile): 598 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) 599 started = False 600 lines = [] 601 lastlines = [] 602 with open(logfile, "r") as f: 603 for line in f: 604 if started: 605 lines.append(line) 606 else: 607 lastlines.append(line) 608 res = logstart_re.search(line.rstrip()) 609 if res: 610 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) 611 if ldatetime >= startdatetime: 612 started = True 613 lines.append(line) 614 if len(lastlines) > 60: 615 lastlines = lastlines[-60:] 616 if lines: 617 if len(lines) > 60: 618 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:]))) 619 else: 620 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines))) 621 elif lastlines: 622 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines))) 623 else: 624 bb.error("%s doesn't exist" % logfile) 625 626 raise SystemExit(1) 627 628 ready.close() 629 630 def _startServer(self): 631 os.close(self.readypipe) 632 os.set_inheritable(self.bitbake_lock.fileno(), True) 633 os.set_inheritable(self.readypipein, True) 634 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 635 os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 636 637def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile): 638 639 import bb.cookerdata 640 import bb.cooker 641 642 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) 643 644 try: 645 bitbake_lock = os.fdopen(lockfd, "w") 646 647 # Create server control socket 648 if os.path.exists(sockname): 649 serverlog("WARNING: removing existing socket file '%s'" % sockname) 650 os.unlink(sockname) 651 652 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 653 # AF_UNIX has path length issues so chdir here to workaround 654 cwd = os.getcwd() 655 try: 656 os.chdir(os.path.dirname(sockname)) 657 sock.bind(os.path.basename(sockname)) 658 finally: 659 os.chdir(cwd) 660 sock.listen(1) 661 662 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) 663 writer = ConnectionWriter(readypipeinfd) 664 try: 665 featureset = [] 666 cooker = bb.cooker.BBCooker(featureset, server) 667 cooker.configuration.profile = profile 668 except bb.BBHandledException: 669 return None 670 writer.send("r") 671 writer.close() 672 server.cooker = cooker 673 serverlog("Started bitbake server pid %d" % os.getpid()) 674 675 server.run() 676 finally: 677 # Flush any messages/errors to the logfile before exit 678 sys.stdout.flush() 679 sys.stderr.flush() 680 681def connectProcessServer(sockname, featureset): 682 # Connect to socket 683 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 684 # AF_UNIX has path length issues so chdir here to workaround 685 cwd = os.getcwd() 686 687 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None 688 eq = command_chan_recv = command_chan = None 689 690 sock.settimeout(10) 691 692 try: 693 try: 694 os.chdir(os.path.dirname(sockname)) 695 finished = False 696 while not finished: 697 try: 698 sock.connect(os.path.basename(sockname)) 699 finished = True 700 except IOError as e: 701 if e.errno == errno.EWOULDBLOCK: 702 pass 703 raise 704 finally: 705 os.chdir(cwd) 706 707 # Send an fd for the remote to write events to 708 readfd, writefd = os.pipe() 709 eq = BBUIEventQueue(readfd) 710 # Send an fd for the remote to recieve commands from 711 readfd1, writefd1 = os.pipe() 712 command_chan = ConnectionWriter(writefd1) 713 # Send an fd for the remote to write commands results to 714 readfd2, writefd2 = os.pipe() 715 command_chan_recv = ConnectionReader(readfd2) 716 717 sendfds(sock, [writefd, readfd1, writefd2]) 718 719 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock) 720 721 # Close the ends of the pipes we won't use 722 for i in [writefd, readfd1, writefd2]: 723 os.close(i) 724 725 server_connection.connection.updateFeatureSet(featureset) 726 727 except (Exception, SystemExit) as e: 728 if command_chan_recv: 729 command_chan_recv.close() 730 if command_chan: 731 command_chan.close() 732 for i in [writefd, readfd1, writefd2]: 733 try: 734 if i: 735 os.close(i) 736 except OSError: 737 pass 738 sock.close() 739 raise 740 741 return server_connection 742 743def sendfds(sock, fds): 744 '''Send an array of fds over an AF_UNIX socket.''' 745 fds = array.array('i', fds) 746 msg = bytes([len(fds) % 256]) 747 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 748 749def recvfds(sock, size): 750 '''Receive an array of fds over an AF_UNIX socket.''' 751 a = array.array('i') 752 bytes_size = a.itemsize * size 753 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) 754 if not msg and not ancdata: 755 raise EOFError 756 try: 757 if len(ancdata) != 1: 758 raise RuntimeError('received %d items of ancdata' % 759 len(ancdata)) 760 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 761 if (cmsg_level == socket.SOL_SOCKET and 762 cmsg_type == socket.SCM_RIGHTS): 763 if len(cmsg_data) % a.itemsize != 0: 764 raise ValueError 765 a.frombytes(cmsg_data) 766 assert len(a) % 256 == msg[0] 767 return list(a) 768 except (ValueError, IndexError): 769 pass 770 raise RuntimeError('Invalid data received') 771 772class BBUIEventQueue: 773 def __init__(self, readfd): 774 775 self.eventQueue = [] 776 self.eventQueueLock = threading.Lock() 777 self.eventQueueNotify = threading.Event() 778 779 self.reader = ConnectionReader(readfd) 780 781 self.t = threading.Thread() 782 self.t.run = self.startCallbackHandler 783 self.t.start() 784 785 def getEvent(self): 786 with bb.utils.lock_timeout(self.eventQueueLock): 787 if len(self.eventQueue) == 0: 788 return None 789 790 item = self.eventQueue.pop(0) 791 if len(self.eventQueue) == 0: 792 self.eventQueueNotify.clear() 793 794 return item 795 796 def waitEvent(self, delay): 797 self.eventQueueNotify.wait(delay) 798 return self.getEvent() 799 800 def queue_event(self, event): 801 with bb.utils.lock_timeout(self.eventQueueLock): 802 self.eventQueue.append(event) 803 self.eventQueueNotify.set() 804 805 def send_event(self, event): 806 self.queue_event(pickle.loads(event)) 807 808 def startCallbackHandler(self): 809 bb.utils.set_process_name("UIEventQueue") 810 while True: 811 try: 812 ready = self.reader.wait(0.25) 813 if ready: 814 event = self.reader.get() 815 self.queue_event(event) 816 except (EOFError, OSError, TypeError): 817 # Easiest way to exit is to close the file descriptor to cause an exit 818 break 819 820 def close(self): 821 self.reader.close() 822 self.t.join() 823 824class ConnectionReader(object): 825 826 def __init__(self, fd): 827 self.reader = multiprocessing.connection.Connection(fd, writable=False) 828 self.rlock = multiprocessing.Lock() 829 830 def wait(self, timeout=None): 831 return multiprocessing.connection.wait([self.reader], timeout) 832 833 def poll(self, timeout=None): 834 return self.reader.poll(timeout) 835 836 def get(self): 837 with bb.utils.lock_timeout(self.rlock): 838 res = self.reader.recv_bytes() 839 return multiprocessing.reduction.ForkingPickler.loads(res) 840 841 def fileno(self): 842 return self.reader.fileno() 843 844 def close(self): 845 return self.reader.close() 846 847 848class ConnectionWriter(object): 849 850 def __init__(self, fd): 851 self.writer = multiprocessing.connection.Connection(fd, readable=False) 852 self.wlock = multiprocessing.Lock() 853 # Why bb.event needs this I have no idea 854 self.event = self 855 856 def _send(self, obj): 857 gc.disable() 858 with bb.utils.lock_timeout(self.wlock): 859 self.writer.send_bytes(obj) 860 gc.enable() 861 862 def send(self, obj): 863 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 864 # See notes/code in CookerParser 865 # We must not terminate holding this lock else processes will hang. 866 # For SIGTERM, raising afterwards avoids this. 867 # For SIGINT, we don't want to have written partial data to the pipe. 868 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 869 process = multiprocessing.current_process() 870 if process and hasattr(process, "queue_signals"): 871 with bb.utils.lock_timeout(process.signal_threadlock): 872 process.queue_signals = True 873 self._send(obj) 874 process.queue_signals = False 875 876 while len(process.signal_received) > 0: 877 sig = process.signal_received.pop() 878 process.handle_sig(sig, None) 879 else: 880 self._send(obj) 881 882 def fileno(self): 883 return self.writer.fileno() 884 885 def close(self): 886 return self.writer.close() 887