#!/usr/bin/env python3
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import warnings
warnings.simplefilter("default")
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
import shlex
import subprocess
import fcntl
from multiprocessing import Lock
from threading import Thread

# Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of a disorderly exit, as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except:
    pass
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = queue.Queue()

def worker_fire(event, d):
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    global worker_queue

    worker_queue.put(event)

#
# We can end up with write contention with the cooker: it can be trying to send commands
# while we are trying to send event data back. Therefore use a separate thread for writing
# data back to the cooker.
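#
# Events are framed as b"<event>" + pickle.dumps(event) + b"</event>" so the
# cooker can reassemble complete messages from the byte stream; worker_flush()
# below drains worker_queue and retries partial writes on the non-blocking pipe.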
#
worker_thread_exit = False

def worker_flush(worker_queue):
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                written = os.write(worker_pipe, worker_queue_int)
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            while len(data):
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):

    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to set up the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
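        # e.g. "022" parses as octal 0o22 here, while an already-numeric umask
        # makes int(umask, 8) raise TypeError and is used as-is.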
        try:
            umask = int(umask, 8)
        except TypeError:
            pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                      (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that events fired from this child
            # can include it
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
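            # Point both at /dev/null; task output is captured separately via
            # the task logfile handling in bb.build.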
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

        return pid, pipein, pipeout

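# The parent side of the per-task pipe: fork_off_task() hands the read end to
# this class, which accumulates bytes and re-fires each complete
# <event>...</event> message up to the cooker via worker_fire_prepickled().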
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()

        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()

    def read(self):
        start = len(self.queue)
        try:
            self.queue.extend(self.input.read(512 * 1024) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            msg = self.queue[:index + 8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index + 8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

class BitbakeWorker(object):
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        # Messages arrive framed as b"<item>" + 4-byte big-endian payload
        # length + payload + b"</item>"
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)