1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
6import os
7import sys
8import warnings
9sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
10from bb import fetch2
11import logging
12import bb
13import select
14import errno
15import signal
16import pickle
17import traceback
18import queue
19from multiprocessing import Lock
20from threading import Thread
21
# Python fixes the filesystem encoding at interpreter startup and cannot
# change it afterwards, so refuse to run under a non-UTF-8 locale.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The cooker passes a magic token as argv[1]; the longer "decafbadbad"
# variant requests that this worker run under the profiler.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass
50
logger = logging.getLogger("BitBake")

# stdout doubles as the communication channel back to the cooker; make it
# non-blocking so the writer thread below can multiplex writes with select().
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

# Forward log records to the cooker as events instead of printing locally.
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Outgoing bytes (pickled events, exit codes) queued for the writer thread.
worker_queue = queue.Queue()
73
def worker_fire(event, d):
    """Serialise *event* and queue it for transmission to the cooker."""
    worker_fire_prepickled(b"<event>" + pickle.dumps(event) + b"</event>")
77
def worker_fire_prepickled(event):
    """Queue an already-pickled event frame for the writer thread to send."""
    global worker_queue
    worker_queue.put(event)
82
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop.

    Drains worker_queue into a local byte buffer and writes it to the
    (non-blocking) cooker pipe, retrying partial/blocked writes. Exits only
    once worker_thread_exit is set and all buffered data has been sent.
    """
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for up to a second waiting for fresh outgoing data.
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (up to 1s) for the pipe to become writable, then write
                # as much as the OS accepts and keep the remainder buffered.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full, just retry. EPIPE: cooker has gone away;
                # swallow it so the worker can still shut down cleanly.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
114
115def worker_child_fire(event, d):
116    global worker_pipe
117    global worker_pipe_lock
118
119    data = b"<event>" + pickle.dumps(event) + b"</event>"
120    try:
121        worker_pipe_lock.acquire()
122        worker_pipe.write(data)
123        worker_pipe_lock.release()
124    except IOError:
125        sigterm_handler(None, None)
126        raise
127
# Events raised in the worker's own process go through the queue/writer thread.
bb.event.worker_fire = worker_fire

# Optional debug log of commands handled by this worker; disabled unless a
# file object is assigned to lf.
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log when one is open."""
    if lf is not None:
        lf.write(msg)
        lf.flush()
136
def sigterm_handler(signum, frame):
    """Terminate the entire process group, then exit this process.

    SIGTERM is reset to the default disposition first so the killpg() below
    does not re-enter this handler in our own process.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
141
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """Fork a child process to execute a single task.

    The (fakeroot) environment is set up before the fork, the child becomes
    a process group leader, loads the recipe data and runs the task via
    bb.build.exec_task(), exiting through os._exit() with the task status.

    Returns:
        (pid, pipein, pipeout) where pipein is the parent's read end of the
        child's event pipe.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            # Already a number; use it as-is.
            umask = taskdep['umask'][taskname]

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            # Explicit None check: a configured umask of 0 is falsy but must
            # still be applied (the previous "if umask:" silently skipped it).
            if umask is not None:
                os.umask(umask)

            try:
                # Load the recipe data and prime the datastore with the
                # build/task context the cooker supplied.
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the fakeroot environment on top and make sure it is
                # exported to the task as well.
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                # Per-task extra exports requested via the task's 'exports' flag
                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: restore any environment variables the fakeroot setup changed.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
298
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """Pull any available bytes from the child and forward every complete
        <event> frame to the cooker. Returns True if new data arrived."""
        before = len(self.queue)
        try:
            chunk = self.input.read(102400) or b""
            self.queue += chunk
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        after = len(self.queue)
        terminator = b"</event>"
        index = self.queue.find(terminator)
        while index != -1:
            split = index + len(terminator)
            worker_fire_prepickled(self.queue[:split])
            self.queue = self.queue[split:]
            index = self.queue.find(terminator)
        return after > before

    def close(self):
        """Drain remaining data from the child, then close its pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
332
normalexit = False

class BitbakeWorker(object):
    """Main worker event loop.

    Reads framed commands of the form <item>payload</item> from the cooker
    over *din*, dispatches them to the handle_* methods, forks children to
    run tasks via fork_off_task() and relays their events and exit codes
    back to the cooker.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        # Buffered, not-yet-parsed command bytes received from the cooker
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # pid -> task id of running children
        self.build_pids = {}
        # pid -> runQueueWorkerPipe carrying that child's events
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running task children, then re-deliver SIGTERM with
        the default handler so our exit status reflects the signal."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex cooker commands and child event pipes."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            # Forward any event data produced by running task children
            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                # Reap any children that have exited
                while self.process_waitpid():
                    continue


    def handle_item(self, item, func):
        """If the command queue starts with an <item> frame, consume each
        complete frame of that type, passing its payload to *func*."""
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and parse the base configuration."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        """Store extra configuration variables sent by the cooker."""
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Apply per-build worker data: logging configuration and PR service host."""
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        """Respond to a liveness check from the cooker."""
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Exit cleanly; normalexit suppresses the traceback in the outer handler."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child to execute one task and track its pid and event pipe."""
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        # Report the task's exit status back to the cooker
        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM all remaining task process groups and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Negative pid: signal the child's whole process group
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
482
# Entry point: drive the worker off stdin, optionally under the profiler.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # handle_quit() exits via sys.exit(); only report genuine failures.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

# Ask the writer thread to drain any queued events and stop, and wait for it
# so no data destined for the cooker is lost.
worker_thread_exit = True
worker_thread.join()

workerlog_write("exitting")
sys.exit(0)
506