1#!/usr/bin/env python3 2# 3# Copyright BitBake Contributors 4# 5# SPDX-License-Identifier: GPL-2.0-only 6# 7 8import os 9import sys 10import warnings 11warnings.simplefilter("default") 12warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*") 13sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) 14from bb import fetch2 15import logging 16import bb 17import select 18import errno 19import signal 20import pickle 21import traceback 22import queue 23import shlex 24import subprocess 25import fcntl 26from multiprocessing import Lock 27from threading import Thread 28 29# Remove when we have a minimum of python 3.10 30if not hasattr(fcntl, 'F_SETPIPE_SZ'): 31 fcntl.F_SETPIPE_SZ = 1031 32 33bb.utils.check_system_locale() 34 35# Users shouldn't be running this code directly 36if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): 37 print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") 38 sys.exit(1) 39 40profiling = False 41if sys.argv[1].startswith("decafbadbad"): 42 profiling = True 43 try: 44 import cProfile as profile 45 except: 46 import profile 47 48# Unbuffer stdout to avoid log truncation in the event 49# of an unorderly exit as well as to provide timely 50# updates to log files for use with tail 51try: 52 if sys.stdout.name == '<stdout>': 53 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) 54 fl |= os.O_SYNC 55 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) 56 #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) 57except: 58 pass 59 60logger = logging.getLogger("BitBake") 61 62worker_pipe = sys.stdout.fileno() 63bb.utils.nonblockingfd(worker_pipe) 64# Try to make the pipe buffers larger as it is much more efficient. If we can't 65# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over. 66try: 67 fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024) 68except: 69 pass 70# Need to guard against multiprocessing being used in child processes 71# and multiple processes trying to write to the parent at the same time 72worker_pipe_lock = None 73 74handler = bb.event.LogHandler() 75logger.addHandler(handler) 76 77if 0: 78 # Code to write out a log file of all events passing through the worker 79 logfilename = "/tmp/workerlogfile" 80 format_str = "%(levelname)s: %(message)s" 81 conlogformat = bb.msg.BBLogFormatter(format_str) 82 consolelog = logging.FileHandler(logfilename) 83 consolelog.setFormatter(conlogformat) 84 logger.addHandler(consolelog) 85 86worker_queue = queue.Queue() 87 88def worker_fire(event, d): 89 data = b"<event>" + pickle.dumps(event) + b"</event>" 90 worker_fire_prepickled(data) 91 92def worker_fire_prepickled(event): 93 global worker_queue 94 95 worker_queue.put(event) 96 97# 98# We can end up with write contention with the cooker, it can be trying to send commands 99# and we can be trying to send event data back. Therefore use a separate thread for writing 100# back data to cooker. 101# 102worker_thread_exit = False 103 104def worker_flush(worker_queue): 105 worker_queue_int = bytearray() 106 global worker_pipe, worker_thread_exit 107 108 while True: 109 try: 110 worker_queue_int.extend(worker_queue.get(True, 1)) 111 except queue.Empty: 112 pass 113 while (worker_queue_int or not worker_queue.empty()): 114 try: 115 (_, ready, _) = select.select([], [worker_pipe], [], 1) 116 if not worker_queue.empty(): 117 worker_queue_int.extend(worker_queue.get()) 118 written = os.write(worker_pipe, worker_queue_int) 119 del worker_queue_int[0:written] 120 except (IOError, OSError) as e: 121 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: 122 raise 123 if worker_thread_exit and worker_queue.empty() and not worker_queue_int: 124 return 125 126worker_thread = Thread(target=worker_flush, args=(worker_queue,)) 127worker_thread.start() 128 129def worker_child_fire(event, d): 130 global worker_pipe 131 global worker_pipe_lock 132 133 data = b"<event>" + pickle.dumps(event) + b"</event>" 134 try: 135 with bb.utils.lock_timeout(worker_pipe_lock): 136 while(len(data)): 137 written = worker_pipe.write(data) 138 data = data[written:] 139 except IOError: 140 sigterm_handler(None, None) 141 raise 142 143bb.event.worker_fire = worker_fire 144 145lf = None 146#lf = open("/tmp/workercommandlog", "w+") 147def workerlog_write(msg): 148 if lf: 149 lf.write(msg) 150 lf.flush() 151 152def sigterm_handler(signum, frame): 153 signal.signal(signal.SIGTERM, signal.SIG_DFL) 154 os.killpg(0, signal.SIGTERM) 155 sys.exit() 156 157def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask): 158 159 fn = runtask['fn'] 160 task = runtask['task'] 161 taskname = runtask['taskname'] 162 taskhash = runtask['taskhash'] 163 unihash = runtask['unihash'] 164 appends = runtask['appends'] 165 layername = runtask['layername'] 166 taskdepdata = runtask['taskdepdata'] 167 quieterrors = runtask['quieterrors'] 168 # We need to setup the environment BEFORE the fork, since 169 # a fork() or exec*() activates PSEUDO... 170 171 envbackup = {} 172 fakeroot = False 173 fakeenv = {} 174 umask = None 175 176 uid = os.getuid() 177 gid = os.getgid() 178 179 taskdep = runtask['taskdep'] 180 if 'umask' in taskdep and taskname in taskdep['umask']: 181 umask = taskdep['umask'][taskname] 182 elif workerdata["umask"]: 183 umask = workerdata["umask"] 184 if umask: 185 # Convert to a python numeric value as it could be a string 186 umask = bb.utils.to_filemode(umask) 187 188 dry_run = cfg.dry_run or runtask['dry_run'] 189 190 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built 191 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: 192 fakeroot = True 193 envvars = (runtask['fakerootenv'] or "").split() 194 for key, value in (var.split('=',1) for var in envvars): 195 envbackup[key] = os.environ.get(key) 196 os.environ[key] = value 197 fakeenv[key] = value 198 199 fakedirs = (runtask['fakerootdirs'] or "").split() 200 for p in fakedirs: 201 bb.utils.mkdirhier(p) 202 logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % 203 (fn, taskname, ', '.join(fakedirs))) 204 else: 205 envvars = (runtask['fakerootnoenv'] or "").split() 206 for key, value in (var.split('=',1) for var in envvars): 207 envbackup[key] = os.environ.get(key) 208 os.environ[key] = value 209 fakeenv[key] = value 210 211 sys.stdout.flush() 212 sys.stderr.flush() 213 214 try: 215 pipein, pipeout = os.pipe() 216 pipein = os.fdopen(pipein, 'rb', 4096) 217 pipeout = os.fdopen(pipeout, 'wb', 0) 218 pid = os.fork() 219 except OSError as e: 220 logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) 221 sys.exit(1) 222 223 if pid == 0: 224 def child(): 225 global worker_pipe 226 global worker_pipe_lock 227 pipein.close() 228 229 bb.utils.signal_on_parent_exit("SIGTERM") 230 231 # Save out the PID so that the event can include it the 232 # events 233 bb.event.worker_pid = os.getpid() 234 bb.event.worker_fire = worker_child_fire 235 worker_pipe = pipeout 236 worker_pipe_lock = Lock() 237 238 # Make the child the process group leader and ensure no 239 # child process will be controlled by the current terminal 240 # This ensures signals sent to the controlling terminal like Ctrl+C 241 # don't stop the child processes. 242 os.setsid() 243 244 signal.signal(signal.SIGTERM, sigterm_handler) 245 # Let SIGHUP exit as SIGTERM 246 signal.signal(signal.SIGHUP, sigterm_handler) 247 248 # No stdin & stdout 249 # stdout is used as a status report channel and must not be used by child processes. 250 dumbio = os.open(os.devnull, os.O_RDWR) 251 os.dup2(dumbio, sys.stdin.fileno()) 252 os.dup2(dumbio, sys.stdout.fileno()) 253 254 if umask is not None: 255 os.umask(umask) 256 257 try: 258 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) 259 the_data = databuilder.mcdata[mc] 260 the_data.setVar("BB_WORKERCONTEXT", "1") 261 the_data.setVar("BB_TASKDEPDATA", taskdepdata) 262 the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", "")) 263 if cfg.limited_deps: 264 the_data.setVar("BB_LIMITEDDEPS", "1") 265 the_data.setVar("BUILDNAME", workerdata["buildname"]) 266 the_data.setVar("DATE", workerdata["date"]) 267 the_data.setVar("TIME", workerdata["time"]) 268 for varname, value in extraconfigdata.items(): 269 the_data.setVar(varname, value) 270 271 bb.parse.siggen.set_taskdata(workerdata["sigdata"]) 272 if "newhashes" in workerdata: 273 bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) 274 ret = 0 275 276 the_data = databuilder.parseRecipe(fn, appends, layername) 277 the_data.setVar('BB_TASKHASH', taskhash) 278 the_data.setVar('BB_UNIHASH', unihash) 279 bb.parse.siggen.setup_datacache_from_datastore(fn, the_data) 280 281 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) 282 283 if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')): 284 if bb.utils.is_local_uid(uid): 285 logger.debug("Attempting to disable network for %s" % taskname) 286 bb.utils.disable_network(uid, gid) 287 else: 288 logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid)) 289 290 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 291 # successfully. We also need to unset anything from the environment which shouldn't be there 292 exports = bb.data.exported_vars(the_data) 293 294 bb.utils.empty_environment() 295 for e, v in exports: 296 os.environ[e] = v 297 298 for e in fakeenv: 299 os.environ[e] = fakeenv[e] 300 the_data.setVar(e, fakeenv[e]) 301 the_data.setVarFlag(e, 'export', "1") 302 303 task_exports = the_data.getVarFlag(taskname, 'exports') 304 if task_exports: 305 for e in task_exports.split(): 306 the_data.setVarFlag(e, 'export', '1') 307 v = the_data.getVar(e) 308 if v is not None: 309 os.environ[e] = v 310 311 if quieterrors: 312 the_data.setVarFlag(taskname, "quieterrors", "1") 313 314 except Exception: 315 if not quieterrors: 316 logger.critical(traceback.format_exc()) 317 os._exit(1) 318 319 sys.stdout.flush() 320 sys.stderr.flush() 321 322 try: 323 if dry_run: 324 return 0 325 try: 326 ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) 327 finally: 328 if fakeroot: 329 fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD")) 330 subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE) 331 return ret 332 except: 333 os._exit(1) 334 if not profiling: 335 os._exit(child()) 336 else: 337 profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) 338 prof = profile.Profile() 339 try: 340 ret = profile.Profile.runcall(prof, child) 341 finally: 342 prof.dump_stats(profname) 343 bb.utils.process_profilelog(profname) 344 os._exit(ret) 345 else: 346 for key, value in iter(envbackup.items()): 347 if value is None: 348 del os.environ[key] 349 else: 350 os.environ[key] = value 351 352 return pid, pipein, pipeout 353 354class runQueueWorkerPipe(): 355 """ 356 Abstraction for a pipe between a worker thread and the worker server 357 """ 358 def __init__(self, pipein, pipeout): 359 self.input = pipein 360 if pipeout: 361 pipeout.close() 362 bb.utils.nonblockingfd(self.input) 363 self.queue = bytearray() 364 365 def read(self): 366 start = len(self.queue) 367 try: 368 self.queue.extend(self.input.read(512*1024) or b"") 369 except (OSError, IOError) as e: 370 if e.errno != errno.EAGAIN: 371 raise 372 373 end = len(self.queue) 374 index = self.queue.find(b"</event>") 375 while index != -1: 376 msg = self.queue[:index+8] 377 assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1 378 worker_fire_prepickled(msg) 379 self.queue = self.queue[index+8:] 380 index = self.queue.find(b"</event>") 381 return (end > start) 382 383 def close(self): 384 while self.read(): 385 continue 386 if len(self.queue) > 0: 387 print("Warning, worker child left partial message: %s" % self.queue) 388 self.input.close() 389 390normalexit = False 391 392class BitbakeWorker(object): 393 def __init__(self, din): 394 self.input = din 395 bb.utils.nonblockingfd(self.input) 396 self.queue = bytearray() 397 self.cookercfg = None 398 self.databuilder = None 399 self.data = None 400 self.extraconfigdata = None 401 self.build_pids = {} 402 self.build_pipes = {} 403 404 signal.signal(signal.SIGTERM, self.sigterm_exception) 405 # Let SIGHUP exit as SIGTERM 406 signal.signal(signal.SIGHUP, self.sigterm_exception) 407 if "beef" in sys.argv[1]: 408 bb.utils.set_process_name("Worker (Fakeroot)") 409 else: 410 bb.utils.set_process_name("Worker") 411 412 def sigterm_exception(self, signum, stackframe): 413 if signum == signal.SIGTERM: 414 bb.warn("Worker received SIGTERM, shutting down...") 415 elif signum == signal.SIGHUP: 416 bb.warn("Worker received SIGHUP, shutting down...") 417 self.handle_finishnow(None) 418 signal.signal(signal.SIGTERM, signal.SIG_DFL) 419 os.kill(os.getpid(), signal.SIGTERM) 420 421 def serve(self): 422 while True: 423 (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) 424 if self.input in ready: 425 try: 426 r = self.input.read() 427 if len(r) == 0: 428 # EOF on pipe, server must have terminated 429 self.sigterm_exception(signal.SIGTERM, None) 430 self.queue.extend(r) 431 except (OSError, IOError): 432 pass 433 if len(self.queue): 434 self.handle_item(b"cookerconfig", self.handle_cookercfg) 435 self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) 436 self.handle_item(b"workerdata", self.handle_workerdata) 437 self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) 438 self.handle_item(b"runtask", self.handle_runtask) 439 self.handle_item(b"finishnow", self.handle_finishnow) 440 self.handle_item(b"ping", self.handle_ping) 441 self.handle_item(b"quit", self.handle_quit) 442 443 for pipe in self.build_pipes: 444 if self.build_pipes[pipe].input in ready: 445 self.build_pipes[pipe].read() 446 if len(self.build_pids): 447 while self.process_waitpid(): 448 continue 449 450 def handle_item(self, item, func): 451 opening_tag = b"<" + item + b">" 452 if not self.queue.startswith(opening_tag): 453 return 454 455 tag_len = len(opening_tag) 456 if len(self.queue) < tag_len + 4: 457 # we need to receive more data 458 return 459 header = self.queue[tag_len:tag_len + 4] 460 payload_len = int.from_bytes(header, 'big') 461 # closing tag has length (tag_len + 1) 462 if len(self.queue) < tag_len * 2 + 1 + payload_len: 463 # we need to receive more data 464 return 465 466 index = self.queue.find(b"</" + item + b">") 467 if index != -1: 468 try: 469 func(self.queue[(tag_len + 4):index]) 470 except pickle.UnpicklingError: 471 workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) 472 raise 473 self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] 474 475 def handle_cookercfg(self, data): 476 self.cookercfg = pickle.loads(data) 477 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) 478 self.databuilder.parseBaseConfiguration(worker=True) 479 self.data = self.databuilder.data 480 481 def handle_extraconfigdata(self, data): 482 self.extraconfigdata = pickle.loads(data) 483 484 def handle_workerdata(self, data): 485 self.workerdata = pickle.loads(data) 486 bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] 487 bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] 488 bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] 489 bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] 490 for mc in self.databuilder.mcdata: 491 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) 492 self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) 493 self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe") 494 495 def handle_newtaskhashes(self, data): 496 self.workerdata["newhashes"] = pickle.loads(data) 497 498 def handle_ping(self, _): 499 workerlog_write("Handling ping\n") 500 501 logger.warning("Pong from bitbake-worker!") 502 503 def handle_quit(self, data): 504 workerlog_write("Handling quit\n") 505 506 global normalexit 507 normalexit = True 508 sys.exit(0) 509 510 def handle_runtask(self, data): 511 runtask = pickle.loads(data) 512 513 fn = runtask['fn'] 514 task = runtask['task'] 515 taskname = runtask['taskname'] 516 517 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) 518 519 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask) 520 self.build_pids[pid] = task 521 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) 522 523 def process_waitpid(self): 524 """ 525 Return none is there are no processes awaiting result collection, otherwise 526 collect the process exit codes and close the information pipe. 527 """ 528 try: 529 pid, status = os.waitpid(-1, os.WNOHANG) 530 if pid == 0 or os.WIFSTOPPED(status): 531 return False 532 except OSError: 533 return False 534 535 workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) 536 537 if os.WIFEXITED(status): 538 status = os.WEXITSTATUS(status) 539 elif os.WIFSIGNALED(status): 540 # Per shell conventions for $?, when a process exits due to 541 # a signal, we return an exit code of 128 + SIGNUM 542 status = 128 + os.WTERMSIG(status) 543 544 task = self.build_pids[pid] 545 del self.build_pids[pid] 546 547 self.build_pipes[pid].close() 548 del self.build_pipes[pid] 549 550 worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") 551 552 return True 553 554 def handle_finishnow(self, _): 555 if self.build_pids: 556 logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) 557 for k, v in iter(self.build_pids.items()): 558 try: 559 os.kill(-k, signal.SIGTERM) 560 os.waitpid(-1, 0) 561 except: 562 pass 563 for pipe in self.build_pipes: 564 self.build_pipes[pipe].read() 565 566try: 567 worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) 568 if not profiling: 569 worker.serve() 570 else: 571 profname = "profile-worker.log" 572 prof = profile.Profile() 573 try: 574 profile.Profile.runcall(prof, worker.serve) 575 finally: 576 prof.dump_stats(profname) 577 bb.utils.process_profilelog(profname) 578except BaseException as e: 579 if not normalexit: 580 import traceback 581 sys.stderr.write(traceback.format_exc()) 582 sys.stderr.write(str(e)) 583finally: 584 worker_thread_exit = True 585 worker_thread.join() 586 587workerlog_write("exiting") 588if not normalexit: 589 sys.exit(1) 590sys.exit(0) 591