1#!/usr/bin/env python3
2#
3# Copyright BitBake Contributors
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import os
9import sys
10import warnings
11warnings.simplefilter("default")
12sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
13from bb import fetch2
14import logging
15import bb
16import select
17import errno
18import signal
19import pickle
20import traceback
21import queue
22import shlex
23import subprocess
24from multiprocessing import Lock
25from threading import Thread
26
bb.utils.check_system_locale()

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The longer "decafbadbad" magic token additionally enables profiling of
# the worker loop and of each forked task child.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of a disorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass
54
logger = logging.getLogger("BitBake")

# stdout doubles as the status/event channel back to the cooker server,
# which is why child processes must never write to it directly.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

# Forward log records as pickled events over the worker pipe rather than
# writing them to a local stream.
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
# Queue of pre-pickled event byte blobs awaiting transmission to the cooker.
worker_queue = queue.Queue()

def worker_fire(event, d):
    """Serialize *event* and queue it for sending to the cooker.

    The datastore argument *d* is unused but required by the
    bb.event.worker_fire callback signature.
    """
    payload = b"".join((b"<event>", pickle.dumps(event), b"</event>"))
    worker_fire_prepickled(payload)

def worker_fire_prepickled(event):
    """Queue an already-serialized event blob for the writer thread."""
    global worker_queue
    worker_queue.put(event)
85
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
# Checked by the writer thread; set to True at shutdown, after which the
# thread drains everything remaining and returns.
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer thread main loop.

    Drains event data from worker_queue into a local byte buffer and
    writes that buffer to the non-blocking pipe back to the cooker.
    Only exits once worker_thread_exit is set AND both the queue and
    the local buffer are empty, so no event data is lost at shutdown.
    """
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for at most 1s so the loop also notices
            # worker_thread_exit in a timely fashion.
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the pipe to become writable.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                # Non-blocking fd: a short write is expected; keep the
                # unwritten tail for the next iteration.
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full, just retry. EPIPE: reader has gone
                # away; drop the data rather than crash the worker.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return
114
# Background thread flushing queued event data to the cooker; joined at
# process exit once worker_thread_exit has been set.
worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
117
def worker_child_fire(event, d):
    """Event sink installed inside forked task children.

    Pickles *event* and writes it to the pipe back to the worker,
    serialised by worker_pipe_lock since multiple processes may share
    the pipe. If the write fails the parent is presumed gone, so the
    whole process group is torn down. *d* is unused but required by
    the bb.event.worker_fire signature.
    """
    global worker_pipe
    global worker_pipe_lock

    remaining = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        with bb.utils.lock_timeout(worker_pipe_lock):
            while remaining:
                count = worker_pipe.write(remaining)
                remaining = remaining[count:]
    except IOError:
        # Terminate the process group just as a SIGTERM would, then
        # let the caller see the failure.
        sigterm_handler(None, None)
        raise
131
# Route all events fired in the worker's own process through the queue.
bb.event.worker_fire = worker_fire
133
# Optional debug log of worker command handling; disabled by default.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the worker debug log and flush, when enabled."""
    if not lf:
        return
    lf.write(msg)
    lf.flush()
140
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: terminate the entire process group.

    Default SIGTERM handling is restored first so the killpg() below
    does not recursively re-enter this handler in this process.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
145
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """Fork a child process to execute a single task.

    The (fake)root environment is configured in the parent before the
    fork since fork()/exec*() activate PSEUDO. After forking, the parent
    restores its environment and returns; the child reparses the recipe,
    sets up its datastore and environment, then runs the task via
    bb.build.exec_task() and exits with its status.

    Returns a (pid, pipein, pipeout) tuple; pipein carries pickled event
    data from the child back to this worker.
    """

    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}   # original values of env vars we touch, restored in the parent
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    # A task-specific umask takes precedence over the global worker umask.
    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
            umask = int(umask, 8)
        except TypeError:
            # Already an int; use it as-is.
            pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Even without fakeroot, some variables may still need setting.
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            # Runs only in the forked child; exits the process, never
            # returns to serve().
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Restrict network access unless the task's 'network'
                # flag says it needs it.
                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # The fakeroot variables must be present both in the
                # environment and in the datastore (marked exported).
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        # Ask the pseudo server for this task to shut
                        # down ('-S').
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: restore the environment modified for the fork above.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
345
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        # Only the read end is used on this side; the write end belongs
        # to the forked child and is closed here.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()

    def read(self):
        """Drain available data from the pipe and forward every complete
        <event>...</event> message to the cooker. Returns True if any
        new data arrived."""
        before = len(self.queue)
        try:
            chunk = self.input.read(102400)
            if chunk:
                self.queue.extend(chunk)
        except (OSError, IOError) as exc:
            # EAGAIN just means nothing to read on the non-blocking fd.
            if exc.errno != errno.EAGAIN:
                raise

        after = len(self.queue)
        terminator = b"</event>"
        while True:
            pos = self.queue.find(terminator)
            if pos == -1:
                break
            end = pos + len(terminator)
            msg = self.queue[:end]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[end:]
        return after > before

    def close(self):
        """Flush any remaining pipe contents, then close the read end."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
381
# Set to True by handle_quit so the exit paths below can distinguish a
# clean cooker-requested shutdown from an unexpected exception.
normalexit = False
383
class BitbakeWorker(object):
    """Main worker object.

    Reads framed commands from the cooker on *din* (stdin), forks off
    task children via fork_off_task() and relays their events and exit
    codes back to the cooker over the worker pipe (stdout).
    """
    def __init__(self, din):
        self.input = din                # command stream from the cooker
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()        # buffered, not-yet-parsed command bytes
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}            # pid -> task, for running children
        self.build_pipes = {}           # pid -> runQueueWorkerPipe

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The magic argv token contains "beef" for the fakeroot worker.
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Shut down all running tasks, then re-raise the signal with
        default handling so the worker process itself terminates."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the cooker command stream with the event
        pipes of running task children and dispatch complete messages."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Try each known message type against the head of the
                # buffer; handle_item() is a no-op unless the buffer
                # starts with that tag and the message is complete.
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            # Forward any event data produced by running task children.
            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """If the buffer starts with a complete <item>...</item> message,
        pass its payload to *func* and consume it from the buffer.

        The wire format is: opening tag, a 4-byte big-endian payload
        length, the payload, then the closing tag.
        """
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store extra configuration variables applied to each task datastore."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Apply worker-wide settings (logging, PR/hash server addresses)."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        """Record updated task hashes sent mid-build by the cooker."""
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        """Respond to a liveness check from the cooker."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Exit cleanly at the cooker's request (sets normalexit)."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork off a task child and start tracking its pid and event pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Collect the exit status of one finished child, close its event
        pipe and report (task, exitcode) back to the cooker.

        Returns False when no child has exited (or one is merely
        stopped), True when a result was collected.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Terminate all running task children (and their process groups)
        and drain whatever event data they left in their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Negative pid: signal the child's entire process group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
557
try:
    # Commands arrive in binary form on stdin.
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # handle_quit() exits via sys.exit(); only report on truly
    # unexpected exceptions.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Tell the writer thread to flush any remaining event data and stop.
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
583