#!/usr/bin/env python3
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import warnings
warnings.simplefilter("default")
warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*")
# The bb package is located relative to this script, so the search path has
# to be extended before anything from bb is imported.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
import shlex
import subprocess
import fcntl
from multiprocessing import Lock
from threading import Thread

# Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# An argument starting with "decafbadbad" additionally enables profiling.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        # Fall back to the pure Python profiler when cProfile is unavailable
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: carry on with the default flags if stdout cannot be
    # adjusted, but do not swallow SystemExit/KeyboardInterrupt.
    pass

logger = logging.getLogger("BitBake")

# stdout is the channel used to send data back to the parent; it is written
# to via its raw file descriptor in non-blocking mode.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except OSError:
    # Enlarging the pipe buffer is only an optimisation; keep the default
    # size if the kernel refuses the request.
    pass
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Byte strings waiting to be written back to the parent over worker_pipe
worker_queue = queue.Queue()

def worker_fire(event, d):
    """
    Pickle event, wrap it in <event>...</event> markers and queue it for
    sending to the parent. The d argument is accepted but not used here.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """
    Queue an already serialised (bytes) message for sending to the parent.
    """
    worker_queue.put(event)

#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Set to True by the main thread to ask worker_flush() to return once all
# pending data has been written.
worker_thread_exit = False

def worker_flush(worker_queue):
    """
    Thread body: move data from worker_queue to worker_pipe.

    Queued byte strings are accumulated in a local buffer and written to the
    (non-blocking) worker_pipe file descriptor. Partial writes are handled by
    dropping only the bytes actually written. EAGAIN and EPIPE errors from the
    write are ignored, any other OSError is re-raised.

    The function returns once the module level worker_thread_exit flag is set
    and both the queue and the local buffer are empty.
    """
    worker_queue_int = bytearray()

    while True:
        try:
            # Wait up to a second for new data so the exit flag gets polled
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to a second) for the pipe to become writable. The
                # result is not inspected, a write is attempted regardless.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                written = os.write(worker_pipe, worker_queue_int)
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    """
    Event sender used inside forked task processes.

    The event is pickled, wrapped in <event>...</event> markers and written
    directly to worker_pipe (which fork_off_task() rebinds to the task's own
    pipe file object) while holding worker_pipe_lock. If the write fails with
    an IOError the process group is terminated via sigterm_handler() and the
    error is re-raised. The d argument is accepted but not used here.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            while data:
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

# Optional debug log of the commands handled; disabled unless lf is opened
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Write msg to the debug command log and flush it, if the log is open."""
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """
    Signal handler for task processes: restore the default SIGTERM action,
    send SIGTERM to the whole process group and exit.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """
    Fork a child process which executes a single task.

    cfg             -- cooker configuration; dry_run, limited_deps and profile are read
    data            -- not used by this function
    databuilder     -- provides mcdata and parseRecipe()
    workerdata      -- dict; "umask", "buildname", "date", "time", "sigdata" and
                       optionally "newhashes" are read
    extraconfigdata -- dict of variable name -> value set on the datastore in the child
    runtask         -- dict describing the task ('fn', 'task', 'taskname', 'taskhash',
                       'unihash', 'appends', 'layername', 'taskdepdata', 'quieterrors',
                       'taskdep', 'dry_run', 'fakerootenv', 'fakerootdirs',
                       'fakerootnoenv')

    In the parent this returns (pid, pipein, pipeout) where pipein/pipeout are
    the file objects for the read and write ends of the pipe shared with the
    child. The child never returns from this function, it always leaves via
    os._exit(). If the pipe or fork cannot be created the error is logged and
    the worker exits with status 1.
    """

    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    # A task specific umask takes precedence over the worker wide one
    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
            umask = int(umask, 8)
        except TypeError:
            pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            # Remember the previous value so the parent can restore it after the fork
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            """Set up the task environment, run the task and return its exit code."""
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            # From here on events are written straight to the task's own pipe
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                # Deliberately catches everything (including SystemExit): the
                # forked child must never fall back into the parent's code.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the exit below has a defined status even
            # if the profiled call raises before ret is assigned.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """
        Take ownership of the read end (pipein), which is switched to
        non-blocking mode, and close the write end (pipeout) if one is given.
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Bytes received which do not yet form a complete <event> message
        self.queue = bytearray()

    def read(self):
        """
        Read available data from the pipe and forward each complete
        <event>...</event> message via worker_fire_prepickled().

        Returns True if any new data was read, False otherwise. EAGAIN from
        the read is ignored, other OSErrors are re-raised.
        """
        start = len(self.queue)
        try:
            # read() may return None when no data is available, hence the "or"
            self.queue.extend(self.input.read(512*1024) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>")
            msg = self.queue[:index+8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """
        Drain the pipe, warn about any incomplete trailing message and close
        the read end.
        """
        while self.read():
            continue
        if len(self.queue) > 0:
            # NOTE(review): this prints to stdout, which this script also uses
            # as the pipe back to the parent - confirm that is intended.
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set by handle_quit() so the exit path can tell a requested shutdown from a failure
normalexit = False

class BitbakeWorker(object):
    """
    Command loop of the worker: reads tagged commands from the input pipe,
    forks task processes on request and relays their events and exit codes
    back through worker_fire_prepickled().
    """
    def __init__(self, din):
        """
        din is the binary file object commands are read from; it is switched
        to non-blocking mode. SIGTERM and SIGHUP handlers are installed and the
        process name is set ("Worker (Fakeroot)" if "beef" is in sys.argv[1]).
        """
        self.input = din
        bb.utils.nonblockingfd(self.input)
        # Received command bytes not yet processed
        self.queue = bytearray()
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # Populated by handle_workerdata()
        self.workerdata = None
        # pid -> task, and pid -> runQueueWorkerPipe, for running tasks
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Signal handler: warn, terminate the running tasks and then terminate
        this process with the default SIGTERM action.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: wait (up to a second at a time) for data on the command
        pipe or any task pipe, dispatch complete commands, forward task output
        and collect the exit status of finished tasks. Only left through an
        exception (e.g. SystemExit from handle_quit()) or a signal.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if self.queue:
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes.values():
                if pipe.input in ready:
                    pipe.read()
            if self.build_pids:
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """
        If the queue starts with a complete <item> message, pass its payload
        to func and remove the message from the queue.

        A message is the opening tag, a 4 byte big-endian payload length, the
        payload and the closing tag. Nothing happens if the queue does not
        start with the opening tag or the message is still incomplete.
        """
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Unpickle and store the extra configuration data."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """
        Unpickle the worker data, apply its logging settings and set the
        per-multiconfig variables derived from it.
        """
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        """Store the unpickled data as workerdata["newhashes"]."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Answer a ping by logging a warning message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the shutdown as a normal exit and leave via sys.exit(0)."""
        global normalexit

        workerlog_write("Handling quit\n")

        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork the task and track its pid and pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe, queue an
        <exitcode> message for the task and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """
        Send SIGTERM to the process group of every running task, waiting for a
        child after each signal, then read whatever the task pipes still hold.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the process (group) may already be gone
                    pass
        for pipe in self.build_pipes.values():
            pipe.read()

try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # BaseException so that the SystemExit raised by handle_quit() ends up
    # here too; only unexpected exits are reported.
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Let the writer thread flush any remaining data and stop
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)