1#!/usr/bin/env python3 2# 3# Copyright BitBake Contributors 4# 5# SPDX-License-Identifier: GPL-2.0-only 6# 7 8import os 9import sys 10import warnings 11warnings.simplefilter("default") 12sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) 13from bb import fetch2 14import logging 15import bb 16import select 17import errno 18import signal 19import pickle 20import traceback 21import queue 22import shlex 23import subprocess 24from multiprocessing import Lock 25from threading import Thread 26 27bb.utils.check_system_locale() 28 29# Users shouldn't be running this code directly 30if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): 31 print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") 32 sys.exit(1) 33 34profiling = False 35if sys.argv[1].startswith("decafbadbad"): 36 profiling = True 37 try: 38 import cProfile as profile 39 except: 40 import profile 41 42# Unbuffer stdout to avoid log truncation in the event 43# of an unorderly exit as well as to provide timely 44# updates to log files for use with tail 45try: 46 if sys.stdout.name == '<stdout>': 47 import fcntl 48 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) 49 fl |= os.O_SYNC 50 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) 51 #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) 52except: 53 pass 54 55logger = logging.getLogger("BitBake") 56 57worker_pipe = sys.stdout.fileno() 58bb.utils.nonblockingfd(worker_pipe) 59# Need to guard against multiprocessing being used in child processes 60# and multiple processes trying to write to the parent at the same time 61worker_pipe_lock = None 62 63handler = bb.event.LogHandler() 64logger.addHandler(handler) 65 66if 0: 67 # Code to write out a log file of all events passing through the worker 68 logfilename = "/tmp/workerlogfile" 69 format_str = "%(levelname)s: %(message)s" 70 conlogformat = bb.msg.BBLogFormatter(format_str) 71 consolelog = logging.FileHandler(logfilename) 72 consolelog.setFormatter(conlogformat) 73 logger.addHandler(consolelog) 74 75worker_queue = queue.Queue() 76 77def worker_fire(event, d): 78 data = b"<event>" + pickle.dumps(event) + b"</event>" 79 worker_fire_prepickled(data) 80 81def worker_fire_prepickled(event): 82 global worker_queue 83 84 worker_queue.put(event) 85 86# 87# We can end up with write contention with the cooker, it can be trying to send commands 88# and we can be trying to send event data back. Therefore use a separate thread for writing 89# back data to cooker. 90# 91worker_thread_exit = False 92 93def worker_flush(worker_queue): 94 worker_queue_int = bytearray() 95 global worker_pipe, worker_thread_exit 96 97 while True: 98 try: 99 worker_queue_int.extend(worker_queue.get(True, 1)) 100 except queue.Empty: 101 pass 102 while (worker_queue_int or not worker_queue.empty()): 103 try: 104 (_, ready, _) = select.select([], [worker_pipe], [], 1) 105 if not worker_queue.empty(): 106 worker_queue_int.extend(worker_queue.get()) 107 written = os.write(worker_pipe, worker_queue_int) 108 worker_queue_int = worker_queue_int[written:] 109 except (IOError, OSError) as e: 110 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: 111 raise 112 if worker_thread_exit and worker_queue.empty() and not worker_queue_int: 113 return 114 115worker_thread = Thread(target=worker_flush, args=(worker_queue,)) 116worker_thread.start() 117 118def worker_child_fire(event, d): 119 global worker_pipe 120 global worker_pipe_lock 121 122 data = b"<event>" + pickle.dumps(event) + b"</event>" 123 try: 124 with bb.utils.lock_timeout(worker_pipe_lock): 125 while(len(data)): 126 written = worker_pipe.write(data) 127 data = data[written:] 128 except IOError: 129 sigterm_handler(None, None) 130 raise 131 132bb.event.worker_fire = worker_fire 133 134lf = None 135#lf = open("/tmp/workercommandlog", "w+") 136def workerlog_write(msg): 137 if lf: 138 lf.write(msg) 139 lf.flush() 140 141def sigterm_handler(signum, frame): 142 signal.signal(signal.SIGTERM, signal.SIG_DFL) 143 os.killpg(0, signal.SIGTERM) 144 sys.exit() 145 146def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask): 147 148 fn = runtask['fn'] 149 task = runtask['task'] 150 taskname = runtask['taskname'] 151 taskhash = runtask['taskhash'] 152 unihash = runtask['unihash'] 153 appends = runtask['appends'] 154 layername = runtask['layername'] 155 taskdepdata = runtask['taskdepdata'] 156 quieterrors = runtask['quieterrors'] 157 # We need to setup the environment BEFORE the fork, since 158 # a fork() or exec*() activates PSEUDO... 159 160 envbackup = {} 161 fakeroot = False 162 fakeenv = {} 163 umask = None 164 165 uid = os.getuid() 166 gid = os.getgid() 167 168 taskdep = runtask['taskdep'] 169 if 'umask' in taskdep and taskname in taskdep['umask']: 170 umask = taskdep['umask'][taskname] 171 elif workerdata["umask"]: 172 umask = workerdata["umask"] 173 if umask: 174 # umask might come in as a number or text string.. 175 try: 176 umask = int(umask, 8) 177 except TypeError: 178 pass 179 180 dry_run = cfg.dry_run or runtask['dry_run'] 181 182 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built 183 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: 184 fakeroot = True 185 envvars = (runtask['fakerootenv'] or "").split() 186 for key, value in (var.split('=',1) for var in envvars): 187 envbackup[key] = os.environ.get(key) 188 os.environ[key] = value 189 fakeenv[key] = value 190 191 fakedirs = (runtask['fakerootdirs'] or "").split() 192 for p in fakedirs: 193 bb.utils.mkdirhier(p) 194 logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % 195 (fn, taskname, ', '.join(fakedirs))) 196 else: 197 envvars = (runtask['fakerootnoenv'] or "").split() 198 for key, value in (var.split('=',1) for var in envvars): 199 envbackup[key] = os.environ.get(key) 200 os.environ[key] = value 201 fakeenv[key] = value 202 203 sys.stdout.flush() 204 sys.stderr.flush() 205 206 try: 207 pipein, pipeout = os.pipe() 208 pipein = os.fdopen(pipein, 'rb', 4096) 209 pipeout = os.fdopen(pipeout, 'wb', 0) 210 pid = os.fork() 211 except OSError as e: 212 logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) 213 sys.exit(1) 214 215 if pid == 0: 216 def child(): 217 global worker_pipe 218 global worker_pipe_lock 219 pipein.close() 220 221 bb.utils.signal_on_parent_exit("SIGTERM") 222 223 # Save out the PID so that the event can include it the 224 # events 225 bb.event.worker_pid = os.getpid() 226 bb.event.worker_fire = worker_child_fire 227 worker_pipe = pipeout 228 worker_pipe_lock = Lock() 229 230 # Make the child the process group leader and ensure no 231 # child process will be controlled by the current terminal 232 # This ensures signals sent to the controlling terminal like Ctrl+C 233 # don't stop the child processes. 234 os.setsid() 235 236 signal.signal(signal.SIGTERM, sigterm_handler) 237 # Let SIGHUP exit as SIGTERM 238 signal.signal(signal.SIGHUP, sigterm_handler) 239 240 # No stdin & stdout 241 # stdout is used as a status report channel and must not be used by child processes. 242 dumbio = os.open(os.devnull, os.O_RDWR) 243 os.dup2(dumbio, sys.stdin.fileno()) 244 os.dup2(dumbio, sys.stdout.fileno()) 245 246 if umask is not None: 247 os.umask(umask) 248 249 try: 250 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) 251 the_data = databuilder.mcdata[mc] 252 the_data.setVar("BB_WORKERCONTEXT", "1") 253 the_data.setVar("BB_TASKDEPDATA", taskdepdata) 254 the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", "")) 255 if cfg.limited_deps: 256 the_data.setVar("BB_LIMITEDDEPS", "1") 257 the_data.setVar("BUILDNAME", workerdata["buildname"]) 258 the_data.setVar("DATE", workerdata["date"]) 259 the_data.setVar("TIME", workerdata["time"]) 260 for varname, value in extraconfigdata.items(): 261 the_data.setVar(varname, value) 262 263 bb.parse.siggen.set_taskdata(workerdata["sigdata"]) 264 if "newhashes" in workerdata: 265 bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) 266 ret = 0 267 268 the_data = databuilder.parseRecipe(fn, appends, layername) 269 the_data.setVar('BB_TASKHASH', taskhash) 270 the_data.setVar('BB_UNIHASH', unihash) 271 bb.parse.siggen.setup_datacache_from_datastore(fn, the_data) 272 273 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) 274 275 if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')): 276 if bb.utils.is_local_uid(uid): 277 logger.debug("Attempting to disable network for %s" % taskname) 278 bb.utils.disable_network(uid, gid) 279 else: 280 logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid)) 281 282 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 283 # successfully. We also need to unset anything from the environment which shouldn't be there 284 exports = bb.data.exported_vars(the_data) 285 286 bb.utils.empty_environment() 287 for e, v in exports: 288 os.environ[e] = v 289 290 for e in fakeenv: 291 os.environ[e] = fakeenv[e] 292 the_data.setVar(e, fakeenv[e]) 293 the_data.setVarFlag(e, 'export', "1") 294 295 task_exports = the_data.getVarFlag(taskname, 'exports') 296 if task_exports: 297 for e in task_exports.split(): 298 the_data.setVarFlag(e, 'export', '1') 299 v = the_data.getVar(e) 300 if v is not None: 301 os.environ[e] = v 302 303 if quieterrors: 304 the_data.setVarFlag(taskname, "quieterrors", "1") 305 306 except Exception: 307 if not quieterrors: 308 logger.critical(traceback.format_exc()) 309 os._exit(1) 310 311 sys.stdout.flush() 312 sys.stderr.flush() 313 314 try: 315 if dry_run: 316 return 0 317 try: 318 ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) 319 finally: 320 if fakeroot: 321 fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD")) 322 subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE) 323 return ret 324 except: 325 os._exit(1) 326 if not profiling: 327 os._exit(child()) 328 else: 329 profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) 330 prof = profile.Profile() 331 try: 332 ret = profile.Profile.runcall(prof, child) 333 finally: 334 prof.dump_stats(profname) 335 bb.utils.process_profilelog(profname) 336 os._exit(ret) 337 else: 338 for key, value in iter(envbackup.items()): 339 if value is None: 340 del os.environ[key] 341 else: 342 os.environ[key] = value 343 344 return pid, pipein, pipeout 345 346class runQueueWorkerPipe(): 347 """ 348 Abstraction for a pipe between a worker thread and the worker server 349 """ 350 def __init__(self, pipein, pipeout): 351 self.input = pipein 352 if pipeout: 353 pipeout.close() 354 bb.utils.nonblockingfd(self.input) 355 self.queue = bytearray() 356 357 def read(self): 358 start = len(self.queue) 359 try: 360 self.queue.extend(self.input.read(102400) or b"") 361 except (OSError, IOError) as e: 362 if e.errno != errno.EAGAIN: 363 raise 364 365 end = len(self.queue) 366 index = self.queue.find(b"</event>") 367 while index != -1: 368 msg = self.queue[:index+8] 369 assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1 370 worker_fire_prepickled(msg) 371 self.queue = self.queue[index+8:] 372 index = self.queue.find(b"</event>") 373 return (end > start) 374 375 def close(self): 376 while self.read(): 377 continue 378 if len(self.queue) > 0: 379 print("Warning, worker child left partial message: %s" % self.queue) 380 self.input.close() 381 382normalexit = False 383 384class BitbakeWorker(object): 385 def __init__(self, din): 386 self.input = din 387 bb.utils.nonblockingfd(self.input) 388 self.queue = bytearray() 389 self.cookercfg = None 390 self.databuilder = None 391 self.data = None 392 self.extraconfigdata = None 393 self.build_pids = {} 394 self.build_pipes = {} 395 396 signal.signal(signal.SIGTERM, self.sigterm_exception) 397 # Let SIGHUP exit as SIGTERM 398 signal.signal(signal.SIGHUP, self.sigterm_exception) 399 if "beef" in sys.argv[1]: 400 bb.utils.set_process_name("Worker (Fakeroot)") 401 else: 402 bb.utils.set_process_name("Worker") 403 404 def sigterm_exception(self, signum, stackframe): 405 if signum == signal.SIGTERM: 406 bb.warn("Worker received SIGTERM, shutting down...") 407 elif signum == signal.SIGHUP: 408 bb.warn("Worker received SIGHUP, shutting down...") 409 self.handle_finishnow(None) 410 signal.signal(signal.SIGTERM, signal.SIG_DFL) 411 os.kill(os.getpid(), signal.SIGTERM) 412 413 def serve(self): 414 while True: 415 (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) 416 if self.input in ready: 417 try: 418 r = self.input.read() 419 if len(r) == 0: 420 # EOF on pipe, server must have terminated 421 self.sigterm_exception(signal.SIGTERM, None) 422 self.queue.extend(r) 423 except (OSError, IOError): 424 pass 425 if len(self.queue): 426 self.handle_item(b"cookerconfig", self.handle_cookercfg) 427 self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) 428 self.handle_item(b"workerdata", self.handle_workerdata) 429 self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) 430 self.handle_item(b"runtask", self.handle_runtask) 431 self.handle_item(b"finishnow", self.handle_finishnow) 432 self.handle_item(b"ping", self.handle_ping) 433 self.handle_item(b"quit", self.handle_quit) 434 435 for pipe in self.build_pipes: 436 if self.build_pipes[pipe].input in ready: 437 self.build_pipes[pipe].read() 438 if len(self.build_pids): 439 while self.process_waitpid(): 440 continue 441 442 def handle_item(self, item, func): 443 opening_tag = b"<" + item + b">" 444 if not self.queue.startswith(opening_tag): 445 return 446 447 tag_len = len(opening_tag) 448 if len(self.queue) < tag_len + 4: 449 # we need to receive more data 450 return 451 header = self.queue[tag_len:tag_len + 4] 452 payload_len = int.from_bytes(header, 'big') 453 # closing tag has length (tag_len + 1) 454 if len(self.queue) < tag_len * 2 + 1 + payload_len: 455 # we need to receive more data 456 return 457 458 index = self.queue.find(b"</" + item + b">") 459 if index != -1: 460 try: 461 func(self.queue[(tag_len + 4):index]) 462 except pickle.UnpicklingError: 463 workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) 464 raise 465 self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):] 466 467 def handle_cookercfg(self, data): 468 self.cookercfg = pickle.loads(data) 469 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) 470 self.databuilder.parseBaseConfiguration(worker=True) 471 self.data = self.databuilder.data 472 473 def handle_extraconfigdata(self, data): 474 self.extraconfigdata = pickle.loads(data) 475 476 def handle_workerdata(self, data): 477 self.workerdata = pickle.loads(data) 478 bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] 479 bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] 480 bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] 481 bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] 482 for mc in self.databuilder.mcdata: 483 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) 484 self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) 485 self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe") 486 487 def handle_newtaskhashes(self, data): 488 self.workerdata["newhashes"] = pickle.loads(data) 489 490 def handle_ping(self, _): 491 workerlog_write("Handling ping\n") 492 493 logger.warning("Pong from bitbake-worker!") 494 495 def handle_quit(self, data): 496 workerlog_write("Handling quit\n") 497 498 global normalexit 499 normalexit = True 500 sys.exit(0) 501 502 def handle_runtask(self, data): 503 runtask = pickle.loads(data) 504 505 fn = runtask['fn'] 506 task = runtask['task'] 507 taskname = runtask['taskname'] 508 509 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) 510 511 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask) 512 self.build_pids[pid] = task 513 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) 514 515 def process_waitpid(self): 516 """ 517 Return none is there are no processes awaiting result collection, otherwise 518 collect the process exit codes and close the information pipe. 519 """ 520 try: 521 pid, status = os.waitpid(-1, os.WNOHANG) 522 if pid == 0 or os.WIFSTOPPED(status): 523 return False 524 except OSError: 525 return False 526 527 workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) 528 529 if os.WIFEXITED(status): 530 status = os.WEXITSTATUS(status) 531 elif os.WIFSIGNALED(status): 532 # Per shell conventions for $?, when a process exits due to 533 # a signal, we return an exit code of 128 + SIGNUM 534 status = 128 + os.WTERMSIG(status) 535 536 task = self.build_pids[pid] 537 del self.build_pids[pid] 538 539 self.build_pipes[pid].close() 540 del self.build_pipes[pid] 541 542 worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") 543 544 return True 545 546 def handle_finishnow(self, _): 547 if self.build_pids: 548 logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) 549 for k, v in iter(self.build_pids.items()): 550 try: 551 os.kill(-k, signal.SIGTERM) 552 os.waitpid(-1, 0) 553 except: 554 pass 555 for pipe in self.build_pipes: 556 self.build_pipes[pipe].read() 557 558try: 559 worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) 560 if not profiling: 561 worker.serve() 562 else: 563 profname = "profile-worker.log" 564 prof = profile.Profile() 565 try: 566 profile.Profile.runcall(prof, worker.serve) 567 finally: 568 prof.dump_stats(profname) 569 bb.utils.process_profilelog(profname) 570except BaseException as e: 571 if not normalexit: 572 import traceback 573 sys.stderr.write(traceback.format_exc()) 574 sys.stderr.write(str(e)) 575finally: 576 worker_thread_exit = True 577 worker_thread.join() 578 579workerlog_write("exiting") 580if not normalexit: 581 sys.exit(1) 582sys.exit(0) 583