xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 1e42709b)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import logging
18import re
19import bb
20from bb import msg, event
21from bb import monitordisk
22import subprocess
23import pickle
24from multiprocessing import Process
25import shlex
26import pprint
27
# Module level loggers: the general BitBake logger plus child loggers for
# runqueue-specific messages and hash equivalence reporting.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")

# Case-insensitive match for a standalone 64 character hex string (e.g. a
# sha256 sum); the lookbehind/lookahead reject matches that are embedded in
# a longer alphanumeric run.
__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
33
def fn_from_tid(tid):
    """Return the filename part of a tid (everything before the last ':')."""
    head, sep, _ = tid.rpartition(":")
    return head if sep else tid
36
def taskname_from_tid(tid):
    """Return the task name, i.e. the text after the final ':' in a tid."""
    _, taskname = tid.rsplit(":", 1)
    return taskname
39
def mc_from_tid(tid):
    """Return the multiconfig name of a tid, or '' for the default config."""
    if not (tid.startswith('mc:') and tid.count(':') >= 2):
        return ""
    return tid.split(':')[1]
44
def split_tid(tid):
    """Return (mc, fn, taskname) for a tid, discarding the mcfn element."""
    mc, fn, taskname, _mcfn = split_tid_mcfn(tid)
    return (mc, fn, taskname)
48
def split_mc(n):
    """Split 'mc:NAME:rest' into (NAME, rest); plain names give ('', n)."""
    if n.startswith("mc:") and n.count(':') >= 2:
        _prefix, mcname, remainder = n.split(":", 2)
        return (mcname, remainder)
    return ('', n)
54
def split_tid_mcfn(tid):
    """
    Decompose a tid into (mc, fn, taskname, mcfn).

    Multiconfig tids have the form 'mc:<mc>:<fn>:<taskname>' and keep the
    'mc:<mc>:' prefix on the returned mcfn; plain tids are '<fn>:<taskname>'
    with mcfn equal to fn.
    """
    if tid.startswith('mc:') and tid.count(':') >= 2:
        parts = tid.split(':')
        mc = parts[1]
        taskname = parts[-1]
        # fn may itself contain ':' (e.g. virtual providers), so rejoin
        # everything between the mc name and the task name.
        fn = ":".join(parts[2:-1])
        return (mc, fn, taskname, "mc:" + mc + ":" + fn)

    fn, taskname = tid.rsplit(":", 1)
    return ("", fn, taskname, fn)
70
def build_tid(mc, fn, taskname):
    """Construct a tid from its parts; multiconfig tids gain 'mc:<mc>:'."""
    components = [fn, taskname]
    if mc:
        components.insert(0, "mc:" + mc)
    return ":".join(components)
75
# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    """
    Return the index key used to pair up potentially matching multiconfig
    tasks: the recipe's PN, the task name and the unihash joined with ':'.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    # Bug fix: the original concatenated the literal string "taskname"
    # rather than the task's actual name (and omitted a separator before
    # the hash), so tasks differing only in task name produced identical
    # keys, contrary to the comment above.
    return pn + ":" + taskname + ":" + h
83
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        # Counters for the various task outcomes; 'total' is fixed up front.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        """Return a duplicate of the current counter values."""
        duplicate = self.__class__(self.total)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def taskFailed(self):
        """An active task failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self):
        """An active task finished successfully."""
        self.active -= 1
        self.completed += 1

    def taskSkipped(self):
        """A task was skipped; it is counted as active as well as skipped."""
        self.active += 1
        self.skipped += 1

    def taskActive(self):
        """A task started executing."""
        self.active += 1
114
# These values indicate the next step due to be run in the
# runQueue state machine
# NOTE(review): the gaps in the numbering look historic (removed states);
# within this file the values are only assigned/compared, never ordered.
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
123
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        # Priority map: list of tids, most important first. Fix: the
        # original wrapped the keys view in a one-element list
        # ([...keys()]), which broke prio_map.index() when building
        # rev_prio_map and the enumeration in dump_prio(); flatten it
        # into a real list of tids.
        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        # Cache of taskname -> "number_threads" varflag value (falsy when
        # the task type has no concurrency cap).
        self.skip_maxthread = {}
        # tid -> stamp file path, used to avoid starting a task whose stamp
        # is already being written by an in-flight task.
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            if tid in self.rq.runq_buildable:
                # Fix: self.buildable is a set and has no append(); the
                # original call raised AttributeError here.
                self.buildable.add(tid)

        # Built lazily on first use by next_buildable_task().
        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            # Fast path: a single candidate needs no priority comparison.
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            # Lazily invert prio_map into tid -> priority. Using enumerate
            # avoids the O(n^2) cost of calling prio_map.index() per tid.
            self.rev_prio_map = {tid: idx for idx, tid in enumerate(self.prio_map)}

        # Pick the lowest-priority-number (most important) buildable task
        # whose stamp is not already being written.
        best = None
        bestprio = None
        for tid in buildable:
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                continue
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        """Record that a task has become buildable."""
        self.buildable.add(task)

    def removebuildable(self, task):
        """Remove a task from the buildable set."""
        self.buildable.remove(task)

    def describe_task(self, taskid):
        """Return a readable description of a task (its id plus, when the
        reverse priority map has been built, its priority)."""
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        """Log the current priority map at debug level 3."""
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))
231
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        # Bucket the task ids by their computed weight.
        buckets = {}
        for tid in self.rqdata.runtaskentries:
            tid_weight = self.rqdata.runtaskentries[tid].weight
            buckets.setdefault(tid_weight, []).append(tid)

        # Heaviest bucket first. Reversing the contents of each bucket is
        # equivalent to building the list lightest-first and reversing the
        # whole thing, so the final ordering is unchanged.
        self.prio_map = []
        for bucket_weight in sorted(buckets, reverse=True):
            self.prio_map.extend(reversed(buckets[bucket_weight]))
258
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the after.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        """
        Build the speed scheduler's weight-sorted priority map, then
        reorder it so tasks of the same kind are grouped together while
        preserving the relative recipe ordering.
        """
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceeding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            # 'index' tracks our position in the merged list; 'old_task'
            # is the merged-list entry at that position (None past the end).
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s'  % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more imporant (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's task that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        #
        # Implementation: stable-partition prio_map so that, scanning from
        # the front, tasks appear grouped by kind in the order of
        # all_tasks; del+insert keeps the list length constant so the
        # range-based scan below stays valid.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')
355
class RunTaskEntry(object):
    """
    Per-task record in the run queue: dependency edges, signature hashes,
    and the scheduling weight (recomputed by calculate_task_weights).
    """
    def __init__(self):
        # Forward and reverse dependency edges (sets of tids).
        self.depends, self.revdeps = set(), set()
        # Signature hash / unified hash / task reference, filled in later.
        self.hash = self.unihash = self.task = None
        self.weight = 1
364
365class RunQueueData:
366    """
367    BitBake Run Queue implementation
368    """
369    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
370        self.cooker = cooker
371        self.dataCaches = dataCaches
372        self.taskData = taskData
373        self.targets = targets
374        self.rq = rq
375        self.warn_multi_bb = False
376
377        self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST") or ""
378        self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST") or "").split()
379        self.setscenewhitelist = get_setscene_enforce_whitelist(cfgData, targets)
380        self.setscenewhitelist_checked = False
381        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
382        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()
383
384        self.reset()
385
    def reset(self):
        # Discard any previously computed task entries.
        self.runtaskentries = {}
388
389    def runq_depends_names(self, ids):
390        import re
391        ret = []
392        for id in ids:
393            nam = os.path.basename(id)
394            nam = re.sub("_[^,]*,", ",", nam)
395            ret.extend([nam])
396        return ret
397
    def get_task_hash(self, tid):
        """Return the signature hash stored for task *tid*."""
        return self.runtaskentries[tid].hash
400
    def get_task_unihash(self, tid):
        """Return the unified (equivalence) hash stored for task *tid*."""
        return self.runtaskentries[tid].unihash
403
    def get_user_idstring(self, tid, task_name_suffix = ""):
        """Return a user visible identifier for the task: the tid itself
        with an optional suffix appended."""
        return tid + task_name_suffix
406
407    def get_short_user_idstring(self, task, task_name_suffix = ""):
408        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
409        pn = self.dataCaches[mc].pkg_fn[taskfn]
410        taskname = taskname_from_tid(task) + task_name_suffix
411        return "%s:%s" % (pn, taskname)
412
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        """
        # NOTE(review): 'deepcopy' imported here is unused; the code below
        # uses the module-level copy.deepcopy instead.
        from copy import deepcopy

        valid_chains = []
        # tid -> list of reverse dependencies reachable from that tid,
        # cached to avoid re-walking already explored subgraphs.
        explored_deps = {}
        msgs = []

        class TooManyLoops(Exception):
            # Control-flow signal to abort the (potentially expensive)
            # search once enough loops have been reported.
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            # Rotate so the chain starts at the lowest tid; this gives
            # every cycle a canonical form so duplicates can be detected.
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            # Depth-first walk over reverse dependencies, carrying the path
            # walked so far; a revdep already on the path means we found a
            # cycle.
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                scan = False
                if revdep not in explored_deps:
                    # Never visited; must be explored.
                    scan = True
                elif revdep in explored_deps[revdep]:
                    # revdep can reach itself, so it sits on a cycle;
                    # re-walk it so loops through the current path are
                    # reported too.
                    scan = True
                else:
                    # Re-scan if anything on the current path is reachable
                    # from revdep (a potential loop through this path).
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            # Cache the transitive reverse-dependency list for this tid.
            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs
505
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runtaskentries)  # NOTE(review): unused
        weight = {}
        deps_left = {}
        task_done = {}

        # Every task starts with weight 1 and a count of reverse
        # dependencies still to be processed.
        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        # Endpoints (the tasks the walk starts from) are seeded with a
        # weight of 10.
        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        # Propagate weights backwards through the graph: each task
        # accumulates the weight of everything that depends on it. A task
        # joins the next wave only once all of its reverse dependencies
        # have been processed. (Despite the name, 'revdep' here iterates
        # the *forward* dependencies of tid.)
        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        # Any task never marked done (or with dependencies left) was
        # unreachable by the walk above, i.e. part of a cycle.
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug2("Task %s is not buildable", tid)
                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight
565
566    def prepare(self):
567        """
568        Turn a set of taskData into a RunQueue and compute data needed
569        to optimise the execution order.
570        """
571
572        runq_build = {}
573        recursivetasks = {}
574        recursiveitasks = {}
575        recursivetasksselfref = set()
576
577        taskData = self.taskData
578
579        found = False
580        for mc in self.taskData:
581            if len(taskData[mc].taskentries) > 0:
582                found = True
583                break
584        if not found:
585            # Nothing to do
586            return 0
587
588        self.init_progress_reporter.start()
589        self.init_progress_reporter.next_stage()
590
591        # Step A - Work out a list of tasks to run
592        #
593        # Taskdata gives us a list of possible providers for every build and run
594        # target ordered by priority. It also gives information on each of those
595        # providers.
596        #
597        # To create the actual list of tasks to execute we fix the list of
598        # providers and then resolve the dependencies into task IDs. This
599        # process is repeated for each type of dependency (tdepends, deptask,
600        # rdeptast, recrdeptask, idepends).
601
602        def add_build_dependencies(depids, tasknames, depends, mc):
603            for depname in depids:
604                # Won't be in build_targets if ASSUME_PROVIDED
605                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
606                    continue
607                depdata = taskData[mc].build_targets[depname][0]
608                if depdata is None:
609                    continue
610                for taskname in tasknames:
611                    t = depdata + ":" + taskname
612                    if t in taskData[mc].taskentries:
613                        depends.add(t)
614
615        def add_runtime_dependencies(depids, tasknames, depends, mc):
616            for depname in depids:
617                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
618                    continue
619                depdata = taskData[mc].run_targets[depname][0]
620                if depdata is None:
621                    continue
622                for taskname in tasknames:
623                    t = depdata + ":" + taskname
624                    if t in taskData[mc].taskentries:
625                        depends.add(t)
626
627        def add_mc_dependencies(mc, tid):
628            mcdeps = taskData[mc].get_mcdepends()
629            for dep in mcdeps:
630                mcdependency = dep.split(':')
631                pn = mcdependency[3]
632                frommc = mcdependency[1]
633                mcdep = mcdependency[2]
634                deptask = mcdependency[4]
635                if mc == frommc:
636                    fn = taskData[mcdep].build_targets[pn][0]
637                    newdep = '%s:%s' % (fn,deptask)
638                    taskData[mc].taskentries[tid].tdepends.append(newdep)
639
640        for mc in taskData:
641            for tid in taskData[mc].taskentries:
642
643                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
644                #runtid = build_tid(mc, fn, taskname)
645
646                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)
647
648                depends = set()
649                task_deps = self.dataCaches[mc].task_deps[taskfn]
650
651                self.runtaskentries[tid] = RunTaskEntry()
652
653                if fn in taskData[mc].failed_fns:
654                    continue
655
656                # We add multiconfig dependencies before processing internal task deps (tdepends)
657                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
658                    add_mc_dependencies(mc, tid)
659
660                # Resolve task internal dependencies
661                #
662                # e.g. addtask before X after Y
663                for t in taskData[mc].taskentries[tid].tdepends:
664                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
665                    depends.add(build_tid(depmc, depfn, deptaskname))
666
667                # Resolve 'deptask' dependencies
668                #
669                # e.g. do_sometask[deptask] = "do_someothertask"
670                # (makes sure sometask runs after someothertask of all DEPENDS)
671                if 'deptask' in task_deps and taskname in task_deps['deptask']:
672                    tasknames = task_deps['deptask'][taskname].split()
673                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
674
675                # Resolve 'rdeptask' dependencies
676                #
677                # e.g. do_sometask[rdeptask] = "do_someothertask"
678                # (makes sure sometask runs after someothertask of all RDEPENDS)
679                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
680                    tasknames = task_deps['rdeptask'][taskname].split()
681                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
682
683                # Resolve inter-task dependencies
684                #
685                # e.g. do_sometask[depends] = "targetname:do_someothertask"
686                # (makes sure sometask runs after targetname's someothertask)
687                idepends = taskData[mc].taskentries[tid].idepends
688                for (depname, idependtask) in idepends:
689                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps:
690                        # Won't be in build_targets if ASSUME_PROVIDED
691                        depdata = taskData[mc].build_targets[depname][0]
692                        if depdata is not None:
693                            t = depdata + ":" + idependtask
694                            depends.add(t)
695                            if t not in taskData[mc].taskentries:
696                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
697                irdepends = taskData[mc].taskentries[tid].irdepends
698                for (depname, idependtask) in irdepends:
699                    if depname in taskData[mc].run_targets:
700                        # Won't be in run_targets if ASSUME_PROVIDED
701                        if not taskData[mc].run_targets[depname]:
702                            continue
703                        depdata = taskData[mc].run_targets[depname][0]
704                        if depdata is not None:
705                            t = depdata + ":" + idependtask
706                            depends.add(t)
707                            if t not in taskData[mc].taskentries:
708                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
709
710                # Resolve recursive 'recrdeptask' dependencies (Part A)
711                #
712                # e.g. do_sometask[recrdeptask] = "do_someothertask"
713                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
714                # We cover the recursive part of the dependencies below
715                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
716                    tasknames = task_deps['recrdeptask'][taskname].split()
717                    recursivetasks[tid] = tasknames
718                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
719                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
720                    if taskname in tasknames:
721                        recursivetasksselfref.add(tid)
722
723                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
724                        recursiveitasks[tid] = []
725                        for t in task_deps['recideptask'][taskname].split():
726                            newdep = build_tid(mc, fn, t)
727                            recursiveitasks[tid].append(newdep)
728
729                self.runtaskentries[tid].depends = depends
730                # Remove all self references
731                self.runtaskentries[tid].depends.discard(tid)
732
733        #self.dump_data()
734
735        self.init_progress_reporter.next_stage()
736
737        # Resolve recursive 'recrdeptask' dependencies (Part B)
738        #
739        # e.g. do_sometask[recrdeptask] = "do_someothertask"
740        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
741        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed
742
743        # Generating/interating recursive lists of dependencies is painful and potentially slow
744        # Precompute recursive task dependencies here by:
745        #     a) create a temp list of reverse dependencies (revdeps)
746        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
747        #     c) combine the total list of dependencies in cumulativedeps
748        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)
749
750
751        revdeps = {}
752        deps = {}
753        cumulativedeps = {}
754        for tid in self.runtaskentries:
755            deps[tid] = set(self.runtaskentries[tid].depends)
756            revdeps[tid] = set()
757            cumulativedeps[tid] = set()
758        # Generate a temp list of reverse dependencies
759        for tid in self.runtaskentries:
760            for dep in self.runtaskentries[tid].depends:
761                revdeps[dep].add(tid)
762        # Find the dependency chain endpoints
763        endpoints = set()
764        for tid in self.runtaskentries:
765            if len(deps[tid]) == 0:
766                endpoints.add(tid)
767        # Iterate the chains collating dependencies
768        while endpoints:
769            next = set()
770            for tid in endpoints:
771                for dep in revdeps[tid]:
772                    cumulativedeps[dep].add(fn_from_tid(tid))
773                    cumulativedeps[dep].update(cumulativedeps[tid])
774                    if tid in deps[dep]:
775                        deps[dep].remove(tid)
776                    if len(deps[dep]) == 0:
777                        next.add(dep)
778            endpoints = next
779        #for tid in deps:
780        #    if len(deps[tid]) != 0:
781        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
782
783        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
784        # resolve these recursively until we aren't adding any further extra dependencies
785        extradeps = True
786        while extradeps:
787            extradeps = 0
788            for tid in recursivetasks:
789                tasknames = recursivetasks[tid]
790
791                totaldeps = set(self.runtaskentries[tid].depends)
792                if tid in recursiveitasks:
793                    totaldeps.update(recursiveitasks[tid])
794                    for dep in recursiveitasks[tid]:
795                        if dep not in self.runtaskentries:
796                            continue
797                        totaldeps.update(self.runtaskentries[dep].depends)
798
799                deps = set()
800                for dep in totaldeps:
801                    if dep in cumulativedeps:
802                        deps.update(cumulativedeps[dep])
803
804                for t in deps:
805                    for taskname in tasknames:
806                        newtid = t + ":" + taskname
807                        if newtid == tid:
808                            continue
809                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
810                            extradeps += 1
811                            self.runtaskentries[tid].depends.add(newtid)
812
813                # Handle recursive tasks which depend upon other recursive tasks
814                deps = set()
815                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
816                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
817                for newtid in deps:
818                    for taskname in tasknames:
819                        if not newtid.endswith(":" + taskname):
820                            continue
821                        if newtid in self.runtaskentries:
822                            extradeps += 1
823                            self.runtaskentries[tid].depends.add(newtid)
824
825            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
826
827        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
828        for tid in recursivetasksselfref:
829            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
830
831        self.init_progress_reporter.next_stage()
832
833        #self.dump_data()
834
835        # Step B - Mark all active tasks
836        #
837        # Start with the tasks we were asked to run and mark all dependencies
838        # as active too. If the task is to be 'forced', clear its stamp. Once
839        # all active tasks are marked, prune the ones we don't need.
840
841        logger.verbose("Marking Active Tasks")
842
        def mark_active(tid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)

            Arguments:
                tid: task identifier ("<fn>:<taskname>") to mark as active
                depth: current recursion depth (informational only; it is not
                       used to limit recursion)

            Records active tasks in the enclosing scope's runq_build dict.
            The early return on already-marked tasks both avoids re-walking
            shared subtrees and terminates the recursion.
            """

            # Already marked - nothing to do
            if tid in runq_build:
                return

            runq_build[tid] = 1

            # Recurse into every direct dependency of this task
            depends = self.runtaskentries[tid].depends
            for depend in depends:
                mark_active(depend, depth+1)
857
        def invalidate_task(tid, error_nostamp):
            """
            Invalidate the stamp/signature of a task so it will re-run.

            Arguments:
                tid: task identifier to invalidate
                error_nostamp: if True, treat a 'nostamp' task as a fatal
                               error; otherwise just emit a debug message
                               (nostamp tasks have no stamp to invalidate)
            """
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.dataCaches[mc].task_deps[taskfn]
            # Warn (but continue) if the task isn't known to taskData at all
            if fn + ":" + taskname not in taskData[mc].taskentries:
                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                # Delegate the actual stamp removal to the signature generator
                bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn)
871
872        self.target_tids = []
873        for (mc, target, task, fn) in self.targets:
874
875            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
876                continue
877
878            if target in taskData[mc].failed_deps:
879                continue
880
881            parents = False
882            if task.endswith('-'):
883                parents = True
884                task = task[:-1]
885
886            if fn in taskData[mc].failed_fns:
887                continue
888
889            # fn already has mc prefix
890            tid = fn + ":" + task
891            self.target_tids.append(tid)
892            if tid not in taskData[mc].taskentries:
893                import difflib
894                tasks = []
895                for x in taskData[mc].taskentries:
896                    if x.startswith(fn + ":"):
897                        tasks.append(taskname_from_tid(x))
898                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
899                if close_matches:
900                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
901                else:
902                    extra = ""
903                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
904
905            # For tasks called "XXXX-", ony run their dependencies
906            if parents:
907                for i in self.runtaskentries[tid].depends:
908                    mark_active(i, 1)
909            else:
910                mark_active(tid, 1)
911
912        self.init_progress_reporter.next_stage()
913
914        # Step C - Prune all inactive tasks
915        #
916        # Once all active tasks are marked, prune the ones we don't need.
917
918        delcount = {}
919        for tid in list(self.runtaskentries.keys()):
920            if tid not in runq_build:
921                delcount[tid] = self.runtaskentries[tid]
922                del self.runtaskentries[tid]
923
924        # Handle --runall
925        if self.cooker.configuration.runall:
926            # re-run the mark_active and then drop unused tasks from new list
927            runq_build = {}
928
929            for task in self.cooker.configuration.runall:
930                if not task.startswith("do_"):
931                    task = "do_{0}".format(task)
932                runall_tids = set()
933                for tid in list(self.runtaskentries):
934                    wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
935                    if wanttid in delcount:
936                        self.runtaskentries[wanttid] = delcount[wanttid]
937                    if wanttid in self.runtaskentries:
938                        runall_tids.add(wanttid)
939
940                for tid in list(runall_tids):
941                    mark_active(tid,1)
942                    if self.cooker.configuration.force:
943                        invalidate_task(tid, False)
944
945            for tid in list(self.runtaskentries.keys()):
946                if tid not in runq_build:
947                    delcount[tid] = self.runtaskentries[tid]
948                    del self.runtaskentries[tid]
949
950            if len(self.runtaskentries) == 0:
951                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
952
953        self.init_progress_reporter.next_stage()
954
955        # Handle runonly
956        if self.cooker.configuration.runonly:
957            # re-run the mark_active and then drop unused tasks from new list
958            runq_build = {}
959
960            for task in self.cooker.configuration.runonly:
961                if not task.startswith("do_"):
962                    task = "do_{0}".format(task)
963                runonly_tids = { k: v for k, v in self.runtaskentries.items() if taskname_from_tid(k) == task }
964
965                for tid in list(runonly_tids):
966                    mark_active(tid,1)
967                    if self.cooker.configuration.force:
968                        invalidate_task(tid, False)
969
970            for tid in list(self.runtaskentries.keys()):
971                if tid not in runq_build:
972                    delcount[tid] = self.runtaskentries[tid]
973                    del self.runtaskentries[tid]
974
975            if len(self.runtaskentries) == 0:
976                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
977
978        #
979        # Step D - Sanity checks and computation
980        #
981
982        # Check to make sure we still have tasks to run
983        if len(self.runtaskentries) == 0:
984            if not taskData[''].abort:
985                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
986            else:
987                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
988
989        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
990
991        logger.verbose("Assign Weightings")
992
993        self.init_progress_reporter.next_stage()
994
995        # Generate a list of reverse dependencies to ease future calculations
996        for tid in self.runtaskentries:
997            for dep in self.runtaskentries[tid].depends:
998                self.runtaskentries[dep].revdeps.add(tid)
999
1000        self.init_progress_reporter.next_stage()
1001
1002        # Identify tasks at the end of dependency chains
1003        # Error on circular dependency loops (length two)
1004        endpoints = []
1005        for tid in self.runtaskentries:
1006            revdeps = self.runtaskentries[tid].revdeps
1007            if len(revdeps) == 0:
1008                endpoints.append(tid)
1009            for dep in revdeps:
1010                if dep in self.runtaskentries[tid].depends:
1011                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1012
1013
1014        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1015
1016        self.init_progress_reporter.next_stage()
1017
1018        # Calculate task weights
1019        # Check of higher length circular dependencies
1020        self.runq_weight = self.calculate_task_weights(endpoints)
1021
1022        self.init_progress_reporter.next_stage()
1023
1024        # Sanity Check - Check for multiple tasks building the same provider
1025        for mc in self.dataCaches:
1026            prov_list = {}
1027            seen_fn = []
1028            for tid in self.runtaskentries:
1029                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1030                if taskfn in seen_fn:
1031                    continue
1032                if mc != tidmc:
1033                    continue
1034                seen_fn.append(taskfn)
1035                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1036                    if prov not in prov_list:
1037                        prov_list[prov] = [taskfn]
1038                    elif taskfn not in prov_list[prov]:
1039                        prov_list[prov].append(taskfn)
1040            for prov in prov_list:
1041                if len(prov_list[prov]) < 2:
1042                    continue
1043                if prov in self.multi_provider_whitelist:
1044                    continue
1045                seen_pn = []
1046                # If two versions of the same PN are being built its fatal, we don't support it.
1047                for fn in prov_list[prov]:
1048                    pn = self.dataCaches[mc].pkg_fn[fn]
1049                    if pn not in seen_pn:
1050                        seen_pn.append(pn)
1051                    else:
1052                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1053                msg = "Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))
1054                #
1055                # Construct a list of things which uniquely depend on each provider
1056                # since this may help the user figure out which dependency is triggering this warning
1057                #
1058                msg += "\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from."
1059                deplist = {}
1060                commondeps = None
1061                for provfn in prov_list[prov]:
1062                    deps = set()
1063                    for tid in self.runtaskentries:
1064                        fn = fn_from_tid(tid)
1065                        if fn != provfn:
1066                            continue
1067                        for dep in self.runtaskentries[tid].revdeps:
1068                            fn = fn_from_tid(dep)
1069                            if fn == provfn:
1070                                continue
1071                            deps.add(dep)
1072                    if not commondeps:
1073                        commondeps = set(deps)
1074                    else:
1075                        commondeps &= deps
1076                    deplist[provfn] = deps
1077                for provfn in deplist:
1078                    msg += "\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps))
1079                #
1080                # Construct a list of provides and runtime providers for each recipe
1081                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1082                #
1083                msg += "\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful."
1084                provide_results = {}
1085                rprovide_results = {}
1086                commonprovs = None
1087                commonrprovs = None
1088                for provfn in prov_list[prov]:
1089                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1090                    rprovides = set()
1091                    for rprovide in self.dataCaches[mc].rproviders:
1092                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1093                            rprovides.add(rprovide)
1094                    for package in self.dataCaches[mc].packages:
1095                        if provfn in self.dataCaches[mc].packages[package]:
1096                            rprovides.add(package)
1097                    for package in self.dataCaches[mc].packages_dynamic:
1098                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1099                            rprovides.add(package)
1100                    if not commonprovs:
1101                        commonprovs = set(provides)
1102                    else:
1103                        commonprovs &= provides
1104                    provide_results[provfn] = provides
1105                    if not commonrprovs:
1106                        commonrprovs = set(rprovides)
1107                    else:
1108                        commonrprovs &= rprovides
1109                    rprovide_results[provfn] = rprovides
1110                #msg += "\nCommon provides:\n  %s" % ("\n  ".join(commonprovs))
1111                #msg += "\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs))
1112                for provfn in prov_list[prov]:
1113                    msg += "\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs))
1114                    msg += "\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs))
1115
1116                if self.warn_multi_bb:
1117                    logger.verbnote(msg)
1118                else:
1119                    logger.error(msg)
1120
1121        self.init_progress_reporter.next_stage()
1122
1123        # Create a whitelist usable by the stamp checks
1124        self.stampfnwhitelist = {}
1125        for mc in self.taskData:
1126            self.stampfnwhitelist[mc] = []
1127            for entry in self.stampwhitelist.split():
1128                if entry not in self.taskData[mc].build_targets:
1129                    continue
1130                fn = self.taskData.build_targets[entry][0]
1131                self.stampfnwhitelist[mc].append(fn)
1132
1133        self.init_progress_reporter.next_stage()
1134
1135        # Iterate over the task list looking for tasks with a 'setscene' function
1136        self.runq_setscene_tids = set()
1137        if not self.cooker.configuration.nosetscene:
1138            for tid in self.runtaskentries:
1139                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1140                setscenetid = tid + "_setscene"
1141                if setscenetid not in taskData[mc].taskentries:
1142                    continue
1143                self.runq_setscene_tids.add(tid)
1144
1145        self.init_progress_reporter.next_stage()
1146
1147        # Invalidate task if force mode active
1148        if self.cooker.configuration.force:
1149            for tid in self.target_tids:
1150                invalidate_task(tid, False)
1151
1152        # Invalidate task if invalidate mode active
1153        if self.cooker.configuration.invalidate_stamp:
1154            for tid in self.target_tids:
1155                fn = fn_from_tid(tid)
1156                for st in self.cooker.configuration.invalidate_stamp.split(','):
1157                    if not st.startswith("do_"):
1158                        st = "do_%s" % st
1159                    invalidate_task(fn + ":" + st, True)
1160
1161        self.init_progress_reporter.next_stage()
1162
1163        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1164        for mc in taskData:
1165            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1166            virtpnmap = {}
1167            for v in virtmap:
1168                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1169                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1170            if hasattr(bb.parse.siggen, "tasks_resolved"):
1171                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1172
1173        self.init_progress_reporter.next_stage()
1174
1175        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1176
1177        # Iterate over the task list and call into the siggen code
1178        dealtwith = set()
1179        todeal = set(self.runtaskentries)
1180        while len(todeal) > 0:
1181            for tid in todeal.copy():
1182                if len(self.runtaskentries[tid].depends - dealtwith) == 0:
1183                    dealtwith.add(tid)
1184                    todeal.remove(tid)
1185                    self.prepare_task_hash(tid)
1186
1187        bb.parse.siggen.writeout_file_checksum_cache()
1188
1189        #self.dump_data()
1190        return len(self.runtaskentries)
1191
1192    def prepare_task_hash(self, tid):
1193        dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid))
1194        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc)
1195        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc)
1196        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
1197
1198    def dump_data(self):
1199        """
1200        Dump some debug information on the internal data structures
1201        """
1202        logger.debug3("run_tasks:")
1203        for tid in self.runtaskentries:
1204            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1205                         self.runtaskentries[tid].weight,
1206                         self.runtaskentries[tid].depends,
1207                         self.runtaskentries[tid].revdeps)
1208
class RunQueueWorker():
    """Simple container pairing a bitbake-worker process with its pipe."""
    def __init__(self, process, pipe):
        # process: the worker subprocess handle
        # pipe: the runQueuePipe used to read events from the worker
        self.process, self.pipe = process, pipe
1213
1214class RunQueue:
    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
        """
        Create a RunQueue ready to prepare and execute the given targets.

        Arguments:
            cooker: the BBCooker driving this build
            cfgData: configuration datastore (queried via getVar below)
            dataCaches: per-multiconfig recipe data caches
            taskData: per-multiconfig TaskData instances
            targets: the (mc, target, task, fn) tuples requested for the build
        """

        self.cooker = cooker
        self.cfgData = cfgData
        # All task graph computation is delegated to RunQueueData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)

        # Policy/validation hooks are configurable via the metadata
        self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY") or "perfile"
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None

        # State machine starts in the preparation phase
        self.state = runQueuePrepare

        # For disk space monitor
        # Invoked at regular time intervals via the bitbake heartbeat event
        # while the build is running. We generate a unique name for the handler
        # here, just in case that there ever is more than one RunQueue instance,
        # start the handler when reaching runQueueSceneInit, and stop it when
        # done with the build.
        self.dm = monitordisk.diskMonitor(cfgData)
        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
        self.dm_event_handler_registered = False
        self.rqexe = None
        # Worker process maps, keyed by multiconfig name
        self.worker = {}
        self.fakeworker = {}
1239
1240    def _start_worker(self, mc, fakeroot = False, rqexec = None):
1241        logger.debug("Starting bitbake-worker")
1242        magic = "decafbad"
1243        if self.cooker.configuration.profile:
1244            magic = "decafbadbad"
1245        fakerootlogs = None
1246        if fakeroot:
1247            magic = magic + "beef"
1248            mcdata = self.cooker.databuilder.mcdata[mc]
1249            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
1250            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
1251            env = os.environ.copy()
1252            for key, value in (var.split('=') for var in fakerootenv):
1253                env[key] = value
1254            worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
1255            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
1256        else:
1257            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1258        bb.utils.nonblockingfd(worker.stdout)
1259        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
1260
1261        workerdata = {
1262            "taskdeps" : self.rqdata.dataCaches[mc].task_deps,
1263            "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv,
1264            "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs,
1265            "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv,
1266            "sigdata" : bb.parse.siggen.get_taskdata(),
1267            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
1268            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
1269            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
1270            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1271            "prhost" : self.cooker.prhost,
1272            "buildname" : self.cfgData.getVar("BUILDNAME"),
1273            "date" : self.cfgData.getVar("DATE"),
1274            "time" : self.cfgData.getVar("TIME"),
1275            "hashservaddr" : self.cooker.hashservaddr,
1276            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1277        }
1278
1279        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
1280        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
1281        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
1282        worker.stdin.flush()
1283
1284        return RunQueueWorker(worker, workerpipe)
1285
1286    def _teardown_worker(self, worker):
1287        if not worker:
1288            return
1289        logger.debug("Teardown for bitbake-worker")
1290        try:
1291           worker.process.stdin.write(b"<quit></quit>")
1292           worker.process.stdin.flush()
1293           worker.process.stdin.close()
1294        except IOError:
1295           pass
1296        while worker.process.returncode is None:
1297            worker.pipe.read()
1298            worker.process.poll()
1299        while worker.pipe.read():
1300            continue
1301        worker.pipe.close()
1302
1303    def start_worker(self):
1304        if self.worker:
1305            self.teardown_workers()
1306        self.teardown = False
1307        for mc in self.rqdata.dataCaches:
1308            self.worker[mc] = self._start_worker(mc)
1309
1310    def start_fakeworker(self, rqexec, mc):
1311        if not mc in self.fakeworker:
1312            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1313
1314    def teardown_workers(self):
1315        self.teardown = True
1316        for mc in self.worker:
1317            self._teardown_worker(self.worker[mc])
1318        self.worker = {}
1319        for mc in self.fakeworker:
1320            self._teardown_worker(self.fakeworker[mc])
1321        self.fakeworker = {}
1322
1323    def read_workers(self):
1324        for mc in self.worker:
1325            self.worker[mc].pipe.read()
1326        for mc in self.fakeworker:
1327            self.fakeworker[mc].pipe.read()
1328
1329    def active_fds(self):
1330        fds = []
1331        for mc in self.worker:
1332            fds.append(self.worker[mc].pipe.input)
1333        for mc in self.fakeworker:
1334            fds.append(self.fakeworker[mc].pipe.input)
1335        return fds
1336
    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
        """
        Return True if the stamp for 'tid' exists and is current relative to
        the stamps of its dependencies, False otherwise.

        Arguments:
            tid: task identifier to check
            taskname: override the taskname parsed out of tid (optional)
            recurse: if True, also validate dependency stamps recursively
            cache: dict mapping tid -> bool, memoises recursive results

        Which dependency stamps are compared depends on BB_STAMP_POLICY:
        "perfile" only compares stamps within the same recipe file; any
        other value compares across the full dependency tree, with
        "whitelist" exempting files in the stamp whitelist.
        """
        # Return the mtime of f, or None if it is missing/unreadable.
        # NOTE(review): bare except also swallows unexpected errors - kept as-is
        def get_timestamp(f):
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
        if taskname is None:
            taskname = tn

        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.rqdata.stampfnwhitelist[mc]

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug2("Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug2("%s.%s is nostamp\n", fn, taskname)
            return False

        # An existing stamp for a *_setscene task (other than do_setscene)
        # is always treated as current
        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runtaskentries[tid].depends:
            # Once one dependency invalidates the stamp, skip the rest
            if iscurrent:
                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                # Skip the comparison when the dependency's setscene stamp
                # exists and the normal stamp is absent or older than it
                if t3 and not t2:
                    continue
                if t3 and t3 > t2:
                    continue
                # Only compare against deps in the same file unless the stamp
                # policy asked for the full tree (minus whitelisted files)
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    if not t2:
                        logger.debug2('Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                        break
                    if t1 < t2:
                        # Dependency stamp is newer than ours - we're stale
                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                        break
                    if recurse and iscurrent:
                        if dep in cache:
                            iscurrent = cache[dep]
                            if not iscurrent:
                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                        else:
                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                            cache[dep] = iscurrent
        if recurse:
            cache[tid] = iscurrent
        return iscurrent
1409
1410    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1411        valid = set()
1412        if self.hashvalidate:
1413            sq_data = {}
1414            sq_data['hash'] = {}
1415            sq_data['hashfn'] = {}
1416            sq_data['unihash'] = {}
1417            for tid in tocheck:
1418                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1419                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1420                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1421                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1422
1423            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1424
1425        return valid
1426
1427    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
1428        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1429
1430        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
1431        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1432
1433        return bb.utils.better_eval(call, locs)
1434
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)

        Acts as one iteration of a state machine driven by self.state; the
        caller is expected to invoke it repeatedly until it returns False
        (build finished). Otherwise returns a retval for the idle loop.
        """

        retval = True

        if self.state is runQueuePrepare:
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            # prepare() returning 0 means there is nothing to run
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                bb.parse.siggen.save_unitaskhashes()

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()

            # we are ready to run,  emit dependency info to any UI or class which
            # needs it
            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
            self.rqdata.init_progress_reporter.next_stage()
            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

            # Hook the disk monitor check into the heartbeat event while running
            if not self.dm_event_handler_registered:
                 res = bb.event.register(self.dm_event_handler_name,
                                         lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                         ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

            dump = self.cooker.configuration.dump_signatures
            if dump:
                # Signature dump mode: write out signatures (and optionally a
                # diff against cached tasks) instead of building anything.
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()
            self.start_worker()
            self.rqdata.init_progress_reporter.next_stage()
            self.rqexe = RunQueueExecute(self)

            # If we don't have any setscene functions, skip execution
            if len(self.rqdata.runq_setscene_tids) == 0:
                logger.info('No setscene tasks')
                for tid in self.rqdata.runtaskentries:
                    # Tasks with no dependencies can start straight away
                    if len(self.rqdata.runtaskentries[tid].depends) == 0:
                        self.rqexe.setbuildable(tid)
                    self.rqexe.tasks_notcovered.add(tid)
                self.rqexe.sqdone = True
            logger.info('Executing Tasks')
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        # Build finished: detach the heartbeat/disk-monitor handler again
        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            bb.parse.siggen.save_unitaskhashes()
            self.teardown_workers()
            if self.rqexe:
                if self.rqexe.stats.failed:
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
                else:
                    # Let's avoid the word "failed" if nothing actually did
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1533
1534    def execute_runqueue(self):
1535        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1536        try:
1537            return self._execute_runqueue()
1538        except bb.runqueue.TaskFailure:
1539            raise
1540        except SystemExit:
1541            raise
1542        except bb.BBHandledException:
1543            try:
1544                self.teardown_workers()
1545            except:
1546                pass
1547            self.state = runQueueComplete
1548            raise
1549        except Exception as err:
1550            logger.exception("An uncaught exception occurred in runqueue")
1551            try:
1552                self.teardown_workers()
1553            except:
1554                pass
1555            self.state = runQueueComplete
1556            raise
1557
1558    def finish_runqueue(self, now = False):
1559        if not self.rqexe:
1560            self.state = runQueueComplete
1561            return
1562
1563        if now:
1564            self.rqexe.finish_now()
1565        else:
1566            self.rqexe.finish()
1567
1568    def rq_dump_sigfn(self, fn, options):
1569        bb_cache = bb.cache.NoCache(self.cooker.databuilder)
1570        mc = bb.runqueue.mc_from_tid(fn)
1571        the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn))
1572        siggen = bb.parse.siggen
1573        dataCaches = self.rqdata.dataCaches
1574        siggen.dump_sigfn(fn, dataCaches, options)
1575
1576    def dump_signatures(self, options):
1577        fns = set()
1578        bb.note("Reparsing files to collect dependency data")
1579
1580        for tid in self.rqdata.runtaskentries:
1581            fn = fn_from_tid(tid)
1582            fns.add(fn)
1583
1584        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1585        # We cannot use the real multiprocessing.Pool easily due to some local data
1586        # that can't be pickled. This is a cheap multi-process solution.
1587        launched = []
1588        while fns:
1589            if len(launched) < max_process:
1590                p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options))
1591                p.start()
1592                launched.append(p)
1593            for q in launched:
1594                # The finished processes are joined when calling is_alive()
1595                if not q.is_alive():
1596                    launched.remove(q)
1597        for p in launched:
1598                p.join()
1599
1600        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1601
1602        return
1603
    def print_diffscenetasks(self):
        """
        Determine which tasks have no valid setscene (cached) data and print
        the "root" invalid tasks, i.e. those whose invalidity is not simply a
        consequence of an invalid dependency. Returns that set of root tasks.
        """

        noexec = []
        tocheck = set()

        # Partition tasks: noexec ones produce nothing cacheable, the rest
        # get their hashes validated below.
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(tid)
                continue

            tocheck.add(tid)

        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for tid in noexec:
            if tid not in self.rqdata.runq_setscene_tids:
                continue
            for dep in self.rqdata.runtaskentries[tid].depends:
                # 'dep' is unique to noexec setscene parents if every reverse
                # dependency is itself a noexec setscene task.
                hasnoexecparents = True
                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        # Anything neither validated nor noexec is invalid
        invalidtasks = set()
        for tid in self.rqdata.runtaskentries:
            if tid not in valid_new and tid not in noexec:
                invalidtasks.add(tid)

        # Walk each invalid task's dependency tree; if any (transitive)
        # dependency is itself invalid, the task is a consequence rather than
        # a root cause and lands in 'found'.
        # NOTE: 'processed' is shared across all tids, so each dependency is
        # only ever expanded once globally; 'next' shadows the builtin.
        found = set()
        processed = set()
        for tid in invalidtasks:
            toprocess = set([tid])
            while toprocess:
                next = set()
                for t in toprocess:
                    for dep in self.rqdata.runtaskentries[t].depends:
                        if dep in invalidtasks:
                            found.add(tid)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                # Stop early once this tid is known to be a consequence
                if tid in found:
                    toprocess = set()

        # Root invalid tasks are the invalid ones not explained by a dependency
        tasklist = []
        for tid in invalidtasks.difference(found):
            tasklist.append(tid)

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks.difference(found)
1667
1668    def write_diffscenetasks(self, invalidtasks):
1669
1670        # Define recursion callback
1671        def recursecb(key, hash1, hash2):
1672            hashes = [hash1, hash2]
1673            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
1674
1675            recout = []
1676            if len(hashfiles) == 2:
1677                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
1678                recout.extend(list('    ' + l for l in out2))
1679            else:
1680                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
1681
1682            return recout
1683
1684
1685        for tid in invalidtasks:
1686            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1687            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1688            h = self.rqdata.runtaskentries[tid].hash
1689            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cfgData)
1690            match = None
1691            for m in matches:
1692                if h in m:
1693                    match = m
1694            if match is None:
1695                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
1696            matches = {k : v for k, v in iter(matches.items()) if h not in k}
1697            if matches:
1698                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
1699                prevh = __find_sha256__.search(latestmatch).group(0)
1700                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
1701                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1702
1703
1704class RunQueueExecute:
1705
    def __init__(self, rq):
        """Set up the execution state for runqueue 'rq', pick a scheduler and
        build the setscene (scenequeue) data."""
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        # How many tasks may run concurrently and which scheduler orders them
        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"

        # Setscene task state
        self.sq_buildable = set()
        self.sq_running = set()
        self.sq_live = set()

        # Taskhash/unihash migration bookkeeping
        self.updated_taskhash_queue = []
        self.pending_migrations = set()

        # Real task state
        self.runq_buildable = set()
        self.runq_running = set()
        self.runq_complete = set()
        self.runq_tasksrun = set()

        # Stamps of currently-executing tasks (dict by tid, plus a flat list)
        self.build_stamps = {}
        self.build_stamps2 = []
        self.failed_tids = []
        # Setscene tasks deferred until another tid completes (tid -> tid)
        self.sq_deferred = {}

        self.stampcache = {}

        self.holdoff_tasks = set()
        self.holdoff_need_update = True
        self.sqdone = False

        self.stats = RunQueueStats(len(self.rqdata.runtaskentries))
        self.sq_stats = RunQueueStats(len(self.rqdata.runq_setscene_tids))

        # Let the worker pipes deliver events back to this executor
        for mc in rq.worker:
            rq.worker[mc].pipe.setrunqueueexec(self)
        for mc in rq.fakeworker:
            rq.fakeworker[mc].pipe.setrunqueueexec(self)

        if self.number_tasks <= 0:
             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)

        # List of setscene tasks which we've covered
        self.scenequeue_covered = set()
        # List of tasks which are covered (including setscene ones)
        self.tasks_covered = set()
        self.tasks_scenequeue_done = set()
        self.scenequeue_notcovered = set()
        self.tasks_notcovered = set()
        self.scenequeue_notneeded = set()

        # We can't skip specified target tasks which aren't setscene tasks
        self.cantskip = set(self.rqdata.target_tids)
        self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
        self.cantskip.intersection_update(self.rqdata.runtaskentries)

        # Instantiate the scheduler selected by BB_SCHEDULER (by class .name)
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

        #if len(self.rqdata.runq_setscene_tids) > 0:
        self.sqdata = SQData()
        build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1776
1777    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1778
1779        # self.build_stamps[pid] may not exist when use shared work directory.
1780        if task in self.build_stamps:
1781            self.build_stamps2.remove(self.build_stamps[task])
1782            del self.build_stamps[task]
1783
1784        if task in self.sq_live:
1785            if status != 0:
1786                self.sq_task_fail(task, status)
1787            else:
1788                self.sq_task_complete(task)
1789            self.sq_live.remove(task)
1790        else:
1791            if status != 0:
1792                self.task_fail(task, status, fakerootlog=fakerootlog)
1793            else:
1794                self.task_complete(task)
1795        return True
1796
1797    def finish_now(self):
1798        for mc in self.rq.worker:
1799            try:
1800                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
1801                self.rq.worker[mc].process.stdin.flush()
1802            except IOError:
1803                # worker must have died?
1804                pass
1805        for mc in self.rq.fakeworker:
1806            try:
1807                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
1808                self.rq.fakeworker[mc].process.stdin.flush()
1809            except IOError:
1810                # worker must have died?
1811                pass
1812
1813        if len(self.failed_tids) != 0:
1814            self.rq.state = runQueueFailed
1815            return
1816
1817        self.rq.state = runQueueComplete
1818        return
1819
1820    def finish(self):
1821        self.rq.state = runQueueCleanUp
1822
1823        active = self.stats.active + self.sq_stats.active
1824        if active > 0:
1825            bb.event.fire(runQueueExitWait(active), self.cfgData)
1826            self.rq.read_workers()
1827            return self.rq.active_fds()
1828
1829        if len(self.failed_tids) != 0:
1830            self.rq.state = runQueueFailed
1831            return True
1832
1833        self.rq.state = runQueueComplete
1834        return True
1835
1836    # Used by setscene only
1837    def check_dependencies(self, task, taskdeps):
1838        if not self.rq.depvalidate:
1839            return False
1840
1841        # Must not edit parent data
1842        taskdeps = set(taskdeps)
1843
1844        taskdata = {}
1845        taskdeps.add(task)
1846        for dep in taskdeps:
1847            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
1848            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1849            taskdata[dep] = [pn, taskname, fn]
1850        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1851        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1852        valid = bb.utils.better_eval(call, locs)
1853        return valid
1854
1855    def can_start_task(self):
1856        active = self.stats.active + self.sq_stats.active
1857        can_start = active < self.number_tasks
1858        return can_start
1859
1860    def get_schedulers(self):
1861        schedulers = set(obj for obj in globals().values()
1862                             if type(obj) is type and
1863                                issubclass(obj, RunQueueScheduler))
1864
1865        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
1866        if user_schedulers:
1867            for sched in user_schedulers.split():
1868                if not "." in sched:
1869                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1870                    continue
1871
1872                modname, name = sched.rsplit(".", 1)
1873                try:
1874                    module = __import__(modname, fromlist=(name,))
1875                except ImportError as exc:
1876                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1877                    raise SystemExit(1)
1878                else:
1879                    schedulers.add(getattr(module, name))
1880        return schedulers
1881
1882    def setbuildable(self, task):
1883        self.runq_buildable.add(task)
1884        self.sched.newbuildable(task)
1885
1886    def task_completeoutright(self, task):
1887        """
1888        Mark a task as completed
1889        Look at the reverse dependencies and mark any task with
1890        completed dependencies as buildable
1891        """
1892        self.runq_complete.add(task)
1893        for revdep in self.rqdata.runtaskentries[task].revdeps:
1894            if revdep in self.runq_running:
1895                continue
1896            if revdep in self.runq_buildable:
1897                continue
1898            alldeps = True
1899            for dep in self.rqdata.runtaskentries[revdep].depends:
1900                if dep not in self.runq_complete:
1901                    alldeps = False
1902                    break
1903            if alldeps:
1904                self.setbuildable(revdep)
1905                logger.debug("Marking task %s as buildable", revdep)
1906
1907    def task_complete(self, task):
1908        self.stats.taskCompleted()
1909        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
1910        self.task_completeoutright(task)
1911        self.runq_tasksrun.add(task)
1912
    def task_fail(self, task, exitcode, fakerootlog=None):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        self.failed_tids.append(task)

        # If a fakeroot (pseudo) log was supplied, extract the portion for the
        # most recent pseudo session and attach it to the failure event, but
        # only when the log suggests pseudo itself reported a problem.
        fakeroot_log = ""
        if fakerootlog and os.path.exists(fakerootlog):
            with open(fakerootlog) as fakeroot_log_file:
                fakeroot_failed = False
                # Scan newest-first so we only look at the latest session
                for line in reversed(fakeroot_log_file.readlines()):
                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
                        if fakeroot_error in line.lower():
                            fakeroot_failed = True
                    # Stop at the start marker of the latest pseudo session
                    if 'doing new pid setup and server start' in line:
                        break
                    # Prepend to restore original (chronological) order
                    fakeroot_log = line + fakeroot_log

            # No error indicators found: suppress the log entirely
            if not fakeroot_failed:
                fakeroot_log = None

        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=fakeroot_log), self.cfgData)

        # If configured to abort on failure, move the queue into cleanup so
        # the build stops once running tasks have finished.
        if self.rqdata.taskData[''].abort:
            self.rq.state = runQueueCleanUp
1940
1941    def task_skip(self, task, reason):
1942        self.runq_running.add(task)
1943        self.setbuildable(task)
1944        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
1945        self.task_completeoutright(task)
1946        self.stats.taskSkipped()
1947        self.stats.taskCompleted()
1948
1949    def summarise_scenequeue_errors(self):
1950        err = False
1951        if not self.sqdone:
1952            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
1953            completeevent = sceneQueueComplete(self.sq_stats, self.rq)
1954            bb.event.fire(completeevent, self.cfgData)
1955        if self.sq_deferred:
1956            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
1957            err = True
1958        if self.updated_taskhash_queue:
1959            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
1960            err = True
1961        if self.holdoff_tasks:
1962            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
1963            err = True
1964
1965        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
1966            # No task should end up in both covered and uncovered, that is a bug.
1967            logger.error("Setscene task %s in both covered and notcovered." % tid)
1968
1969        for tid in self.rqdata.runq_setscene_tids:
1970            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
1971                err = True
1972                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
1973            if tid not in self.sq_buildable:
1974                err = True
1975                logger.error("Setscene Task %s was never marked as buildable" % tid)
1976            if tid not in self.sq_running:
1977                err = True
1978                logger.error("Setscene Task %s was never marked as running" % tid)
1979
1980        for x in self.rqdata.runtaskentries:
1981            if x not in self.tasks_covered and x not in self.tasks_notcovered:
1982                logger.error("Task %s was never moved from the setscene queue" % x)
1983                err = True
1984            if x not in self.tasks_scenequeue_done:
1985                logger.error("Task %s was never processed by the setscene code" % x)
1986                err = True
1987            if len(self.rqdata.runtaskentries[x].depends) == 0 and x not in self.runq_buildable:
1988                logger.error("Task %s was never marked as buildable by the setscene code" % x)
1989                err = True
1990        return err
1991
1992
1993    def execute(self):
1994        """
1995        Run the tasks in a queue prepared by prepare_runqueue
1996        """
1997
1998        self.rq.read_workers()
1999        if self.updated_taskhash_queue or self.pending_migrations:
2000            self.process_possible_migrations()
2001
2002        if not hasattr(self, "sorted_setscene_tids"):
2003            # Don't want to sort this set every execution
2004            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)
2005
2006        task = None
2007        if not self.sqdone and self.can_start_task():
2008            # Find the next setscene to run
2009            for nexttask in self.sorted_setscene_tids:
2010                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
2011                    if nexttask not in self.sqdata.unskippable and len(self.sqdata.sq_revdeps[nexttask]) > 0 and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
2012                        if nexttask not in self.rqdata.target_tids:
2013                            logger.debug2("Skipping setscene for task %s" % nexttask)
2014                            self.sq_task_skip(nexttask)
2015                            self.scenequeue_notneeded.add(nexttask)
2016                            if nexttask in self.sq_deferred:
2017                                del self.sq_deferred[nexttask]
2018                            return True
2019                    # If covered tasks are running, need to wait for them to complete
2020                    for t in self.sqdata.sq_covered_tasks[nexttask]:
2021                        if t in self.runq_running and t not in self.runq_complete:
2022                            continue
2023                    if nexttask in self.sq_deferred:
2024                        if self.sq_deferred[nexttask] not in self.runq_complete:
2025                            continue
2026                        logger.debug("Task %s no longer deferred" % nexttask)
2027                        del self.sq_deferred[nexttask]
2028                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
2029                        if not valid:
2030                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
2031                            self.sq_task_failoutright(nexttask)
2032                            return True
2033                    if nexttask in self.sqdata.outrightfail:
2034                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
2035                        self.sq_task_failoutright(nexttask)
2036                        return True
2037                    if nexttask in self.sqdata.unskippable:
2038                        logger.debug2("Setscene task %s is unskippable" % nexttask)
2039                    task = nexttask
2040                    break
2041        if task is not None:
2042            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2043            taskname = taskname + "_setscene"
2044            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
2045                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
2046                self.sq_task_failoutright(task)
2047                return True
2048
2049            if self.cooker.configuration.force:
2050                if task in self.rqdata.target_tids:
2051                    self.sq_task_failoutright(task)
2052                    return True
2053
2054            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2055                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
2056                self.sq_task_skip(task)
2057                return True
2058
2059            if self.cooker.configuration.skipsetscene:
2060                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
2061                self.sq_task_failoutright(task)
2062                return True
2063
2064            startevent = sceneQueueTaskStarted(task, self.sq_stats, self.rq)
2065            bb.event.fire(startevent, self.cfgData)
2066
2067            taskdepdata = self.sq_build_taskdepdata(task)
2068
2069            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2070            taskhash = self.rqdata.get_task_hash(task)
2071            unihash = self.rqdata.get_task_unihash(task)
2072            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
2073                if not mc in self.rq.fakeworker:
2074                    self.rq.start_fakeworker(self, mc)
2075                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
2076                self.rq.fakeworker[mc].process.stdin.flush()
2077            else:
2078                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
2079                self.rq.worker[mc].process.stdin.flush()
2080
2081            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
2082            self.build_stamps2.append(self.build_stamps[task])
2083            self.sq_running.add(task)
2084            self.sq_live.add(task)
2085            self.sq_stats.taskActive()
2086            if self.can_start_task():
2087                return True
2088
2089        self.update_holdofftasks()
2090
2091        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
2092            hashequiv_logger.verbose("Setscene tasks completed")
2093
2094            err = self.summarise_scenequeue_errors()
2095            if err:
2096                self.rq.state = runQueueFailed
2097                return True
2098
2099            if self.cooker.configuration.setsceneonly:
2100                self.rq.state = runQueueComplete
2101                return True
2102            self.sqdone = True
2103
2104            if self.stats.total == 0:
2105                # nothing to do
2106                self.rq.state = runQueueComplete
2107                return True
2108
2109        if self.cooker.configuration.setsceneonly:
2110            task = None
2111        else:
2112            task = self.sched.next()
2113        if task is not None:
2114            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2115
2116            if self.rqdata.setscenewhitelist is not None:
2117                if self.check_setscenewhitelist(task):
2118                    self.task_fail(task, "setscene whitelist")
2119                    return True
2120
2121            if task in self.tasks_covered:
2122                logger.debug2("Setscene covered task %s", task)
2123                self.task_skip(task, "covered")
2124                return True
2125
2126            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2127                logger.debug2("Stamp current task %s", task)
2128
2129                self.task_skip(task, "existing")
2130                self.runq_tasksrun.add(task)
2131                return True
2132
2133            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2134            if 'noexec' in taskdep and taskname in taskdep['noexec']:
2135                startevent = runQueueTaskStarted(task, self.stats, self.rq,
2136                                                 noexec=True)
2137                bb.event.fire(startevent, self.cfgData)
2138                self.runq_running.add(task)
2139                self.stats.taskActive()
2140                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2141                    bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
2142                self.task_complete(task)
2143                return True
2144            else:
2145                startevent = runQueueTaskStarted(task, self.stats, self.rq)
2146                bb.event.fire(startevent, self.cfgData)
2147
2148            taskdepdata = self.build_taskdepdata(task)
2149
2150            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2151            taskhash = self.rqdata.get_task_hash(task)
2152            unihash = self.rqdata.get_task_unihash(task)
2153            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2154                if not mc in self.rq.fakeworker:
2155                    try:
2156                        self.rq.start_fakeworker(self, mc)
2157                    except OSError as exc:
2158                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
2159                        self.rq.state = runQueueFailed
2160                        self.stats.taskFailed()
2161                        return True
2162                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
2163                self.rq.fakeworker[mc].process.stdin.flush()
2164            else:
2165                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
2166                self.rq.worker[mc].process.stdin.flush()
2167
2168            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
2169            self.build_stamps2.append(self.build_stamps[task])
2170            self.runq_running.add(task)
2171            self.stats.taskActive()
2172            if self.can_start_task():
2173                return True
2174
2175        if self.stats.active > 0 or self.sq_stats.active > 0:
2176            self.rq.read_workers()
2177            return self.rq.active_fds()
2178
2179        # No more tasks can be run. If we have deferred setscene tasks we should run them.
2180        if self.sq_deferred:
2181            tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0])
2182            logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid)
2183            self.sq_task_failoutright(tid)
2184            return True
2185
2186        if len(self.failed_tids) != 0:
2187            self.rq.state = runQueueFailed
2188            return True
2189
2190        # Sanity Checks
2191        err = self.summarise_scenequeue_errors()
2192        for task in self.rqdata.runtaskentries:
2193            if task not in self.runq_buildable:
2194                logger.error("Task %s never buildable!", task)
2195                err = True
2196            elif task not in self.runq_running:
2197                logger.error("Task %s never ran!", task)
2198                err = True
2199            elif task not in self.runq_complete:
2200                logger.error("Task %s never completed!", task)
2201                err = True
2202
2203        if err:
2204            self.rq.state = runQueueFailed
2205        else:
2206            self.rq.state = runQueueComplete
2207
2208        return True
2209
2210    def filtermcdeps(self, task, mc, deps):
2211        ret = set()
2212        for dep in deps:
2213            thismc = mc_from_tid(dep)
2214            if thismc != mc:
2215                continue
2216            ret.add(dep)
2217        return ret
2218
2219    # We filter out multiconfig dependencies from taskdepdata we pass to the tasks
2220    # as most code can't handle them
2221    def build_taskdepdata(self, task):
2222        taskdepdata = {}
2223        mc = mc_from_tid(task)
2224        next = self.rqdata.runtaskentries[task].depends.copy()
2225        next.add(task)
2226        next = self.filtermcdeps(task, mc, next)
2227        while next:
2228            additional = []
2229            for revdep in next:
2230                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2231                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2232                deps = self.rqdata.runtaskentries[revdep].depends
2233                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2234                taskhash = self.rqdata.runtaskentries[revdep].hash
2235                unihash = self.rqdata.runtaskentries[revdep].unihash
2236                deps = self.filtermcdeps(task, mc, deps)
2237                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2238                for revdep2 in deps:
2239                    if revdep2 not in taskdepdata:
2240                        additional.append(revdep2)
2241            next = additional
2242
2243        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2244        return taskdepdata
2245
    def update_holdofftasks(self):
        """
        Recompute self.tasks_covered / self.tasks_notcovered from the current
        scenequeue results and refresh self.holdoff_tasks, the set of tasks
        which should not execute yet because their setscene outcome (or that
        of a covering setscene task) is still undecided.

        No-op unless self.holdoff_need_update has been set by a state change.
        """

        if not self.holdoff_need_update:
            return

        # Tasks not covered by setscene: the notcovered setscene results and
        # everything those cover, the tasks we can never skip, and unskippable
        # non-setscene tasks; limited to tasks already processed by the
        # scenequeue
        notcovered = set(self.scenequeue_notcovered)
        notcovered |= self.cantskip
        for tid in self.scenequeue_notcovered:
            notcovered |= self.sqdata.sq_covered_tasks[tid]
        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
        notcovered.intersection_update(self.tasks_scenequeue_done)

        # Covered tasks: successful setscene results and everything they
        # cover, minus anything already deemed notcovered
        covered = set(self.scenequeue_covered)
        for tid in self.scenequeue_covered:
            covered |= self.sqdata.sq_covered_tasks[tid]
        covered.difference_update(notcovered)
        covered.intersection_update(self.tasks_scenequeue_done)

        # Anything with a decided outcome whose dependencies are complete can
        # now be marked buildable
        for tid in notcovered | covered:
            if len(self.rqdata.runtaskentries[tid].depends) == 0:
                self.setbuildable(tid)
            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
                 self.setbuildable(tid)

        self.tasks_covered = covered
        self.tasks_notcovered = notcovered

        self.holdoff_tasks = set()

        # Setscene tasks with no result yet are held off...
        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                self.holdoff_tasks.add(tid)

        # ...together with any incomplete task they cover
        for tid in self.holdoff_tasks.copy():
            for dep in self.sqdata.sq_covered_tasks[tid]:
                if dep not in self.runq_complete:
                    self.holdoff_tasks.add(dep)

        self.holdoff_need_update = False
2285
2286    def process_possible_migrations(self):
2287
2288        changed = set()
2289        toprocess = set()
2290        for tid, unihash in self.updated_taskhash_queue.copy():
2291            if tid in self.runq_running and tid not in self.runq_complete:
2292                continue
2293
2294            self.updated_taskhash_queue.remove((tid, unihash))
2295
2296            if unihash != self.rqdata.runtaskentries[tid].unihash:
2297                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2298                torehash = [tid]
2299                for deftid in self.sq_deferred:
2300                    if self.sq_deferred[deftid] == tid:
2301                        torehash.append(deftid)
2302                for hashtid in torehash:
2303                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2304                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2305                    bb.parse.siggen.set_unihash(hashtid, unihash)
2306                    toprocess.add(hashtid)
2307
2308        # Work out all tasks which depend upon these
2309        total = set()
2310        next = set()
2311        for p in toprocess:
2312            next |= self.rqdata.runtaskentries[p].revdeps
2313        while next:
2314            current = next.copy()
2315            total = total | next
2316            next = set()
2317            for ntid in current:
2318                next |= self.rqdata.runtaskentries[ntid].revdeps
2319            next.difference_update(total)
2320
2321        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2322        next = set()
2323        for p in total:
2324            if len(self.rqdata.runtaskentries[p].depends) == 0:
2325                next.add(p)
2326            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2327                next.add(p)
2328
2329        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2330        while next:
2331            current = next.copy()
2332            next = set()
2333            for tid in current:
2334                if len(self.rqdata.runtaskentries[p].depends) and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2335                    continue
2336                orighash = self.rqdata.runtaskentries[tid].hash
2337                dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid))
2338                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc)
2339                origuni = self.rqdata.runtaskentries[tid].unihash
2340                newuni = bb.parse.siggen.get_unihash(tid)
2341                # FIXME, need to check it can come from sstate at all for determinism?
2342                remapped = False
2343                if newuni == origuni:
2344                    # Nothing to do, we match, skip code below
2345                    remapped = True
2346                elif tid in self.scenequeue_covered or tid in self.sq_live:
2347                    # Already ran this setscene task or it running. Report the new taskhash
2348                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2349                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2350                    remapped = True
2351
2352                if not remapped:
2353                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2354                    self.rqdata.runtaskentries[tid].hash = newhash
2355                    self.rqdata.runtaskentries[tid].unihash = newuni
2356                    changed.add(tid)
2357
2358                next |= self.rqdata.runtaskentries[tid].revdeps
2359                total.remove(tid)
2360                next.intersection_update(total)
2361
2362        if changed:
2363            for mc in self.rq.worker:
2364                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2365            for mc in self.rq.fakeworker:
2366                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2367
2368            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2369
2370        for tid in changed:
2371            if tid not in self.rqdata.runq_setscene_tids:
2372                continue
2373            if tid not in self.pending_migrations:
2374                self.pending_migrations.add(tid)
2375
2376        update_tasks = []
2377        for tid in self.pending_migrations.copy():
2378            if tid in self.runq_running or tid in self.sq_live:
2379                # Too late, task already running, not much we can do now
2380                self.pending_migrations.remove(tid)
2381                continue
2382
2383            valid = True
2384            # Check no tasks this covers are running
2385            for dep in self.sqdata.sq_covered_tasks[tid]:
2386                if dep in self.runq_running and dep not in self.runq_complete:
2387                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2388                    valid = False
2389                    break
2390            if not valid:
2391                continue
2392
2393            self.pending_migrations.remove(tid)
2394            changed = True
2395
2396            if tid in self.tasks_scenequeue_done:
2397                self.tasks_scenequeue_done.remove(tid)
2398            for dep in self.sqdata.sq_covered_tasks[tid]:
2399                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2400                    bb.error("Task %s marked as completed but now needing to rerun? Aborting build." % dep)
2401                    self.failed_tids.append(tid)
2402                    self.rq.state = runQueueCleanUp
2403                    return
2404
2405                if dep not in self.runq_complete:
2406                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2407                        self.tasks_scenequeue_done.remove(dep)
2408
2409            if tid in self.sq_buildable:
2410                self.sq_buildable.remove(tid)
2411            if tid in self.sq_running:
2412                self.sq_running.remove(tid)
2413            harddepfail = False
2414            for t in self.sqdata.sq_harddeps:
2415                if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
2416                    harddepfail = True
2417                    break
2418            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2419                if tid not in self.sq_buildable:
2420                    self.sq_buildable.add(tid)
2421            if len(self.sqdata.sq_revdeps[tid]) == 0:
2422                self.sq_buildable.add(tid)
2423
2424            if tid in self.sqdata.outrightfail:
2425                self.sqdata.outrightfail.remove(tid)
2426            if tid in self.scenequeue_notcovered:
2427                self.scenequeue_notcovered.remove(tid)
2428            if tid in self.scenequeue_covered:
2429                self.scenequeue_covered.remove(tid)
2430            if tid in self.scenequeue_notneeded:
2431                self.scenequeue_notneeded.remove(tid)
2432
2433            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2434            self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
2435
2436            if tid in self.stampcache:
2437                del self.stampcache[tid]
2438
2439            if tid in self.build_stamps:
2440                del self.build_stamps[tid]
2441
2442            update_tasks.append((tid, harddepfail, tid in self.sqdata.valid))
2443
2444        if update_tasks:
2445            self.sqdone = False
2446            update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2447
2448        for (tid, harddepfail, origvalid) in update_tasks:
2449            if tid in self.sqdata.valid and not origvalid:
2450                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2451            if harddepfail:
2452                self.sq_task_failoutright(tid)
2453
2454        if changed:
2455            self.holdoff_need_update = True
2456
2457    def scenequeue_updatecounters(self, task, fail=False):
2458
2459        for dep in sorted(self.sqdata.sq_deps[task]):
2460            if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
2461                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
2462                    # dependency could be already processed, e.g. noexec setscene task
2463                    continue
2464                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
2465                if noexec or stamppresent:
2466                    continue
2467                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
2468                self.sq_task_failoutright(dep)
2469                continue
2470            if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2471                if dep not in self.sq_buildable:
2472                    self.sq_buildable.add(dep)
2473
2474        next = set([task])
2475        while next:
2476            new = set()
2477            for t in sorted(next):
2478                self.tasks_scenequeue_done.add(t)
2479                # Look down the dependency chain for non-setscene things which this task depends on
2480                # and mark as 'done'
2481                for dep in self.rqdata.runtaskentries[t].depends:
2482                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
2483                        continue
2484                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
2485                        new.add(dep)
2486            next = new
2487
2488        self.holdoff_need_update = True
2489
2490    def sq_task_completeoutright(self, task):
2491        """
2492        Mark a task as completed
2493        Look at the reverse dependencies and mark any task with
2494        completed dependencies as buildable
2495        """
2496
2497        logger.debug('Found task %s which could be accelerated', task)
2498        self.scenequeue_covered.add(task)
2499        self.scenequeue_updatecounters(task)
2500
2501    def sq_check_taskfail(self, task):
2502        if self.rqdata.setscenewhitelist is not None:
2503            realtask = task.split('_setscene')[0]
2504            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2505            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2506            if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
2507                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2508                self.rq.state = runQueueCleanUp
2509
2510    def sq_task_complete(self, task):
2511        self.sq_stats.taskCompleted()
2512        bb.event.fire(sceneQueueTaskCompleted(task, self.sq_stats, self.rq), self.cfgData)
2513        self.sq_task_completeoutright(task)
2514
2515    def sq_task_fail(self, task, result):
2516        self.sq_stats.taskFailed()
2517        bb.event.fire(sceneQueueTaskFailed(task, self.sq_stats, result, self), self.cfgData)
2518        self.scenequeue_notcovered.add(task)
2519        self.scenequeue_updatecounters(task, True)
2520        self.sq_check_taskfail(task)
2521
2522    def sq_task_failoutright(self, task):
2523        self.sq_running.add(task)
2524        self.sq_buildable.add(task)
2525        self.sq_stats.taskSkipped()
2526        self.sq_stats.taskCompleted()
2527        self.scenequeue_notcovered.add(task)
2528        self.scenequeue_updatecounters(task, True)
2529
2530    def sq_task_skip(self, task):
2531        self.sq_running.add(task)
2532        self.sq_buildable.add(task)
2533        self.sq_task_completeoutright(task)
2534        self.sq_stats.taskSkipped()
2535        self.sq_stats.taskCompleted()
2536
2537    def sq_build_taskdepdata(self, task):
2538        def getsetscenedeps(tid):
2539            deps = set()
2540            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2541            realtid = tid + "_setscene"
2542            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2543            for (depname, idependtask) in idepends:
2544                if depname not in self.rqdata.taskData[mc].build_targets:
2545                    continue
2546
2547                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2548                if depfn is None:
2549                     continue
2550                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2551                deps.add(deptid)
2552            return deps
2553
2554        taskdepdata = {}
2555        next = getsetscenedeps(task)
2556        next.add(task)
2557        while next:
2558            additional = []
2559            for revdep in next:
2560                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2561                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2562                deps = getsetscenedeps(revdep)
2563                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2564                taskhash = self.rqdata.runtaskentries[revdep].hash
2565                unihash = self.rqdata.runtaskentries[revdep].unihash
2566                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2567                for revdep2 in deps:
2568                    if revdep2 not in taskdepdata:
2569                        additional.append(revdep2)
2570            next = additional
2571
2572        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2573        return taskdepdata
2574
2575    def check_setscenewhitelist(self, tid):
2576        # Check task that is going to run against the whitelist
2577        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2578        # Ignore covered tasks
2579        if tid in self.tasks_covered:
2580            return False
2581        # Ignore stamped tasks
2582        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2583            return False
2584        # Ignore noexec tasks
2585        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2586        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2587            return False
2588
2589        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2590        if not check_setscene_enforce_whitelist(pn, taskname, self.rqdata.setscenewhitelist):
2591            if tid in self.rqdata.runq_setscene_tids:
2592                msg = 'Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)
2593            else:
2594                msg = 'Task %s.%s attempted to execute unexpectedly' % (pn, taskname)
2595            for t in self.scenequeue_notcovered:
2596                msg = msg + "\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)
2597            logger.error(msg + '\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2598            return True
2599        return False
2600
class SQData(object):
    """Container for the state tracked about scenequeue (setscene) tasks."""
    def __init__(self):
        # Maps a setscene tid to the setscene tids depending on it
        self.sq_deps = {}
        # Maps a setscene tid to the setscene tids it depends on
        self.sq_revdeps = {}
        # Inter-setscene dependencies injected via _setscene [depends] flags
        self.sq_harddeps = {}
        # For each setscene tid, the normal tasks it covers
        self.sq_covered_tasks = {}
        # Stamp file cache, used so duplicates can't run in parallel
        self.stamps = {}
        # Setscene tasks directly depended upon by the build
        self.unskippable = set()
        # Setscene tasks known not to be present/available
        self.outrightfail = set()
2617
def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
    """
    Populate sqdata with the setscene (scenequeue) dependency graph derived
    from rqdata's full runqueue task graph, then compute the initial
    stamp/validity state via update_scenequeue_data().

    sqdata is the SQData instance to fill in; rqdata/rq/cooker/stampcache are
    the runqueue data, runqueue, cooker and stamp cache; sqrq is the executor
    whose buildable/done sets are seeded here.
    """

    sq_revdeps = {}
    sq_revdeps_squash = {}
    sq_collated_deps = {}

    # We need to construct a dependency graph for the setscene functions. Intermediate
    # dependencies between the setscene tasks only complicate the code. This code
    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
    # only containing the setscene functions.

    rqdata.init_progress_reporter.next_stage()

    # First process the chains up to the first setscene task.
    endpoints = {}
    for tid in rqdata.runtaskentries:
        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
        sq_revdeps_squash[tid] = set()
        if (len(sq_revdeps[tid]) == 0) and tid not in rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint %s" % (tid))
            endpoints[tid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Secondly process the chains between setscene tasks.
    for tid in rqdata.runq_setscene_tids:
        sq_collated_deps[tid] = set()
        #bb.warn("Added endpoint 2 %s" % (tid))
        for dep in rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    def process_endpoints(endpoints):
        # Recursively push each endpoint's accumulated setscene task set down
        # its dependency chain, squashing intermediate tasks out of the graph
        newendpoints = {}
        for point, task in endpoints.items():
            tasks = set()
            if task:
                tasks |= task
            if sq_revdeps_squash[point]:
                tasks |= sq_revdeps_squash[point]
            if point not in rqdata.runq_setscene_tids:
                for t in tasks:
                    sq_collated_deps[t].add(point)
            sq_revdeps_squash[point] = set()
            if point in rqdata.runq_setscene_tids:
                sq_revdeps_squash[point] = tasks
                tasks = set()
                continue
            for dep in rqdata.runtaskentries[point].depends:
                if point in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(point)
                if tasks:
                    sq_revdeps_squash[dep] |= tasks
                if len(sq_revdeps[dep]) == 0 and dep not in rqdata.runq_setscene_tids:
                    newendpoints[dep] = task
        if len(newendpoints) != 0:
            process_endpoints(newendpoints)

    process_endpoints(endpoints)

    rqdata.init_progress_reporter.next_stage()

    # Build a list of tasks which are "unskippable"
    # These are direct endpoints referenced by the build upto and including setscene tasks
    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
    new = True
    for tid in rqdata.runtaskentries:
        if len(rqdata.runtaskentries[tid].revdeps) == 0:
            sqdata.unskippable.add(tid)
    sqdata.unskippable |= sqrq.cantskip
    # Iterate to a fixed point, expanding unskippable through dependencies
    while new:
        new = False
        orig = sqdata.unskippable.copy()
        for tid in sorted(orig, reverse=True):
            if tid in rqdata.runq_setscene_tids:
                continue
            if len(rqdata.runtaskentries[tid].depends) == 0:
                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                sqrq.setbuildable(tid)
            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
            if sqdata.unskippable != orig:
                new = True

    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)

    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))

    # Sanity check all dependencies could be changed to setscene task references
    for taskcounter, tid in enumerate(rqdata.runtaskentries):
        if tid in rqdata.runq_setscene_tids:
            pass
        elif len(sq_revdeps_squash[tid]) != 0:
            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
        else:
            del sq_revdeps_squash[tid]
        rqdata.init_progress_reporter.update(taskcounter)

    rqdata.init_progress_reporter.next_stage()

    # Resolve setscene inter-task dependencies
    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        realtid = tid + "_setscene"
        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
        sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True)
        for (depname, idependtask) in idepends:

            if depname not in rqdata.taskData[mc].build_targets:
                continue

            depfn = rqdata.taskData[mc].build_targets[depname][0]
            if depfn is None:
                continue
            deptid = depfn + ":" + idependtask.replace("_setscene", "")
            if deptid not in rqdata.runtaskentries:
                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

            if not deptid in sqdata.sq_harddeps:
                sqdata.sq_harddeps[deptid] = set()
            sqdata.sq_harddeps[deptid].add(tid)

            sq_revdeps_squash[tid].add(deptid)
            # Have to zero this to avoid circular dependencies
            sq_revdeps_squash[deptid] = set()

    rqdata.init_progress_reporter.next_stage()

    for task in sqdata.sq_harddeps:
        for dep in sqdata.sq_harddeps[task]:
            sq_revdeps_squash[dep].add(task)

    rqdata.init_progress_reporter.next_stage()

    #for tid in sq_revdeps_squash:
    #    data = ""
    #    for dep in sq_revdeps_squash[tid]:
    #        data = data + "\n   %s" % dep
    #    bb.warn("Task %s_setscene: is %s " % (tid, data))

    sqdata.sq_revdeps = sq_revdeps_squash
    sqdata.sq_covered_tasks = sq_collated_deps

    # Build reverse version of revdeps to populate deps structure
    for tid in sqdata.sq_revdeps:
        sqdata.sq_deps[tid] = set()
    for tid in sqdata.sq_revdeps:
        for dep in sqdata.sq_revdeps[tid]:
            sqdata.sq_deps[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    # Setscene tasks with no setscene dependencies can start immediately
    sqdata.multiconfigs = set()
    for tid in sqdata.sq_revdeps:
        sqdata.multiconfigs.add(mc_from_tid(tid))
        if len(sqdata.sq_revdeps[tid]) == 0:
            sqrq.sq_buildable.add(tid)

    rqdata.init_progress_reporter.finish()

    sqdata.noexec = set()
    sqdata.stamppresent = set()
    sqdata.valid = set()

    update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)

    # Compute a list of 'stale' sstate tasks where the current hash does not match the one
    # in any stamp files. Pass the list out to metadata as an event.
    found = {}
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn)
        if stamps:
            if mc not in found:
                found[mc] = {}
            found[mc][tid] = stamps
    for mc in found:
        event = bb.event.StaleSetSceneTasks(found[mc])
        bb.event.fire(event, cooker.databuilder.mcdata[mc])
2804
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
    """
    Check the stamp/noexec status of a setscene task.

    Returns a (noexec, stamppresent) tuple:
      noexec       - True if the task is flagged 'noexec' and needs no execution
      stamppresent - True if a current setscene or normal stamp exists

    If noexecstamp is True, a setscene stamp is written for noexec tasks so
    they are recorded as complete.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)

    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]

    if 'noexec' in taskdep and taskname in taskdep['noexec']:
        # Fix: the stamp was previously written unconditionally, leaving the
        # noexecstamp parameter unused; only write it when the caller asks.
        if noexecstamp:
            bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn)
        return True, False

    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
        logger.debug2('Setscene stamp current for task %s', tid)
        return False, True

    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
        logger.debug2('Normal stamp current for task %s', tid)
        return False, True

    return False, False
2824
def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
    """
    Re-evaluate the setscene status of the given task ids, updating
    sqdata (noexec, stamppresent, valid, outrightfail, hashes) and
    sqrq (skipped tasks, sq_deferred) in place.
    """

    # Tasks whose hashes still need validating against the sstate cache
    tocheck = set()

    for tid in sorted(tids):
        # Clear any previous classification for this tid before re-checking
        if tid in sqdata.stamppresent:
            sqdata.stamppresent.remove(tid)
        if tid in sqdata.valid:
            sqdata.valid.remove(tid)
        if tid in sqdata.outrightfail:
            sqdata.outrightfail.remove(tid)

        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)

        if noexec:
            # Task is flagged noexec so nothing needs to run; mark it skipped
            sqdata.noexec.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        if stamppresent:
            # A current stamp exists, the task is already complete
            sqdata.stamppresent.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        tocheck.add(tid)

    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)

    sqdata.hashes = {}
    sqrq.sq_deferred = {}
    for mc in sorted(sqdata.multiconfigs):
        for tid in sorted(sqdata.sq_revdeps):
            if mc_from_tid(tid) != mc:
                continue
            # Skip tasks already classified or already resolved by the queue
            if tid in sqdata.stamppresent:
                continue
            if tid in sqdata.valid:
                continue
            if tid in sqdata.noexec:
                continue
            if tid in sqrq.scenequeue_notcovered:
                continue
            if tid in sqrq.scenequeue_covered:
                continue

            # Group remaining tasks by pending hash: the first tid seen with a
            # given hash is recorded in sqdata.hashes (and marked outrightfail
            # if it was one of the tids being updated); any later tid with the
            # same hash is deferred behind that first one.
            h = pending_hash_index(tid, rqdata)
            if h not in sqdata.hashes:
                if tid in tids:
                    sqdata.outrightfail.add(tid)
                sqdata.hashes[h] = tid
            else:
                sqrq.sq_deferred[tid] = sqdata.hashes[h]
                bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))
2878
2879
class TaskFailure(Exception):
    """
    Raised when a task in a runqueue fails.
    """
    def __init__(self, tasks):
        # Stored via BaseException.args, which coerces the value to a tuple
        self.args = tasks
2886
2887
class runQueueExitWait(bb.event.Event):
    """
    Fired while the runqueue is waiting for remaining task processes to exit.
    """

    def __init__(self, remain):
        bb.event.Event.__init__(self)
        # Number of still-active tasks being waited upon
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
2897
class runQueueEvent(bb.event.Event):
    """
    Base class for events the runqueue emits about individual tasks.
    """
    def __init__(self, task, stats, rq):
        bb.event.Event.__init__(self)
        # Both the id and the display string default to the raw tid;
        # subclasses may override taskstring/taskname.
        self.taskid = self.taskstring = task
        self.taskname = taskname_from_tid(task)
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        # Snapshot the stats so later mutation doesn't affect this event
        self.stats = stats.copy()
2910
class sceneQueueEvent(runQueueEvent):
    """
    Base class for events about setscene tasks.
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        # Present the task under its _setscene name
        self.taskstring = task + "_setscene"
        self.taskname = taskname_from_tid(task) + "_setscene"
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
2921
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying that a task has started executing.
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        self.noexec = noexec
2929
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying that a setscene task has started executing.
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        self.noexec = noexec
2937
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying that a task failed, carrying its exit code and, for
    fakeroot tasks, the captured pseudo log.
    """
    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
        super().__init__(task, stats, rq)
        self.exitcode = exitcode
        self.fakeroot_log = fakeroot_log

    def __str__(self):
        if not self.fakeroot_log:
            return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
        return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log)
2952
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying that a setscene task failed; the real task will be
    executed instead.
    """
    def __init__(self, task, stats, exitcode, rq):
        super().__init__(task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)
2963
class sceneQueueComplete(sceneQueueEvent):
    """
    Event fired once all the sceneQueue tasks have completed.
    """
    def __init__(self, stats, rq):
        # Bypasses sceneQueueEvent.__init__ since there is no single task
        # associated with this event; only the stats snapshot is carried.
        bb.event.Event.__init__(self)
        self.stats = stats.copy()
2971
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying that a task completed; carries the attributes set by
    runQueueEvent (taskid, taskstring, taskname, taskfile, taskhash, stats).
    """
2976
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying that a setscene task completed; carries the _setscene
    variants of the task attributes set by sceneQueueEvent.
    """
2981
class runQueueTaskSkipped(runQueueEvent):
    """
    Event notifying that a task was skipped, including the reason why.
    """
    def __init__(self, task, stats, rq, reason):
        super().__init__(task, stats, rq)
        self.reason = reason
2989
class taskUniHashUpdate(bb.event.Event):
    """
    Event carrying an updated unihash for a task. When received from a
    worker, runQueuePipe.read() queues the (taskid, unihash) pair for the
    executor to process.
    """
    def __init__(self, task, unihash):
        self.taskid = task
        self.unihash = unihash
        bb.event.Event.__init__(self)
2998
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server.

    Messages arrive as pickled payloads framed by <event>...</event> or
    <exitcode>...</exitcode> byte markers; read() parses complete frames
    out of an internal byte buffer.
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
        # Read end of the pipe; data arrives from the worker process
        self.input = pipein
        if pipeout:
            # The write end is the worker's; close our copy of that fd
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Accumulates raw bytes until complete framed messages can be parsed
        self.queue = b""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec
        self.fakerootlogs = fakerootlogs

    def setrunqueueexec(self, rqexec):
        # Replace the executor that receives parsed messages
        self.rqexec = rqexec

    def read(self):
        """
        Drain available data from the pipe, firing any complete <event>
        payloads and dispatching <exitcode> task exit notifications.
        Returns True if any new data was read.
        """
        # Check the worker processes are still alive; an unexpected exit
        # outside of teardown shuts down the whole runqueue.
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # Non-blocking fd: EAGAIN just means no data is available yet
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and len(self.queue):
            found = False
            index = self.queue.find(b"</event>")
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    # Slice from 7 to skip the b"<event>" prefix
                    event = pickle.loads(self.queue[7:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
                        # The pickled data could contain "</event>" so search for the next occurance
                        # unpickling again, this should be the only way an unpickle error could occur
                        index = self.queue.find(b"</event>", index + 1)
                        continue
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                if isinstance(event, taskUniHashUpdate):
                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
                found = True
                # Drop the payload plus the 8-byte b"</event>" terminator
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            index = self.queue.find(b"</exitcode>")
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    # Slice from 10 to skip the b"<exitcode>" prefix
                    task, status = pickle.loads(self.queue[10:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                (_, _, _, taskfn) = split_tid_mcfn(task)
                # Attach the fakeroot (pseudo) log for this task's recipe, if any
                fakerootlog = None
                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
                    fakerootlog = self.fakerootlogs[taskfn]
                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
                found = True
                # Drop the payload plus the 11-byte b"</exitcode>" terminator
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        """Drain any remaining complete messages, then close the pipe."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()
3074
def get_setscene_enforce_whitelist(d, targets):
    """
    Return the expanded BB_SETSCENE_ENFORCE_WHITELIST as a list, or None
    when setscene enforcement is disabled. Entries of the form '%:<task>'
    are expanded to '<target>:<task>' for every build target.
    """
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    entries = (d.getVar("BB_SETSCENE_ENFORCE_WHITELIST") or "").split()
    result = []
    for entry in entries:
        if not entry.startswith('%:'):
            result.append(entry)
            continue
        task = entry.split(':')[1]
        for (mc, target, taskname, fn) in targets:
            result.append(target + ':' + task)
    return result
3087
def check_setscene_enforce_whitelist(pn, taskname, whitelist):
    """
    Return True if 'pn:taskname' matches any fnmatch pattern in whitelist.
    A whitelist of None means enforcement is disabled, so everything passes.
    """
    import fnmatch
    if whitelist is None:
        return True
    item = '%s:%s' % (pn, taskname)
    return any(fnmatch.fnmatch(item, pattern) for pattern in whitelist)
3097