# xref: /openbmc/openbmc/poky/bitbake/bin/bitbake-worker (revision f376f21a8c3687f1d30191905f79aedd2171fe4a)
1#!/usr/bin/env python3
2#
3# Copyright BitBake Contributors
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import os
9import sys
10import warnings
11warnings.simplefilter("default")
12warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*")
13sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
14from bb import fetch2
15import logging
16import bb
17import select
18import errno
19import signal
20import pickle
21import traceback
22import queue
23import shlex
24import subprocess
25import fcntl
26from multiprocessing import Lock
27from threading import Thread
28
# Remove when we have a minimum of python 3.10
# (Linux-only fcntl command; the numeric value matches the kernel's
# F_SETPIPE_SZ for Pythons that don't expose it yet.)
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
# (the cooker passes a magic "decafbad..." token as the sole argument)
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# A longer "decafbadbad" token asks the worker to profile itself.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
# NOTE(review): O_SYNC makes writes synchronous at the OS level; it does
# not change Python-level buffering — confirm this still has the intended
# effect.
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

# stdout doubles as the event/status channel back to the cooker.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except:
    pass
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
# (set to a real Lock in each forked task child; None in this process).
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
85
# Events bound for the cooker are queued here and drained by the dedicated
# writer thread (worker_flush), so event producers never block on the pipe.
worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle *event*, wrap it in <event></event> tags and queue it for
    transmission to the cooker. *d* (datastore) is unused but kept for the
    bb.event.worker_fire signature."""
    worker_fire_prepickled(b"<event>" + pickle.dumps(event) + b"</event>")

def worker_fire_prepickled(event):
    """Queue an already-pickled, tag-wrapped message for the writer thread."""
    global worker_queue

    worker_queue.put(event)
96
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Flag checked by the writer thread; set True at shutdown to make it drain
# and exit.
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop: drain worker_queue and write the bytes to
    worker_pipe (the cooker-facing fd), tolerating partial writes on the
    non-blocking pipe. Exits once worker_thread_exit is set and both the
    queue and the local buffer are empty."""
    # Local accumulation buffer for bytes not yet written to the pipe.
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block up to 1s so the exit flag is polled regularly.
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        # Keep writing while we have buffered bytes or more queued items.
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the pipe to become writable.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                # Partial writes are expected on a non-blocking pipe; only
                # the written prefix is discarded.
                written = os.write(worker_pipe, worker_queue_int)
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full right now; EPIPE: cooker went away.
                # Both are tolerated, everything else propagates.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
128
def worker_child_fire(event, d):
    """Event sink installed in forked task children: pickle *event* and write
    it directly up the child's report pipe to the worker process.

    The pipe lock serialises concurrent writers (the task may itself use
    multiprocessing). On a write failure the worker end has gone away, so
    the whole process group is terminated before the error is re-raised.
    """
    global worker_pipe
    global worker_pipe_lock

    payload = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            remaining = payload
            # The pipe is unbuffered; loop until every byte is accepted.
            while remaining:
                count = worker_pipe.write(remaining)
                remaining = remaining[count:]
    except IOError:
        sigterm_handler(None, None)
        raise
142
# Route all bb.event firing in this (server) process through the queue-based
# sender; forked children override this with worker_child_fire.
bb.event.worker_fire = worker_fire

# Optional debug log of traffic handled by the worker; enable by opening a
# file here.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug log if one is open; no-op otherwise."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
151
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler for task children: take down the whole
    process group, then exit."""
    # Restore the default disposition first so the SIGTERM we send to our
    # own process group below doesn't re-enter this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    # pgid 0 == our own process group, which includes any task subprocesses.
    os.killpg(0, signal.SIGTERM)
    sys.exit()
156
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """Fork a child process to execute one task.

    Returns (pid, pipein, pipeout) to the caller; the child never returns
    from this function — it runs the task and leaves via os._exit().
    The 'runtask' dict carries everything the child needs (recipe filename,
    task name, hashes, fakeroot environment, etc.).
    """

    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    # envbackup records the pre-fork environment so the parent can restore
    # it after the fork (see the else branch at the bottom).
    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = runtask['taskdep']
    # A task-specific umask takes precedence over the global worker umask.
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # Convert to a python numeric value as it could be a string
        umask = bb.utils.to_filemode(umask)

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot tasks may still need a few variables cleared/set.
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork so buffered output isn't duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        # Report pipe: child writes events, parent reads them.
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            # Events from the child go directly up the report pipe.
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                # Set up the datastore for this task: base config for the
                # multiconfig, then the fully parsed recipe.
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Tasks without the 'network' flag run in a network-isolated
                # namespace where possible.
                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Fakeroot variables must be both in the environment and
                # exported in the datastore.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                # Per-task extra exports from the task's 'exports' varflag.
                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    # Ask the pseudo server to shut down once the task is done.
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: restore the environment we mutated before forking.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
353
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server.

    Reads the report pipe of one forked task child and forwards every
    complete, already-pickled <event>...</event> message on to the cooker.
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        # The server end only reads; close the child's write end if handed one.
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()

    def read(self):
        """Drain whatever is available from the pipe, forward any complete
        events, and return True if new data arrived."""
        start = len(self.queue)
        try:
            chunk = self.input.read(512*1024) or b""
            self.queue.extend(chunk)
        except (OSError, IOError) as e:
            # EAGAIN simply means nothing to read on the non-blocking fd.
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        # Forward every complete <event>...</event> message in the buffer.
        while True:
            index = self.queue.find(b"</event>")
            if index == -1:
                break
            msg = self.queue[:index + 8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index + 8:]
        return (end > start)

    def close(self):
        """Flush any remaining complete events, warn about leftovers, close."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

# Set True only by a clean "quit" command from the cooker; any other exit
# path is treated as abnormal.
normalexit = False
391
class BitbakeWorker(object):
    """Main server object for the worker process.

    Reads tagged, length-prefixed commands from the cooker on stdin,
    dispatches them to handle_* methods, forks task children via
    fork_off_task() and reaps their exit statuses.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()  # buffered, not-yet-parsed command bytes
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}   # pid -> task, for running children
        self.build_pipes = {}  # pid -> runQueueWorkerPipe

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The cooker encodes fakeroot workers by embedding "beef" in the
        # magic argv token.
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running tasks, then re-deliver SIGTERM to self
        with the default handler so the process dies with the right status."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the cooker command pipe and all child report
        pipes, dispatch buffered commands, and reap finished children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Each handle_item only acts if its tag is at the head of
                # the buffer, so at most one of these consumes data per pass.
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """If a complete <item>LLLL<payload></item> message heads the buffer,
        pass its payload to func and consume it; otherwise wait for more data.
        The 4 bytes after the opening tag are a big-endian payload length."""
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        # NOTE(review): find() locates the first closing tag, which assumes
        # the closing-tag byte sequence never appears inside the pickled
        # payload before the real end — confirm against the cooker's framing.
        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Receive the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Receive extra configuration variables applied to each task datastore."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Receive per-build worker settings (logging levels, PR/hash server
        addresses, signature data) and apply them to every multiconfig."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        """Receive updated task hashes (e.g. from hash equivalence)."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Liveness check from the cooker; reply via a warning-level event."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Clean shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to run the task described by the pickled payload."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """Reap one finished child, if any: forward its (task, exitcode) to
        the cooker and close its report pipe. Returns True if a child was
        collected, False if none are ready."""
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Terminate all running task children (each is its own process
        group, hence kill(-pid)) and drain their report pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
565
# Main entry: serve the cooker until "quit", SIGTERM or pipe EOF, optionally
# under the profiler. Whatever happens, shut the writer thread down cleanly
# so queued events are flushed before exit.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # handle_quit exits via sys.exit(0) (SystemExit lands here); only report
    # a traceback for genuinely abnormal exits.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Ask the writer thread to drain remaining events and stop.
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
591