xref: /openbmc/openbmc/poky/bitbake/bin/bitbake-worker (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1#!/usr/bin/env python3
2#
3# Copyright BitBake Contributors
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import os
9import sys
10import warnings
11warnings.simplefilter("default")
12warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*")
13sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
14from bb import fetch2
15import logging
16import bb
17import select
18import errno
19import signal
20import pickle
21import traceback
22import queue
23import shlex
24import subprocess
25import fcntl
26from multiprocessing import Lock
27from threading import Thread
28
# Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    # Linux value of F_SETPIPE_SZ; older Pythons don't expose the constant
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
# (argv[1] is a magic cookie the cooker passes when it spawns us)
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The cooker requests profiling by extending the magic cookie
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

# stdout doubles as the event/status channel back to the cooker
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except:
    pass
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Queue of pre-serialised event byte strings awaiting delivery to the cooker.
worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle *event*, wrap it in <event></event> tags and queue it for the cooker."""
    worker_fire_prepickled(b"<event>" + pickle.dumps(event) + b"</event>")

def worker_fire_prepickled(event):
    """Queue an already-pickled, tag-wrapped payload for the writer thread."""
    global worker_queue
    worker_queue.put(event)
96
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Set to True by the main thread to ask the writer thread to drain and exit.
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop: drain worker_queue into the cooker pipe.

    Accumulates queued byte strings into a local buffer and writes them to
    the non-blocking worker_pipe as it becomes writable, handling short
    writes. Only returns once an exit has been requested AND both the queue
    and the local buffer are empty, so no event is lost on shutdown.
    """
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for up to one second waiting for something to send
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (max 1s) for the pipe to become writable
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                # os.write may perform a short write; trim only what was sent
                written = os.write(worker_pipe, worker_queue_int)
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full, retry; EPIPE: reader gone, keep draining
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
128
def worker_child_fire(event, d):
    """In a forked task child, send *event* back over the child's private pipe.

    The write is performed under worker_pipe_lock so multiple processes
    sharing the pipe cannot interleave their messages. A failed write means
    the parent has gone away, so the whole process group is terminated.
    """
    global worker_pipe
    global worker_pipe_lock

    payload = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            remaining = payload
            while remaining:
                count = worker_pipe.write(remaining)
                remaining = remaining[count:]
    except IOError:
        # Parent is presumably dead; take the process group down with us.
        sigterm_handler(None, None)
        raise
142
# Route events raised in this (parent) worker process through the queue
# serviced by the writer thread above.
bb.event.worker_fire = worker_fire

# Optional debug log of commands handled by this worker; enable by opening
# a file here.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log, if one is open (see lf above)."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
151
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: propagate SIGTERM to our whole process group
    and exit."""
    # Restore default handling first so the killpg below doesn't re-enter us
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    # Signal every process in our process group (pgid 0 == our own group)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
156
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """Fork a child process to execute the single task described by *runtask*.

    The parent restores any environment variables changed for fakeroot and
    returns (pid, pipein, pipeout), where pipein is the read end of a pipe
    carrying pickled events from the child. The child configures its
    datastore/environment, runs the task via bb.build.exec_task() and
    terminates with os._exit().
    """

    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}   # original values, restored in the parent after fork
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = runtask['taskdep']
    # Per-task umask takes priority over the worker-wide default
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        # (int(umask, 8) raises TypeError if it is already an int; keep it)
        try:
             umask = int(umask, 8)
        except TypeError:
             pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            # Runs only in the forked child; never returns to serve()
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Network access is only left enabled for tasks flagged 'network'
                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        # Ask the pseudo server to shut down once the task is done
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made above for the child
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
356
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()

    def read(self):
        """Drain available bytes from the child, forwarding every complete
        <event>...</event> message to the cooker.

        Returns True if any new data arrived on this call."""
        before = len(self.queue)
        try:
            chunk = self.input.read(512*1024)
            self.queue.extend(chunk or b"")
        except (OSError, IOError) as e:
            # Non-blocking fd: nothing to read right now is not an error
            if e.errno != errno.EAGAIN:
                raise
        after = len(self.queue)

        while True:
            index = self.queue.find(b"</event>")
            if index == -1:
                break
            msg = self.queue[:index + 8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index + 8:]
        return after > before

    def close(self):
        """Flush any remaining events from the child, then close our end."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
392
# Set True by BitbakeWorker.handle_quit() so the exit path below can tell an
# orderly shutdown apart from a crash.
normalexit = False
394
class BitbakeWorker(object):
    """Main worker object: reads framed commands from the cooker on stdin,
    dispatches them, and forks/reaps children that execute tasks."""

    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()   # buffered, not-yet-parsed command bytes
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}       # pid -> task, for running children
        self.build_pipes = {}      # pid -> runQueueWorkerPipe

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The cooker embeds "beef" in the magic cookie for the fakeroot worker
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Kill all running tasks, then re-raise SIGTERM with default handling
        so the process dies with the conventional signal status."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the command pipe and all child event pipes,
        dispatch any complete commands and reap finished children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Try each known command tag against the head of the queue
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """If the head of the receive queue is a complete <item>...</item>
        message, strip it off and pass its payload to *func*.

        Messages are framed as: <item> + 4-byte big-endian payload length +
        payload + </item>.
        """
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        # NOTE(review): this bound omits the 4 length-header bytes so it can
        # pass slightly early; the find() below still guards correctness.
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store extra configuration variables sent by the cooker."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Apply worker-wide settings: logging verbosity, signature data and
        PR/hash-equivalence server addresses for every multiconfig."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        """Record updated task hashes pushed mid-build by the cooker."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        # Mark this as an orderly shutdown before raising SystemExit
        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child for the requested task and track its pid and pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        # Report (task, exitcode) back to the cooker
        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM every remaining task's process group and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Negative pid: signal the task's whole process group
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
567            self.build_pipes[pipe].read()
568
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        # Profile the serve loop and post-process the stats on exit
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # Report anything other than the orderly SystemExit from handle_quit()
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Ask the writer thread to flush remaining events, then wait for it
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
594