#!/usr/bin/env python3
#
# SPDX-License-Identifier: GPL-2.0-only
#
# Worker process: reads tagged, pickled commands from stdin, forks one child
# per "runtask" command and relays the children's events and exit codes back
# over stdout.
#

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
from multiprocessing import Lock
from threading import Thread

if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# A "decafbadbad" prefix on the magic argument switches profiling on.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: carry on with whatever flags stdout already has.
    pass

logger = logging.getLogger("BitBake")

# In this (parent) process worker_pipe is the raw stdout file descriptor; a
# forked task child rebinds it to the write end of its own pipe (a file object).
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle event, wrap it in <event> tags and queue it for the writer thread.

    d is accepted for signature compatibility and is not used here.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """Queue an already serialised (bytes) message for the writer thread."""
    global worker_queue

    worker_queue.put(event)

#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer thread body: drain worker_queue into worker_pipe.

    Queued byte strings are appended to a local buffer which is written to
    the non-blocking pipe; whatever os.write() did not accept stays in the
    buffer for the next attempt. Returns once worker_thread_exit is set and
    both the queue and the local buffer are empty.
    """
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Wait at most one second so the exit flag below is polled regularly.
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # The select() result is not inspected; the call only bounds
                # the wait (one second) before the next write attempt.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN and EPIPE are tolerated and the write is retried.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    """Event hook installed in forked task children.

    Pickles event, wraps it in <event> tags and writes it to the child's
    pipe while holding worker_pipe_lock. On IOError the process group is
    terminated through sigterm_handler().
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # Context manager so the lock is released even when the write fails.
        with worker_pipe_lock:
            worker_pipe.write(data)
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append msg to the debug command log; a no-op while lf is None."""
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """Restore default SIGTERM handling, signal process group 0 and exit.

    Both arguments are ignored, so the function can also be called directly.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """Fork a child process that executes taskname from recipe file fn.

    The fakeroot (or non-fakeroot) environment variables are exported into
    os.environ before the fork and restored in the parent afterwards.

    Returns (pid, pipein, pipeout) in the parent: the child's pid and both
    ends of the event pipe as file objects. The child never returns from
    this function; it leaves through os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname],8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        # Split on the first '=' only so values may themselves contain '='.
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        # Split on the first '=' only so values may themselves contain '='.
        for key, value in (var.split('=', 1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Compare against None so an explicit umask of 0 is applied too.
            if umask is not None:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (_realfn, _virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except BaseException:
                # Anything escaping the task, SystemExit included, ends the
                # child with a failure status.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Pre-set a failure status so the child always has an exit code
            # and can never fall through into the parent's code path.
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                try:
                    prof.dump_stats(profname)
                    bb.utils.process_profilelog(profname)
                finally:
                    os._exit(ret)
    else:
        # Parent: undo the environment changes made for the child.
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """Keep the read end, close the write end (if given) and make reads non-blocking."""
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """Read pending data and forward every complete <event> message.

        Returns True when this call added any bytes to the buffer.
        """
        start = len(self.queue)
        try:
            # A non-blocking read may yield None; treat it as "no data".
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>"): forward the message including its closing tag.
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """Drain remaining data, warn about a trailing partial message and close the pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

class BitbakeWorker(object):
    """Command loop of the worker.

    Reads tagged, pickled commands from din, forks task children and
    forwards their events and exit codes.

    NOTE(review): command payloads are passed to pickle.loads(); this
    presumably relies on the peer on stdin being trusted - confirm.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.workerdata = None
        self.extraconfigdata = None
        # Both dictionaries are keyed by child pid.
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Signal handler: stop running tasks, then re-raise SIGTERM with default handling."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: read commands, dispatch them, relay child output and reap children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if r is None:
                        # Non-blocking read with nothing available: this is
                        # not EOF, so just try again on the next iteration.
                        pass
                    elif len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    else:
                        self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """Dispatch consecutive complete <item>...</item> messages at the head of the queue.

        func is called with the payload between the tags. Processing stops
        at the first message that is incomplete or of a different type.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        # Re-check the opening tag on every pass so a different message type
        # queued in between is never mistaken for this item's payload.
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                break
            func(self.queue[len(opening):index])
            self.queue = self.queue[(index + len(closing)):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Unpickle and store the extra configuration data."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Unpickle the worker data and apply its logging and PRSERV_HOST settings."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Answer a ping by logging a warning-level "Pong" message."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Mark the exit as normal and leave through sys.exit(0)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Unpickle a task description, fork it and track its pid and pipe."""
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe, queue an
        <exitcode> message and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Send SIGTERM to every tracked task's process group and read what its pipe still holds."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for pid in self.build_pids:
                try:
                    # A negative pid addresses the child's whole process group.
                    os.kill(-pid, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # Best effort: the child may already be gone or reaped.
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # SystemExit from handle_quit() lands here as well; only report
    # exits that were not requested.
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Ask the writer thread to finish once it has flushed everything queued.
worker_thread_exit = True
worker_thread.join()

workerlog_write("exitting")
sys.exit(0)