#!/usr/bin/env python3
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import warnings
warnings.simplefilter("default")
# Make bitbake's 'lib' directory importable relative to this script's location.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
# NOTE(review): fetch2 is not referenced below; presumably imported for its
# module-level side effects — confirm before removing.
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
import shlex
import subprocess
from multiprocessing import Lock
from threading import Thread

# Python cannot change the filesystem encoding after interpreter startup,
# so fail early with a clear message rather than corrupting paths later.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
# The cooker passes a magic "decafbad..." token as the sole argument; its
# absence means the worker was invoked by hand.
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# An extended "decafbadbad" token additionally enables profiling of the
# worker's serve loop and of forked task children.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass
53
logger = logging.getLogger("BitBake")

# stdout doubles as the data channel back to the cooker; keep the fd
# non-blocking so the writer thread can handle partial writes itself.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Serialized event blobs produced in this process are queued here and
# drained to the cooker by a dedicated writer thread (worker_flush).
worker_queue = queue.Queue()
75
def worker_fire(event, d):
    """Serialize *event* and queue it for delivery to the cooker.

    The datastore argument *d* is unused but required by the
    bb.event.worker_fire callback signature.
    """
    payload = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(payload)
79
def worker_fire_prepickled(event):
    """Hand an already-serialized event blob to the background writer thread."""
    worker_queue.put(event)
84
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Set to True by the main thread (in the top-level finally clause) to ask
# the writer thread to drain its backlog and exit.
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop: stream queued event bytes to the cooker pipe.

    Accumulates chunks from *worker_queue* into a byte buffer and writes
    them to the (non-blocking) worker_pipe, coping with partial writes.
    Returns once worker_thread_exit is set and both the queue and the
    local buffer are empty.
    """
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for at most a second so the exit flag is still polled
            # regularly even when no events arrive.
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the pipe to become writable.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                # os.write() may accept only part of the buffer; keep the rest.
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN: pipe currently full, just retry.
                # EPIPE: reader (cooker) has gone away; drop the data.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
116
def worker_child_fire(event, d):
    """Event handler installed in forked task children.

    Serializes *event* and writes it to the pipe back to the worker
    parent, under worker_pipe_lock to guard against interleaved writes
    from multiprocessing children.  The datastore argument *d* is unused
    but required by the bb.event.worker_fire callback signature.

    On IOError the parent is assumed gone, so terminate the process
    group and re-raise.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # Use the lock as a context manager so it is released even if a
        # write raises.  The original code released it only on the success
        # path, leaving the lock permanently held after an IOError, which
        # would deadlock any other process sharing it.
        with worker_pipe_lock:
            while(len(data)):
                # write() may accept only part of the buffer; loop on the rest.
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise
131
# Install the queue-based event handler for the main worker process;
# forked task children replace this with worker_child_fire.
bb.event.worker_fire = worker_fire

# Optional command trace log for debugging; enable by uncommenting below.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the worker debug command log, if one is enabled."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
140
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: forward SIGTERM to the whole process group.

    The handler is reset to the default first so the killpg() below does
    not recursively re-enter this handler in the current process.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
145
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """Fork a child process to execute a single task.

    Returns (pid, pipein, pipeout), where pipein is the parent's read end
    of the event pipe from the child.  The child never returns from this
    function; it exits via os._exit() with the task's status.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()


    taskdep = workerdata["taskdeps"][fn]
    # A task-specific umask overrides the global one from workerdata.
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
             umask = int(umask, 8)
        except TypeError:
             pass

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            # Remember the previous value (or None if unset) so the parent
            # can restore its environment after the fork.
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Even without fakeroot, some variables still need to be exported.
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork() so buffered output isn't duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        # Write end is unbuffered so events reach the parent promptly.
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            # Runs only in the forked child; returns the task's exit status.
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                # Re-parse the recipe (with appends) for the full task datastore.
                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Restrict the task's network access unless it opted in via
                # the 'network' varflag.
                if not the_data.getVarFlag(taskname, 'network', False):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the fakeroot environment on top of the recipe's
                # exports and mark those variables exported in the datastore.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                # Honour any extra exports requested via the task's
                # 'exports' varflag.
                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        # Ask the pseudo server to shut down cleanly ('-S').
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: restore any environment variables changed before the fork.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
330
class runQueueWorkerPipe():
    """
    Parent-side wrapper around the event pipe from a forked task child.
    Buffers incoming bytes and forwards each complete <event>...</event>
    message on to the cooker.
    """
    def __init__(self, pipein, pipeout):
        # The parent only reads; close the child's write end if we were
        # handed one.
        if pipeout:
            pipeout.close()
        self.input = pipein
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """Drain available bytes from the child, forward any complete
        events, and return True if new data arrived."""
        before = len(self.queue)
        try:
            chunk = self.input.read(102400) or b""
            self.queue += chunk
        except (OSError, IOError) as exc:
            # EAGAIN just means nothing to read right now.
            if exc.errno != errno.EAGAIN:
                raise

        got_new = len(self.queue) > before
        while True:
            index = self.queue.find(b"</event>")
            if index == -1:
                break
            msg = self.queue[:index + 8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index + 8:]
        return got_new

    def close(self):
        """Flush any remaining events, warn on a truncated message, then
        close the underlying pipe."""
        while self.read():
            continue
        if self.queue:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
366
# Set to True by BitbakeWorker.handle_quit(); consulted by the top-level
# exception handler and the final exit-code selection.
normalexit = False
368
class BitbakeWorker(object):
    """Main worker object.

    Reads framed <item>...</item> commands from the cooker on *din*
    (stdin), dispatches them, forks task children and relays their
    events and exit codes back to the cooker.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        # Buffer of raw bytes from the cooker, parsed by handle_item().
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # Per in-flight task child: pid -> task id, pid -> event pipe.
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The magic argv[1] token contains "beef" for fakeroot workers.
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running task children, then re-deliver the signal
        with the default handler so this process exits with the right status."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the cooker command pipe and all child event
        pipes, dispatch complete commands and reap finished task children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            # Forward any pending event data from task children.
            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue


    def handle_item(self, item, func):
        """If the buffer starts with <item>, call func() on each complete
        <item>...</item> payload and consume it from the buffer."""
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                try:
                    func(self.queue[(len(item) + 2):index])
                except pickle.UnpicklingError:
                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                    raise
                # Drop the handled payload plus its </item> closing tag.
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store extra configuration variables to apply in task children."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Unpickle per-build worker data and apply logging/server settings."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])

    def handle_newtaskhashes(self, data):
        """Receive updated task hashes from the cooker."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Answer a liveness check from the cooker."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Orderly shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to run one task and track its pid and event pipe."""
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no exited task children awaiting result
        collection; otherwise reap one child, report its exit code to the
        cooker, close its event pipe and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Terminate all running task children and drain their event pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Signal the child's whole process group (it called setsid()).
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
525
# Main entry: service commands from the cooker until quit or termination,
# optionally under the profiler.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # SystemExit raised by handle_quit() also lands here; only report
    # genuinely unexpected failures.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Ask the writer thread to drain any queued events, then wait for it
    # so no event data is lost on exit.
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
551