xref: /openbmc/openbmc/poky/bitbake/bin/bitbake-worker (revision 8460358c3d24c71d9d38fd126c745854a6301564)
1#!/usr/bin/env python3
2#
3# Copyright BitBake Contributors
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import os
9import sys
10import warnings
11warnings.simplefilter("default")
12sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
13from bb import fetch2
14import logging
15import bb
16import select
17import errno
18import signal
19import pickle
20import traceback
21import queue
22import shlex
23import subprocess
24import fcntl
25from multiprocessing import Lock
26from threading import Thread
27
# Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
# (the magic argv[1] token is presumably supplied by the bitbake server when
# it spawns the worker -- it is also inspected below and in BitbakeWorker
# to select profiling/fakeroot modes; confirm against runqueue.py).
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# A longer "decafbadbad" token turns on profiling of the worker and its tasks.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

# stdout doubles as the event/status pipe back to the cooker; it must be
# non-blocking because the writer thread retries partial writes itself.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except:
    pass
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
# (set to a real Lock in each forked task child, see fork_off_task).
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
84
# Outgoing event data is queued here and drained by the dedicated writer thread.
worker_queue = queue.Queue()

def worker_fire(event, d):
    """Pickle *event*, wrap it in <event>...</event> tags and queue it for
    delivery back to the cooker (the *d* datastore argument is unused)."""
    payload = b"".join((b"<event>", pickle.dumps(event), b"</event>"))
    worker_fire_prepickled(payload)

def worker_fire_prepickled(event):
    """Queue an already-serialised event blob for the writer thread."""
    worker_queue.put(event)
95
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Checked by the writer thread; set to True (then join the thread) to shut it down.
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop: drain worker_queue into worker_pipe.

    Data is accumulated in a local bytearray so partial writes (os.write on
    the non-blocking pipe returning less than the buffer length) can be
    retried on the next iteration without losing or reordering bytes."""
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for up to a second waiting for fresh event data.
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (bounded) until the pipe may accept data before writing.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                written = os.write(worker_pipe, worker_queue_int)
                # Drop only what was actually written; the remainder is retried.
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                # EAGAIN (pipe full) and EPIPE (reader gone) are tolerated;
                # anything else is a real error and propagates.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

# The single writer thread serialising all event traffic back to the cooker.
worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
127
def worker_child_fire(event, d):
    """Event handler installed in forked task children: write the pickled
    event straight to the child's report pipe (the parent worker parses it
    back out in runQueueWorkerPipe.read)."""
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # Serialise writers -- multiple processes in the child's process
        # group may fire events concurrently over the same pipe.
        with bb.utils.lock_timeout(worker_pipe_lock):
            while(len(data)):
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        # Pipe failure means the parent worker has gone away: take the
        # whole process group down, then re-raise.
        sigterm_handler(None, None)
        raise

# In the main worker process events go through the queue/writer thread instead.
bb.event.worker_fire = worker_fire
143
# Optional debug trace of worker command handling; enable by opening a file here.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug logfile; a no-op unless lf is enabled above."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
150
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: forward SIGTERM to the entire process group,
    then exit. The handler is reset to the default first so the killpg()
    does not re-enter this handler in our own process."""
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
155
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """Fork a child process to execute one task described by *runtask*.

    Returns (pid, pipein, pipeout) to the parent; the child never returns
    (it exits via os._exit with the task's status). The child reports
    events back over the pipe using worker_child_fire."""
    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    # Task-specific umask wins over the global workerdata one.
    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        # (int(x, 8) raises TypeError when x is already an int, which is
        # then used as-is)
        try:
             umask = int(umask, 8)
        except TypeError:
             pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            # Remember the old value so the parent can restore it after fork.
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        # Unbuffered write end: the child uses it for event reporting.
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                # Set up the datastore for this task before parsing the recipe.
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Unless the task opted into network access, sandbox it off.
                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # The fakeroot variables must survive into both the
                # environment and the datastore as exported variables.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    # Always stop the fakeroot (pseudo) server for this task.
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: restore whatever environment the pre-fork setup clobbered.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
355
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        # Only the read end is ours; the write end belongs to the child.
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(pipein)
        self.input = pipein
        self.queue = bytearray()

    def read(self):
        """Pull available data off the pipe, forward every complete
        <event>...</event> message, and report whether anything arrived."""
        before = len(self.queue)
        try:
            chunk = self.input.read(512 * 1024)
            self.queue.extend(chunk or b"")
        except (OSError, IOError) as err:
            # EAGAIN just means nothing to read right now.
            if err.errno != errno.EAGAIN:
                raise

        after = len(self.queue)
        # Forward each complete event; leave any trailing partial message queued.
        while (pos := self.queue.find(b"</event>")) != -1:
            message = self.queue[:pos + 8]
            assert message.startswith(b"<event>") and message.count(b"<event>") == 1
            worker_fire_prepickled(message)
            self.queue = self.queue[pos + 8:]
        return after > before

    def close(self):
        """Drain any remaining messages, warn about leftovers, close the pipe."""
        while self.read():
            pass
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
391
# Set to True on a clean "quit" command from the cooker; any other exit path
# is treated as abnormal.
normalexit = False

class BitbakeWorker(object):
    """Main loop of the worker process: reads tagged commands from the
    cooker over *din* (stdin), forks off task children and reports their
    events and exit codes back over the event pipe."""

    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()  # buffered, not-yet-parsed command bytes
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}   # pid -> task, one entry per running task child
        self.build_pipes = {}  # pid -> runQueueWorkerPipe for that child

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # NOTE(review): the "beef" substring in the startup token appears to
        # select the fakeroot flavour of the worker -- confirm against the
        # code in runqueue.py that spawns bitbake-worker.
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all task children, then deliver a default-action
        SIGTERM to ourselves so the worker actually dies."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Event loop: multiplex the command pipe and all child report pipes."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Each handler only consumes data if its tag is complete at
                # the head of the queue, so the order here fixes priority.
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                # Reap every finished child before going back to select().
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """If a complete <item>...</item> message sits at the head of the
        queue, strip it off and pass its payload to *func*."""
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        # A 4-byte big-endian payload length follows the opening tag.
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        # NOTE(review): this size check does not count the 4-byte header, so
        # it can pass slightly early; harmless since the find() below still
        # has to locate the closing tag before anything is consumed.
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Receive the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Receive per-build settings and propagate them to every multiconfig
        datastore."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Liveness check from the cooker; answered via the event/log channel."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Clean shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork off one task child and start tracking its pid and report pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection;
        otherwise collect one child's exit code, close its report pipe,
        send an <exitcode> event to the cooker and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM every remaining task child (by process group) and drain
        whatever is left on their report pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Children are group leaders (os.setsid in fork_off_task),
                    # so -k signals their whole process group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
567
# Script entry point: run the worker loop until the cooker says quit or the
# command pipe closes.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # handle_quit() leaves via sys.exit(0), whose SystemExit lands here too;
    # only report tracebacks for abnormal terminations.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Ask the event writer thread to drain its queue and stop before exiting.
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
593