xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 78b72798)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import logging
18import re
19import bb
20from bb import msg, event
21from bb import monitordisk
22import subprocess
23import pickle
24from multiprocessing import Process
25import shlex
26import pprint
27
# Module-level loggers: the generic BitBake logger plus runqueue-specific
# children so verbosity can be tuned per subsystem.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")

# Matches a standalone 64-character hex string (a sha256-sized hash),
# case-insensitively, with no adjacent alphanumeric characters.
__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
33
def fn_from_tid(tid):
    """Return the filename portion of a task id (everything before the final ':')."""
    parts = tid.rsplit(":", 1)
    return parts[0]
36
def taskname_from_tid(tid):
    """Return the task name portion of a task id (text after the final ':')."""
    _, taskname = tid.rsplit(":", 1)
    return taskname
39
def mc_from_tid(tid):
    """Return the multiconfig name embedded in a tid, or '' for the default config."""
    if not tid.startswith('mc:') or tid.count(':') < 2:
        return ""
    return tid.split(':')[1]
44
def split_tid(tid):
    """Return (mc, fn, taskname) for a tid, discarding the mcfn element."""
    mc, fn, taskname, _mcfn = split_tid_mcfn(tid)
    return (mc, fn, taskname)
48
def split_mc(n):
    """Split an 'mc:<mc>:<name>' string into (mc, name); plain names get mc ''."""
    if not (n.startswith("mc:") and n.count(':') >= 2):
        return ('', n)
    _prefix, mc, remainder = n.split(":", 2)
    return (mc, remainder)
54
def split_tid_mcfn(tid):
    """
    Decompose a task id into (mc, fn, taskname, mcfn).

    A multiconfig tid has the form 'mc:<mc>:<fn>:<taskname>'; a plain tid is
    '<fn>:<taskname>'. mcfn is fn with the 'mc:<mc>:' prefix reattached where
    applicable.
    """
    if tid.startswith('mc:') and tid.count(':') >= 2:
        _, mc, remainder = tid.split(':', 2)
        # rpartition keeps fn == '' for the degenerate 'mc:<mc>:<task>' form,
        # matching the join-of-empty-slice behaviour of the split approach.
        fn, _sep, taskname = remainder.rpartition(':')
        mcfn = "mc:" + mc + ":" + fn
    else:
        parts = tid.rsplit(":", 1)
        mc = ""
        fn = parts[0]
        taskname = parts[1]
        mcfn = fn

    return (mc, fn, taskname, mcfn)
70
def build_tid(mc, fn, taskname):
    """Assemble a task id from its parts, adding the 'mc:' prefix when mc is set."""
    if not mc:
        return fn + ":" + taskname
    return ":".join(("mc", mc, fn, taskname))
75
76# Index used to pair up potentially matching multiconfig tasks
77# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    """
    Return the index key used to pair up potentially matching multiconfig
    tasks: two tasks pair when PN, taskname and unihash are all equal.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    # Fix: the original concatenated the literal string "taskname" instead of
    # the task's actual name (and omitted a separator before the hash), so the
    # index effectively keyed on PN + hash only.
    return pn + ":" + taskname + ":" + h
83
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        # All counters start at zero; only the two totals are supplied.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.setscene_active = 0
        self.setscene_covered = 0
        self.setscene_notcovered = 0
        self.setscene_total = setscene_total
        self.total = total

    def copy(self):
        """Return an independent duplicate of this stats object."""
        duplicate = self.__class__(self.total, self.setscene_total)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def taskFailed(self):
        """Record one active task as failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self):
        """Record one active task as completed."""
        self.active -= 1
        self.completed += 1

    def taskSkipped(self):
        """Record a skipped task (it is also counted active until completed)."""
        self.active += 1
        self.skipped += 1

    def taskActive(self):
        """Record a task as started."""
        self.active += 1

    def updateCovered(self, covered, notcovered):
        """Set the setscene covered/notcovered counts."""
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        """Set the number of currently active setscene tasks."""
        self.setscene_active = active
125
# These values indicate the next step due to be run in the
# runQueue state machine.
# NOTE(review): the numbering is sparse; the gaps presumably correspond to
# states removed over time - confirm against the state machine users.
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
134
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        # Bugfix: materialise the task ids as a list. The original wrapped the
        # keys *view* in a one-element list literal ([dict_keys(...)]), which
        # broke prio_map.index(tid) lookups in next_buildable_task().
        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            if tid in self.rq.runq_buildable:
                # Bugfix: self.buildable is a set, which has no append();
                # the original .append() call raised AttributeError here.
                self.buildable.add(tid)

        # Built lazily on first use (see next_buildable_task).
        self.rev_prio_map = None

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        # Exclude deliberately held-back tasks, and only consider tasks whose
        # setscene coverage has already been decided.
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        # (count currently-running instances of each task name that declares a
        # "number_threads" varflag).
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            # Fast path for the common single-candidate case.
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            # Build the tid -> priority index table in one O(n) pass instead
            # of calling prio_map.index() per task (O(n^2)).
            self.rev_prio_map = {tid: idx for idx, tid in enumerate(self.prio_map)}

        best = None
        bestprio = None
        for tid in buildable:
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                continue
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                # Skip tasks whose stamp is already being built elsewhere.
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        """Mark task as buildable."""
        self.buildable.add(task)

    def removebuildable(self, task):
        """Remove task from the buildable set."""
        self.buildable.remove(task)

    def describe_task(self, taskid):
        """Return a human-readable description of taskid, with priority if known."""
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        """Log the current priority map (most important first) at debug level 3."""
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))
242
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        Build the priority map ordered by descending task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        # Bucket the task ids by their computed weight.
        buckets = {}
        for tid in self.rqdata.runtaskentries:
            buckets.setdefault(self.rqdata.runtaskentries[tid].weight, []).append(tid)

        # Heaviest weight first; within a bucket, reverse discovery order.
        # This is exactly equivalent to building the list ascending by weight
        # and then reversing it wholesale.
        self.prio_map = []
        for bucket_weight in sorted(buckets, reverse=True):
            self.prio_map.extend(reversed(buckets[bucket_weight]))
269
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            # 'index' tracks the merge position in all_tasks; 'old_task' is
            # the task currently at that position (None past the end).
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s'  % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runnable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's task that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        #
        # Stable partition of prio_map: for each task kind (already ordered
        # by importance), pull all matching task ids forward to task_index
        # while keeping their relative recipe order.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')
366
class RunTaskEntry(object):
    """Per-task record stored in RunQueueData.runtaskentries."""
    def __init__(self):
        # Forward and reverse dependency sets (task ids).
        self.depends = set()
        self.revdeps = set()
        # Signature hashes; populated later during hash computation.
        self.hash = None
        self.unihash = None
        self.task = None
        # Scheduling weight; recalculated by calculate_task_weights().
        self.weight = 1
375
376class RunQueueData:
377    """
378    BitBake Run Queue implementation
379    """
380    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
381        self.cooker = cooker
382        self.dataCaches = dataCaches
383        self.taskData = taskData
384        self.targets = targets
385        self.rq = rq
386        self.warn_multi_bb = False
387
388        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
389        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
390        self.setscene_ignore_tasks_checked = False
391        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
392        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()
393
394        self.reset()
395
396    def reset(self):
397        self.runtaskentries = {}
398
399    def runq_depends_names(self, ids):
400        import re
401        ret = []
402        for id in ids:
403            nam = os.path.basename(id)
404            nam = re.sub("_[^,]*,", ",", nam)
405            ret.extend([nam])
406        return ret
407
408    def get_task_hash(self, tid):
409        return self.runtaskentries[tid].hash
410
411    def get_task_unihash(self, tid):
412        return self.runtaskentries[tid].unihash
413
414    def get_user_idstring(self, tid, task_name_suffix = ""):
415        return tid + task_name_suffix
416
417    def get_short_user_idstring(self, task, task_name_suffix = ""):
418        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
419        pn = self.dataCaches[mc].pkg_fn[taskfn]
420        taskname = taskname_from_tid(task) + task_name_suffix
421        return "%s:%s" % (pn, taskname)
422
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.

        Returns a list of message strings describing up to 10 dependency loops
        found among the given tasks.
        """
        # NOTE(review): this import appears unused - the code below calls the
        # module-level copy.deepcopy instead.
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        # Raised internally to abort the search once enough loops are found.
        class TooManyLoops(Exception):
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            # Depth-first walk along reverse dependencies; a revdep already in
            # the current chain means we have found a loop.
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Halted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                # Decide whether revdep still needs exploring: it is new, or
                # its explored set intersects the chain we are on.
                scan = False
                if revdep not in explored_deps:
                    scan = True
                # NOTE(review): testing 'revdep in explored_deps[revdep]'
                # looks like it may have been intended as a self-loop check -
                # confirm against upstream history.
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    # deepcopy so sibling branches do not share chain state.
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs
515
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.

        endpoints: the tasks to seed the walk from (presumably tasks with no
        reverse dependencies, as computed by the caller - confirm there).
        Returns the tid -> weight mapping and stores each weight on the
        corresponding RunTaskEntry.
        """

        numTasks = len(self.runtaskentries)
        weight = {}
        deps_left = {}
        task_done = {}

        # Every task starts with weight 1 and a countdown of its reverse
        # dependencies; a task is processed once all its revdeps are done.
        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        # Seed the endpoints with a larger weight so they propagate influence.
        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        # Walk backwards through the dependency graph, accumulating each
        # task's weight into the tasks it depends on. (Note: the loop variable
        # is named 'revdep' although it iterates .depends - tid is a reverse
        # dependency *of* revdep.)
        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if not next_points:
                break

        # Circular dependency sanity check
        # Any task never marked done (or with revdeps left uncounted) was
        # unreachable from the endpoints - symptomatic of a dependency loop.
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug2("Task %s is not buildable", tid)
                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight
575
576    def prepare(self):
577        """
578        Turn a set of taskData into a RunQueue and compute data needed
579        to optimise the execution order.
580        """
581
582        runq_build = {}
583        recursivetasks = {}
584        recursiveitasks = {}
585        recursivetasksselfref = set()
586
587        taskData = self.taskData
588
589        found = False
590        for mc in self.taskData:
591            if taskData[mc].taskentries:
592                found = True
593                break
594        if not found:
595            # Nothing to do
596            return 0
597
598        self.init_progress_reporter.start()
599        self.init_progress_reporter.next_stage()
600
601        # Step A - Work out a list of tasks to run
602        #
603        # Taskdata gives us a list of possible providers for every build and run
604        # target ordered by priority. It also gives information on each of those
605        # providers.
606        #
607        # To create the actual list of tasks to execute we fix the list of
608        # providers and then resolve the dependencies into task IDs. This
609        # process is repeated for each type of dependency (tdepends, deptask,
610        # rdeptast, recrdeptask, idepends).
611
612        def add_build_dependencies(depids, tasknames, depends, mc):
613            for depname in depids:
614                # Won't be in build_targets if ASSUME_PROVIDED
615                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
616                    continue
617                depdata = taskData[mc].build_targets[depname][0]
618                if depdata is None:
619                    continue
620                for taskname in tasknames:
621                    t = depdata + ":" + taskname
622                    if t in taskData[mc].taskentries:
623                        depends.add(t)
624
625        def add_runtime_dependencies(depids, tasknames, depends, mc):
626            for depname in depids:
627                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
628                    continue
629                depdata = taskData[mc].run_targets[depname][0]
630                if depdata is None:
631                    continue
632                for taskname in tasknames:
633                    t = depdata + ":" + taskname
634                    if t in taskData[mc].taskentries:
635                        depends.add(t)
636
637        def add_mc_dependencies(mc, tid):
638            mcdeps = taskData[mc].get_mcdepends()
639            for dep in mcdeps:
640                mcdependency = dep.split(':')
641                pn = mcdependency[3]
642                frommc = mcdependency[1]
643                mcdep = mcdependency[2]
644                deptask = mcdependency[4]
645                if mc == frommc:
646                    fn = taskData[mcdep].build_targets[pn][0]
647                    newdep = '%s:%s' % (fn,deptask)
648                    taskData[mc].taskentries[tid].tdepends.append(newdep)
649
650        for mc in taskData:
651            for tid in taskData[mc].taskentries:
652
653                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
654                #runtid = build_tid(mc, fn, taskname)
655
656                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)
657
658                depends = set()
659                task_deps = self.dataCaches[mc].task_deps[taskfn]
660
661                self.runtaskentries[tid] = RunTaskEntry()
662
663                if fn in taskData[mc].failed_fns:
664                    continue
665
666                # We add multiconfig dependencies before processing internal task deps (tdepends)
667                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
668                    add_mc_dependencies(mc, tid)
669
670                # Resolve task internal dependencies
671                #
672                # e.g. addtask before X after Y
673                for t in taskData[mc].taskentries[tid].tdepends:
674                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
675                    depends.add(build_tid(depmc, depfn, deptaskname))
676
677                # Resolve 'deptask' dependencies
678                #
679                # e.g. do_sometask[deptask] = "do_someothertask"
680                # (makes sure sometask runs after someothertask of all DEPENDS)
681                if 'deptask' in task_deps and taskname in task_deps['deptask']:
682                    tasknames = task_deps['deptask'][taskname].split()
683                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
684
685                # Resolve 'rdeptask' dependencies
686                #
687                # e.g. do_sometask[rdeptask] = "do_someothertask"
688                # (makes sure sometask runs after someothertask of all RDEPENDS)
689                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
690                    tasknames = task_deps['rdeptask'][taskname].split()
691                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
692
693                # Resolve inter-task dependencies
694                #
695                # e.g. do_sometask[depends] = "targetname:do_someothertask"
696                # (makes sure sometask runs after targetname's someothertask)
697                idepends = taskData[mc].taskentries[tid].idepends
698                for (depname, idependtask) in idepends:
699                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps:
700                        # Won't be in build_targets if ASSUME_PROVIDED
701                        depdata = taskData[mc].build_targets[depname][0]
702                        if depdata is not None:
703                            t = depdata + ":" + idependtask
704                            depends.add(t)
705                            if t not in taskData[mc].taskentries:
706                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
707                irdepends = taskData[mc].taskentries[tid].irdepends
708                for (depname, idependtask) in irdepends:
709                    if depname in taskData[mc].run_targets:
710                        # Won't be in run_targets if ASSUME_PROVIDED
711                        if not taskData[mc].run_targets[depname]:
712                            continue
713                        depdata = taskData[mc].run_targets[depname][0]
714                        if depdata is not None:
715                            t = depdata + ":" + idependtask
716                            depends.add(t)
717                            if t not in taskData[mc].taskentries:
718                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
719
720                # Resolve recursive 'recrdeptask' dependencies (Part A)
721                #
722                # e.g. do_sometask[recrdeptask] = "do_someothertask"
723                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
724                # We cover the recursive part of the dependencies below
725                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
726                    tasknames = task_deps['recrdeptask'][taskname].split()
727                    recursivetasks[tid] = tasknames
728                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
729                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
730                    if taskname in tasknames:
731                        recursivetasksselfref.add(tid)
732
733                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
734                        recursiveitasks[tid] = []
735                        for t in task_deps['recideptask'][taskname].split():
736                            newdep = build_tid(mc, fn, t)
737                            recursiveitasks[tid].append(newdep)
738
739                self.runtaskentries[tid].depends = depends
740                # Remove all self references
741                self.runtaskentries[tid].depends.discard(tid)
742
743        #self.dump_data()
744
745        self.init_progress_reporter.next_stage()
746
747        # Resolve recursive 'recrdeptask' dependencies (Part B)
748        #
749        # e.g. do_sometask[recrdeptask] = "do_someothertask"
750        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
751        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed
752
753        # Generating/interating recursive lists of dependencies is painful and potentially slow
754        # Precompute recursive task dependencies here by:
755        #     a) create a temp list of reverse dependencies (revdeps)
756        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
757        #     c) combine the total list of dependencies in cumulativedeps
758        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)
759
760
761        revdeps = {}
762        deps = {}
763        cumulativedeps = {}
764        for tid in self.runtaskentries:
765            deps[tid] = set(self.runtaskentries[tid].depends)
766            revdeps[tid] = set()
767            cumulativedeps[tid] = set()
768        # Generate a temp list of reverse dependencies
769        for tid in self.runtaskentries:
770            for dep in self.runtaskentries[tid].depends:
771                revdeps[dep].add(tid)
772        # Find the dependency chain endpoints
773        endpoints = set()
774        for tid in self.runtaskentries:
775            if not deps[tid]:
776                endpoints.add(tid)
777        # Iterate the chains collating dependencies
778        while endpoints:
779            next = set()
780            for tid in endpoints:
781                for dep in revdeps[tid]:
782                    cumulativedeps[dep].add(fn_from_tid(tid))
783                    cumulativedeps[dep].update(cumulativedeps[tid])
784                    if tid in deps[dep]:
785                        deps[dep].remove(tid)
786                    if not deps[dep]:
787                        next.add(dep)
788            endpoints = next
789        #for tid in deps:
790        #    if deps[tid]:
791        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
792
793        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
794        # resolve these recursively until we aren't adding any further extra dependencies
795        extradeps = True
796        while extradeps:
797            extradeps = 0
798            for tid in recursivetasks:
799                tasknames = recursivetasks[tid]
800
801                totaldeps = set(self.runtaskentries[tid].depends)
802                if tid in recursiveitasks:
803                    totaldeps.update(recursiveitasks[tid])
804                    for dep in recursiveitasks[tid]:
805                        if dep not in self.runtaskentries:
806                            continue
807                        totaldeps.update(self.runtaskentries[dep].depends)
808
809                deps = set()
810                for dep in totaldeps:
811                    if dep in cumulativedeps:
812                        deps.update(cumulativedeps[dep])
813
814                for t in deps:
815                    for taskname in tasknames:
816                        newtid = t + ":" + taskname
817                        if newtid == tid:
818                            continue
819                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
820                            extradeps += 1
821                            self.runtaskentries[tid].depends.add(newtid)
822
823                # Handle recursive tasks which depend upon other recursive tasks
824                deps = set()
825                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
826                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
827                for newtid in deps:
828                    for taskname in tasknames:
829                        if not newtid.endswith(":" + taskname):
830                            continue
831                        if newtid in self.runtaskentries:
832                            extradeps += 1
833                            self.runtaskentries[tid].depends.add(newtid)
834
835            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
836
837        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
838        for tid in recursivetasksselfref:
839            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
840
841        self.init_progress_reporter.next_stage()
842
843        #self.dump_data()
844
845        # Step B - Mark all active tasks
846        #
847        # Start with the tasks we were asked to run and mark all dependencies
848        # as active too. If the task is to be 'forced', clear its stamp. Once
849        # all active tasks are marked, prune the ones we don't need.
850
851        logger.verbose("Marking Active Tasks")
852
853        def mark_active(tid, depth):
854            """
855            Mark an item as active along with its depends
856            (calls itself recursively)
857            """
858
859            if tid in runq_build:
860                return
861
862            runq_build[tid] = 1
863
864            depends = self.runtaskentries[tid].depends
865            for depend in depends:
866                mark_active(depend, depth+1)
867
868        def invalidate_task(tid, error_nostamp):
869            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
870            taskdep = self.dataCaches[mc].task_deps[taskfn]
871            if fn + ":" + taskname not in taskData[mc].taskentries:
872                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
873            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
874                if error_nostamp:
875                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
876                else:
877                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
878            else:
879                logger.verbose("Invalidate task %s, %s", taskname, fn)
880                bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn)
881
882        self.target_tids = []
883        for (mc, target, task, fn) in self.targets:
884
885            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
886                continue
887
888            if target in taskData[mc].failed_deps:
889                continue
890
891            parents = False
892            if task.endswith('-'):
893                parents = True
894                task = task[:-1]
895
896            if fn in taskData[mc].failed_fns:
897                continue
898
899            # fn already has mc prefix
900            tid = fn + ":" + task
901            self.target_tids.append(tid)
902            if tid not in taskData[mc].taskentries:
903                import difflib
904                tasks = []
905                for x in taskData[mc].taskentries:
906                    if x.startswith(fn + ":"):
907                        tasks.append(taskname_from_tid(x))
908                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
909                if close_matches:
910                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
911                else:
912                    extra = ""
913                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
914
915            # For tasks called "XXXX-", ony run their dependencies
916            if parents:
917                for i in self.runtaskentries[tid].depends:
918                    mark_active(i, 1)
919            else:
920                mark_active(tid, 1)
921
922        self.init_progress_reporter.next_stage()
923
924        # Step C - Prune all inactive tasks
925        #
926        # Once all active tasks are marked, prune the ones we don't need.
927
928        # Handle --runall
929        if self.cooker.configuration.runall:
930            # re-run the mark_active and then drop unused tasks from new list
931            reduced_tasklist = set(self.runtaskentries.keys())
932            for tid in list(self.runtaskentries.keys()):
933                if tid not in runq_build:
934                   reduced_tasklist.remove(tid)
935            runq_build = {}
936
937            for task in self.cooker.configuration.runall:
938                if not task.startswith("do_"):
939                    task = "do_{0}".format(task)
940                runall_tids = set()
941                for tid in reduced_tasklist:
942                    wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
943                    if wanttid in self.runtaskentries:
944                        runall_tids.add(wanttid)
945
946                for tid in list(runall_tids):
947                    mark_active(tid, 1)
948                    if self.cooker.configuration.force:
949                        invalidate_task(tid, False)
950
951        delcount = set()
952        for tid in list(self.runtaskentries.keys()):
953            if tid not in runq_build:
954                delcount.add(tid)
955                del self.runtaskentries[tid]
956
957        if self.cooker.configuration.runall:
958            if not self.runtaskentries:
959                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
960
961        self.init_progress_reporter.next_stage()
962
963        # Handle runonly
964        if self.cooker.configuration.runonly:
965            # re-run the mark_active and then drop unused tasks from new list
966            runq_build = {}
967
968            for task in self.cooker.configuration.runonly:
969                if not task.startswith("do_"):
970                    task = "do_{0}".format(task)
971                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
972
973                for tid in runonly_tids:
974                    mark_active(tid, 1)
975                    if self.cooker.configuration.force:
976                        invalidate_task(tid, False)
977
978            for tid in list(self.runtaskentries.keys()):
979                if tid not in runq_build:
980                    delcount.add(tid)
981                    del self.runtaskentries[tid]
982
983            if not self.runtaskentries:
984                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
985
986        #
987        # Step D - Sanity checks and computation
988        #
989
990        # Check to make sure we still have tasks to run
991        if not self.runtaskentries:
992            if not taskData[''].halt:
993                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
994            else:
995                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
996
997        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
998
999        logger.verbose("Assign Weightings")
1000
1001        self.init_progress_reporter.next_stage()
1002
1003        # Generate a list of reverse dependencies to ease future calculations
1004        for tid in self.runtaskentries:
1005            for dep in self.runtaskentries[tid].depends:
1006                self.runtaskentries[dep].revdeps.add(tid)
1007
1008        self.init_progress_reporter.next_stage()
1009
1010        # Identify tasks at the end of dependency chains
1011        # Error on circular dependency loops (length two)
1012        endpoints = []
1013        for tid in self.runtaskentries:
1014            revdeps = self.runtaskentries[tid].revdeps
1015            if not revdeps:
1016                endpoints.append(tid)
1017            for dep in revdeps:
1018                if dep in self.runtaskentries[tid].depends:
1019                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1020
1021
1022        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1023
1024        self.init_progress_reporter.next_stage()
1025
1026        # Calculate task weights
1027        # Check of higher length circular dependencies
1028        self.runq_weight = self.calculate_task_weights(endpoints)
1029
1030        self.init_progress_reporter.next_stage()
1031
1032        # Sanity Check - Check for multiple tasks building the same provider
1033        for mc in self.dataCaches:
1034            prov_list = {}
1035            seen_fn = []
1036            for tid in self.runtaskentries:
1037                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1038                if taskfn in seen_fn:
1039                    continue
1040                if mc != tidmc:
1041                    continue
1042                seen_fn.append(taskfn)
1043                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1044                    if prov not in prov_list:
1045                        prov_list[prov] = [taskfn]
1046                    elif taskfn not in prov_list[prov]:
1047                        prov_list[prov].append(taskfn)
1048            for prov in prov_list:
1049                if len(prov_list[prov]) < 2:
1050                    continue
1051                if prov in self.multi_provider_allowed:
1052                    continue
1053                seen_pn = []
1054                # If two versions of the same PN are being built its fatal, we don't support it.
1055                for fn in prov_list[prov]:
1056                    pn = self.dataCaches[mc].pkg_fn[fn]
1057                    if pn not in seen_pn:
1058                        seen_pn.append(pn)
1059                    else:
1060                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1061                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
1062                #
1063                # Construct a list of things which uniquely depend on each provider
1064                # since this may help the user figure out which dependency is triggering this warning
1065                #
1066                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
1067                deplist = {}
1068                commondeps = None
1069                for provfn in prov_list[prov]:
1070                    deps = set()
1071                    for tid in self.runtaskentries:
1072                        fn = fn_from_tid(tid)
1073                        if fn != provfn:
1074                            continue
1075                        for dep in self.runtaskentries[tid].revdeps:
1076                            fn = fn_from_tid(dep)
1077                            if fn == provfn:
1078                                continue
1079                            deps.add(dep)
1080                    if not commondeps:
1081                        commondeps = set(deps)
1082                    else:
1083                        commondeps &= deps
1084                    deplist[provfn] = deps
1085                for provfn in deplist:
1086                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
1087                #
1088                # Construct a list of provides and runtime providers for each recipe
1089                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1090                #
1091                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
1092                provide_results = {}
1093                rprovide_results = {}
1094                commonprovs = None
1095                commonrprovs = None
1096                for provfn in prov_list[prov]:
1097                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1098                    rprovides = set()
1099                    for rprovide in self.dataCaches[mc].rproviders:
1100                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1101                            rprovides.add(rprovide)
1102                    for package in self.dataCaches[mc].packages:
1103                        if provfn in self.dataCaches[mc].packages[package]:
1104                            rprovides.add(package)
1105                    for package in self.dataCaches[mc].packages_dynamic:
1106                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1107                            rprovides.add(package)
1108                    if not commonprovs:
1109                        commonprovs = set(provides)
1110                    else:
1111                        commonprovs &= provides
1112                    provide_results[provfn] = provides
1113                    if not commonrprovs:
1114                        commonrprovs = set(rprovides)
1115                    else:
1116                        commonrprovs &= rprovides
1117                    rprovide_results[provfn] = rprovides
1118                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
1119                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
1120                for provfn in prov_list[prov]:
1121                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
1122                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))
1123
1124                if self.warn_multi_bb:
1125                    logger.verbnote("".join(msgs))
1126                else:
1127                    logger.error("".join(msgs))
1128
1129        self.init_progress_reporter.next_stage()
1130        self.init_progress_reporter.next_stage()
1131
1132        # Iterate over the task list looking for tasks with a 'setscene' function
1133        self.runq_setscene_tids = set()
1134        if not self.cooker.configuration.nosetscene:
1135            for tid in self.runtaskentries:
1136                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1137                setscenetid = tid + "_setscene"
1138                if setscenetid not in taskData[mc].taskentries:
1139                    continue
1140                self.runq_setscene_tids.add(tid)
1141
1142        self.init_progress_reporter.next_stage()
1143
1144        # Invalidate task if force mode active
1145        if self.cooker.configuration.force:
1146            for tid in self.target_tids:
1147                invalidate_task(tid, False)
1148
1149        # Invalidate task if invalidate mode active
1150        if self.cooker.configuration.invalidate_stamp:
1151            for tid in self.target_tids:
1152                fn = fn_from_tid(tid)
1153                for st in self.cooker.configuration.invalidate_stamp.split(','):
1154                    if not st.startswith("do_"):
1155                        st = "do_%s" % st
1156                    invalidate_task(fn + ":" + st, True)
1157
1158        self.init_progress_reporter.next_stage()
1159
1160        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1161        for mc in taskData:
1162            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1163            virtpnmap = {}
1164            for v in virtmap:
1165                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1166                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1167            if hasattr(bb.parse.siggen, "tasks_resolved"):
1168                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1169
1170        self.init_progress_reporter.next_stage()
1171
1172        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1173
1174        # Iterate over the task list and call into the siggen code
1175        dealtwith = set()
1176        todeal = set(self.runtaskentries)
1177        while todeal:
1178            for tid in todeal.copy():
1179                if not (self.runtaskentries[tid].depends - dealtwith):
1180                    dealtwith.add(tid)
1181                    todeal.remove(tid)
1182                    self.prepare_task_hash(tid)
1183
1184        bb.parse.siggen.writeout_file_checksum_cache()
1185
1186        #self.dump_data()
1187        return len(self.runtaskentries)
1188
1189    def prepare_task_hash(self, tid):
1190        dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid))
1191        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc)
1192        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc)
1193        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
1194
1195    def dump_data(self):
1196        """
1197        Dump some debug information on the internal data structures
1198        """
1199        logger.debug3("run_tasks:")
1200        for tid in self.runtaskentries:
1201            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1202                         self.runtaskentries[tid].weight,
1203                         self.runtaskentries[tid].depends,
1204                         self.runtaskentries[tid].revdeps)
1205
class RunQueueWorker:
    """Pairs a bitbake-worker subprocess with the pipe used to read from it."""
    def __init__(self, process, pipe):
        # process: the worker's subprocess handle
        # pipe: the runQueuePipe wrapping the worker's stdout
        self.pipe = pipe
        self.process = process
1210
1211class RunQueue:
1212    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
1213
1214        self.cooker = cooker
1215        self.cfgData = cfgData
1216        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)
1217
1218        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
1219        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None
1220
1221        self.state = runQueuePrepare
1222
1223        # For disk space monitor
1224        # Invoked at regular time intervals via the bitbake heartbeat event
1225        # while the build is running. We generate a unique name for the handler
1226        # here, just in case that there ever is more than one RunQueue instance,
1227        # start the handler when reaching runQueueSceneInit, and stop it when
1228        # done with the build.
1229        self.dm = monitordisk.diskMonitor(cfgData)
1230        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
1231        self.dm_event_handler_registered = False
1232        self.rqexe = None
1233        self.worker = {}
1234        self.fakeworker = {}
1235
1236    def _start_worker(self, mc, fakeroot = False, rqexec = None):
1237        logger.debug("Starting bitbake-worker")
1238        magic = "decafbad"
1239        if self.cooker.configuration.profile:
1240            magic = "decafbadbad"
1241        fakerootlogs = None
1242        if fakeroot:
1243            magic = magic + "beef"
1244            mcdata = self.cooker.databuilder.mcdata[mc]
1245            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
1246            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
1247            env = os.environ.copy()
1248            for key, value in (var.split('=') for var in fakerootenv):
1249                env[key] = value
1250            worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
1251            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
1252        else:
1253            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1254        bb.utils.nonblockingfd(worker.stdout)
1255        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
1256
1257        workerdata = {
1258            "taskdeps" : self.rqdata.dataCaches[mc].task_deps,
1259            "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv,
1260            "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs,
1261            "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv,
1262            "sigdata" : bb.parse.siggen.get_taskdata(),
1263            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
1264            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
1265            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
1266            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1267            "prhost" : self.cooker.prhost,
1268            "buildname" : self.cfgData.getVar("BUILDNAME"),
1269            "date" : self.cfgData.getVar("DATE"),
1270            "time" : self.cfgData.getVar("TIME"),
1271            "hashservaddr" : self.cooker.hashservaddr,
1272            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1273        }
1274
1275        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
1276        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
1277        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
1278        worker.stdin.flush()
1279
1280        return RunQueueWorker(worker, workerpipe)
1281
1282    def _teardown_worker(self, worker):
1283        if not worker:
1284            return
1285        logger.debug("Teardown for bitbake-worker")
1286        try:
1287           worker.process.stdin.write(b"<quit></quit>")
1288           worker.process.stdin.flush()
1289           worker.process.stdin.close()
1290        except IOError:
1291           pass
1292        while worker.process.returncode is None:
1293            worker.pipe.read()
1294            worker.process.poll()
1295        while worker.pipe.read():
1296            continue
1297        worker.pipe.close()
1298
1299    def start_worker(self):
1300        if self.worker:
1301            self.teardown_workers()
1302        self.teardown = False
1303        for mc in self.rqdata.dataCaches:
1304            self.worker[mc] = self._start_worker(mc)
1305
1306    def start_fakeworker(self, rqexec, mc):
1307        if not mc in self.fakeworker:
1308            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1309
1310    def teardown_workers(self):
1311        self.teardown = True
1312        for mc in self.worker:
1313            self._teardown_worker(self.worker[mc])
1314        self.worker = {}
1315        for mc in self.fakeworker:
1316            self._teardown_worker(self.fakeworker[mc])
1317        self.fakeworker = {}
1318
1319    def read_workers(self):
1320        for mc in self.worker:
1321            self.worker[mc].pipe.read()
1322        for mc in self.fakeworker:
1323            self.fakeworker[mc].pipe.read()
1324
1325    def active_fds(self):
1326        fds = []
1327        for mc in self.worker:
1328            fds.append(self.worker[mc].pipe.input)
1329        for mc in self.fakeworker:
1330            fds.append(self.fakeworker[mc].pipe.input)
1331        return fds
1332
1333    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
1334        def get_timestamp(f):
1335            try:
1336                if not os.access(f, os.F_OK):
1337                    return None
1338                return os.stat(f)[stat.ST_MTIME]
1339            except:
1340                return None
1341
1342        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
1343        if taskname is None:
1344            taskname = tn
1345
1346        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn)
1347
1348        # If the stamp is missing, it's not current
1349        if not os.access(stampfile, os.F_OK):
1350            logger.debug2("Stampfile %s not available", stampfile)
1351            return False
1352        # If it's a 'nostamp' task, it's not current
1353        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1354        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
1355            logger.debug2("%s.%s is nostamp\n", fn, taskname)
1356            return False
1357
1358        if taskname != "do_setscene" and taskname.endswith("_setscene"):
1359            return True
1360
1361        if cache is None:
1362            cache = {}
1363
1364        iscurrent = True
1365        t1 = get_timestamp(stampfile)
1366        for dep in self.rqdata.runtaskentries[tid].depends:
1367            if iscurrent:
1368                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
1369                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2)
1370                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2)
1371                t2 = get_timestamp(stampfile2)
1372                t3 = get_timestamp(stampfile3)
1373                if t3 and not t2:
1374                    continue
1375                if t3 and t3 > t2:
1376                    continue
1377                if fn == fn2:
1378                    if not t2:
1379                        logger.debug2('Stampfile %s does not exist', stampfile2)
1380                        iscurrent = False
1381                        break
1382                    if t1 < t2:
1383                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
1384                        iscurrent = False
1385                        break
1386                    if recurse and iscurrent:
1387                        if dep in cache:
1388                            iscurrent = cache[dep]
1389                            if not iscurrent:
1390                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1391                        else:
1392                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1393                            cache[dep] = iscurrent
1394        if recurse:
1395            cache[tid] = iscurrent
1396        return iscurrent
1397
1398    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1399        valid = set()
1400        if self.hashvalidate:
1401            sq_data = {}
1402            sq_data['hash'] = {}
1403            sq_data['hashfn'] = {}
1404            sq_data['unihash'] = {}
1405            for tid in tocheck:
1406                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1407                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1408                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1409                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1410
1411            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1412
1413        return valid
1414
1415    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
1416        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1417
1418        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
1419        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1420
1421        return bb.utils.better_eval(call, locs)
1422
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the halt on failure configuration option isn't set)
        """

        # True means "call me again"; returning False ends the caller's loop.
        retval = True

        if self.state is runQueuePrepare:
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            # prepare() returning 0 means there is nothing to do at all.
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                bb.parse.siggen.save_unitaskhashes()

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()

            # we are ready to run,  emit dependency info to any UI or class which
            # needs it
            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
            self.rqdata.init_progress_reporter.next_stage()
            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

            # Register the disk-monitor check to run on heartbeat events while
            # tasks are executing (runQueueRunning/runQueueCleanUp states only).
            if not self.dm_event_handler_registered:
                 res = bb.event.register(self.dm_event_handler_name,
                                         lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                         ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

            # Signature dumping short-circuits normal execution entirely.
            dump = self.cooker.configuration.dump_signatures
            if dump:
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete

        # Re-test the state: the dump_signatures path above may have moved us
        # straight to runQueueComplete.
        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()
            self.start_worker()
            self.rqdata.init_progress_reporter.next_stage()
            self.rqexe = RunQueueExecute(self)

            # If we don't have any setscene functions, skip execution
            if not self.rqdata.runq_setscene_tids:
                logger.info('No setscene tasks')
                for tid in self.rqdata.runtaskentries:
                    if not self.rqdata.runtaskentries[tid].depends:
                        self.rqexe.setbuildable(tid)
                    self.rqexe.tasks_notcovered.add(tid)
                self.rqexe.sqdone = True
            logger.info('Executing Tasks')
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        # Once the build is over, stop monitoring disk space on heartbeats.
        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            bb.parse.siggen.save_unitaskhashes()
            self.teardown_workers()
            if self.rqexe:
                if self.rqexe.stats.failed:
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
                else:
                    # Let's avoid the word "failed" if nothing actually did
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1521
1522    def execute_runqueue(self):
1523        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1524        try:
1525            return self._execute_runqueue()
1526        except bb.runqueue.TaskFailure:
1527            raise
1528        except SystemExit:
1529            raise
1530        except bb.BBHandledException:
1531            try:
1532                self.teardown_workers()
1533            except:
1534                pass
1535            self.state = runQueueComplete
1536            raise
1537        except Exception as err:
1538            logger.exception("An uncaught exception occurred in runqueue")
1539            try:
1540                self.teardown_workers()
1541            except:
1542                pass
1543            self.state = runQueueComplete
1544            raise
1545
1546    def finish_runqueue(self, now = False):
1547        if not self.rqexe:
1548            self.state = runQueueComplete
1549            return
1550
1551        if now:
1552            self.rqexe.finish_now()
1553        else:
1554            self.rqexe.finish()
1555
1556    def rq_dump_sigfn(self, fn, options):
1557        bb_cache = bb.cache.NoCache(self.cooker.databuilder)
1558        mc = bb.runqueue.mc_from_tid(fn)
1559        the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn))
1560        siggen = bb.parse.siggen
1561        dataCaches = self.rqdata.dataCaches
1562        siggen.dump_sigfn(fn, dataCaches, options)
1563
1564    def dump_signatures(self, options):
1565        fns = set()
1566        bb.note("Reparsing files to collect dependency data")
1567
1568        for tid in self.rqdata.runtaskentries:
1569            fn = fn_from_tid(tid)
1570            fns.add(fn)
1571
1572        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1573        # We cannot use the real multiprocessing.Pool easily due to some local data
1574        # that can't be pickled. This is a cheap multi-process solution.
1575        launched = []
1576        while fns:
1577            if len(launched) < max_process:
1578                p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options))
1579                p.start()
1580                launched.append(p)
1581            for q in launched:
1582                # The finished processes are joined when calling is_alive()
1583                if not q.is_alive():
1584                    launched.remove(q)
1585        for p in launched:
1586                p.join()
1587
1588        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1589
1590        return
1591
    def print_diffscenetasks(self):
        """
        Report where the current build first diverges from any cached
        (setscene) task results.

        Returns the set of invalid tasks that are root causes, i.e. not
        merely invalid because one of their dependencies is invalid.
        """

        noexec = []
        tocheck = set()

        # Partition tasks: noexec tasks produce no output, so they are not
        # candidates for hash validation.
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(tid)
                continue

            tocheck.add(tid)

        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for tid in noexec:
            if tid not in self.rqdata.runq_setscene_tids:
                continue
            for dep in self.rqdata.runtaskentries[tid].depends:
                # dep only counts as valid if *every* reverse dependency is a
                # setscene+noexec task.
                hasnoexecparents = True
                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        # Anything neither valid nor noexec will have to be re-executed.
        invalidtasks = set()
        for tid in self.rqdata.runtaskentries:
            if tid not in valid_new and tid not in noexec:
                invalidtasks.add(tid)

        # Breadth-first walk of each invalid task's dependencies: a task with
        # an invalid dependency is only a consequence ('found'), not a root
        # cause.  NOTE(review): 'processed' is shared across the outer loop,
        # so later tids' walks can be cut short by earlier traversals -
        # presumably a deliberate optimisation; confirm it cannot miss roots.
        found = set()
        processed = set()
        for tid in invalidtasks:
            toprocess = set([tid])
            while toprocess:
                next = set()  # next BFS frontier (shadows builtin next())
                for t in toprocess:
                    for dep in self.rqdata.runtaskentries[t].depends:
                        if dep in invalidtasks:
                            found.add(tid)
                        if dep not in processed:
                            processed.add(dep)
                            next.add(dep)
                toprocess = next
                # Once tid is known to be a consequence, stop walking.
                if tid in found:
                    toprocess = set()

        tasklist = []
        for tid in invalidtasks.difference(found):
            tasklist.append(tid)

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        # Root-cause invalid tasks only.
        return invalidtasks.difference(found)
1655
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, find the closest previously-written sigdata
        file and print a human-readable diff explaining why the cached
        result could not be reused.
        """

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            # Recursively diff the sigdata of a changed dependency, indenting
            # each nesting level by four spaces.
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('    ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for tid in invalidtasks:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            h = self.rqdata.runtaskentries[tid].hash
            # All sigdata files ever written for this pn/taskname; the current
            # one must carry our hash in its name.
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
            match = None
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
            # Candidate previous runs: every sigdata file except the current one.
            matches = {k : v for k, v in iter(matches.items()) if h not in k}
            if matches:
                # Most recent previous run - assumes find_siginfo's values
                # sort chronologically (TODO confirm value semantics).
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_sha256__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1690
1691
1692class RunQueueExecute:
1693
    def __init__(self, rq):
        """Set up all execution state for the given RunQueue."""
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        # Parallelism limit and scheduling policy from configuration.
        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"

        # Setscene (sq_*) task state.
        self.sq_buildable = set()
        self.sq_running = set()
        self.sq_live = set()

        # Taskhash changes waiting to be processed (hash equivalence).
        self.updated_taskhash_queue = []
        self.pending_migrations = set()

        # Normal runqueue task state.
        self.runq_buildable = set()
        self.runq_running = set()
        self.runq_complete = set()
        self.runq_tasksrun = set()

        # Stamps of currently-executing tasks (dict keyed by tid, plus a
        # flat list used for membership checks elsewhere).
        self.build_stamps = {}
        self.build_stamps2 = []
        self.failed_tids = []
        # Setscene tasks deferred behind another task with the same hash.
        self.sq_deferred = {}

        self.stampcache = {}

        self.holdoff_tasks = set()
        self.holdoff_need_update = True
        self.sqdone = False

        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))

        # Let the worker pipes route their events back to this executor.
        for mc in rq.worker:
            rq.worker[mc].pipe.setrunqueueexec(self)
        for mc in rq.fakeworker:
            rq.fakeworker[mc].pipe.setrunqueueexec(self)

        if self.number_tasks <= 0:
             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)

        # List of setscene tasks which we've covered
        self.scenequeue_covered = set()
        # List of tasks which are covered (including setscene ones)
        self.tasks_covered = set()
        self.tasks_scenequeue_done = set()
        self.scenequeue_notcovered = set()
        self.tasks_notcovered = set()
        self.scenequeue_notneeded = set()

        # We can't skip specified target tasks which aren't setscene tasks
        self.cantskip = set(self.rqdata.target_tids)
        self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
        self.cantskip.intersection_update(self.rqdata.runtaskentries)

        # Pick the scheduler implementation named by BB_SCHEDULER.
        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

        #if self.rqdata.runq_setscene_tids:
        self.sqdata = SQData()
        build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1763
1764    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1765
1766        # self.build_stamps[pid] may not exist when use shared work directory.
1767        if task in self.build_stamps:
1768            self.build_stamps2.remove(self.build_stamps[task])
1769            del self.build_stamps[task]
1770
1771        if task in self.sq_live:
1772            if status != 0:
1773                self.sq_task_fail(task, status)
1774            else:
1775                self.sq_task_complete(task)
1776            self.sq_live.remove(task)
1777            self.stats.updateActiveSetscene(len(self.sq_live))
1778        else:
1779            if status != 0:
1780                self.task_fail(task, status, fakerootlog=fakerootlog)
1781            else:
1782                self.task_complete(task)
1783        return True
1784
1785    def finish_now(self):
1786        for mc in self.rq.worker:
1787            try:
1788                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
1789                self.rq.worker[mc].process.stdin.flush()
1790            except IOError:
1791                # worker must have died?
1792                pass
1793        for mc in self.rq.fakeworker:
1794            try:
1795                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
1796                self.rq.fakeworker[mc].process.stdin.flush()
1797            except IOError:
1798                # worker must have died?
1799                pass
1800
1801        if self.failed_tids:
1802            self.rq.state = runQueueFailed
1803            return
1804
1805        self.rq.state = runQueueComplete
1806        return
1807
1808    def finish(self):
1809        self.rq.state = runQueueCleanUp
1810
1811        active = self.stats.active + len(self.sq_live)
1812        if active > 0:
1813            bb.event.fire(runQueueExitWait(active), self.cfgData)
1814            self.rq.read_workers()
1815            return self.rq.active_fds()
1816
1817        if self.failed_tids:
1818            self.rq.state = runQueueFailed
1819            return True
1820
1821        self.rq.state = runQueueComplete
1822        return True
1823
1824    # Used by setscene only
1825    def check_dependencies(self, task, taskdeps):
1826        if not self.rq.depvalidate:
1827            return False
1828
1829        # Must not edit parent data
1830        taskdeps = set(taskdeps)
1831
1832        taskdata = {}
1833        taskdeps.add(task)
1834        for dep in taskdeps:
1835            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
1836            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1837            taskdata[dep] = [pn, taskname, fn]
1838        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1839        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1840        valid = bb.utils.better_eval(call, locs)
1841        return valid
1842
1843    def can_start_task(self):
1844        active = self.stats.active + len(self.sq_live)
1845        can_start = active < self.number_tasks
1846        return can_start
1847
1848    def get_schedulers(self):
1849        schedulers = set(obj for obj in globals().values()
1850                             if type(obj) is type and
1851                                issubclass(obj, RunQueueScheduler))
1852
1853        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
1854        if user_schedulers:
1855            for sched in user_schedulers.split():
1856                if not "." in sched:
1857                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1858                    continue
1859
1860                modname, name = sched.rsplit(".", 1)
1861                try:
1862                    module = __import__(modname, fromlist=(name,))
1863                except ImportError as exc:
1864                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1865                    raise SystemExit(1)
1866                else:
1867                    schedulers.add(getattr(module, name))
1868        return schedulers
1869
1870    def setbuildable(self, task):
1871        self.runq_buildable.add(task)
1872        self.sched.newbuildable(task)
1873
1874    def task_completeoutright(self, task):
1875        """
1876        Mark a task as completed
1877        Look at the reverse dependencies and mark any task with
1878        completed dependencies as buildable
1879        """
1880        self.runq_complete.add(task)
1881        for revdep in self.rqdata.runtaskentries[task].revdeps:
1882            if revdep in self.runq_running:
1883                continue
1884            if revdep in self.runq_buildable:
1885                continue
1886            alldeps = True
1887            for dep in self.rqdata.runtaskentries[revdep].depends:
1888                if dep not in self.runq_complete:
1889                    alldeps = False
1890                    break
1891            if alldeps:
1892                self.setbuildable(revdep)
1893                logger.debug("Marking task %s as buildable", revdep)
1894
1895        for t in self.sq_deferred.copy():
1896            if self.sq_deferred[t] == task:
1897                logger.debug2("Deferred task %s now buildable" % t)
1898                del self.sq_deferred[t]
1899                update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
1900
    def task_complete(self, task):
        """
        Handle successful completion of a normal (non-setscene) task:
        update statistics, notify UIs via the completion event, propagate
        buildability to reverse dependencies and record that it ran.
        """
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)
        self.runq_tasksrun.add(task)
1906
1907    def task_fail(self, task, exitcode, fakerootlog=None):
1908        """
1909        Called when a task has failed
1910        Updates the state engine with the failure
1911        """
1912        self.stats.taskFailed()
1913        self.failed_tids.append(task)
1914
1915        fakeroot_log = []
1916        if fakerootlog and os.path.exists(fakerootlog):
1917            with open(fakerootlog) as fakeroot_log_file:
1918                fakeroot_failed = False
1919                for line in reversed(fakeroot_log_file.readlines()):
1920                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
1921                        if fakeroot_error in line.lower():
1922                            fakeroot_failed = True
1923                    if 'doing new pid setup and server start' in line:
1924                        break
1925                    fakeroot_log.append(line)
1926
1927            if not fakeroot_failed:
1928                fakeroot_log = []
1929
1930        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
1931
1932        if self.rqdata.taskData[''].halt:
1933            self.rq.state = runQueueCleanUp
1934
    def task_skip(self, task, reason):
        """
        Skip *task* (e.g. because it is covered by setscene or its stamp is
        current): mark it running/buildable, fire the skip event and treat
        it as completed for dependency purposes.
        """
        self.runq_running.add(task)
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        # A skipped task counts as both skipped and completed in the stats.
        self.stats.taskSkipped()
        self.stats.taskCompleted()
1942
    def summarise_scenequeue_errors(self):
        """
        Sanity-check the final scenequeue state, logging any inconsistency.
        Returns True if an error was detected, False otherwise.
        """
        err = False
        if not self.sqdone:
            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
            completeevent = sceneQueueComplete(self.stats, self.rq)
            bb.event.fire(completeevent, self.cfgData)
        # Leftover bookkeeping means the scenequeue machinery lost track of work.
        if self.sq_deferred:
            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
            err = True
        if self.updated_taskhash_queue:
            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
            err = True
        if self.holdoff_tasks:
            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
            err = True

        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
            # No task should end up in both covered and uncovered, that is a bug.
            # NOTE(review): unlike the other checks this does not set err=True
            # - confirm whether that is intentional.
            logger.error("Setscene task %s in both covered and notcovered." % tid)

        # Every setscene task must have been classified, made buildable and run.
        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                err = True
                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
            if tid not in self.sq_buildable:
                err = True
                logger.error("Setscene Task %s was never marked as buildable" % tid)
            if tid not in self.sq_running:
                err = True
                logger.error("Setscene Task %s was never marked as running" % tid)

        # Every runqueue task must have passed through the setscene machinery.
        for x in self.rqdata.runtaskentries:
            if x not in self.tasks_covered and x not in self.tasks_notcovered:
                logger.error("Task %s was never moved from the setscene queue" % x)
                err = True
            if x not in self.tasks_scenequeue_done:
                logger.error("Task %s was never processed by the setscene code" % x)
                err = True
            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
                logger.error("Task %s was never marked as buildable by the setscene code" % x)
                err = True
        return err
1985
1986
1987    def execute(self):
1988        """
1989        Run the tasks in a queue prepared by prepare_runqueue
1990        """
1991
1992        self.rq.read_workers()
1993        if self.updated_taskhash_queue or self.pending_migrations:
1994            self.process_possible_migrations()
1995
1996        if not hasattr(self, "sorted_setscene_tids"):
1997            # Don't want to sort this set every execution
1998            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)
1999
2000        task = None
2001        if not self.sqdone and self.can_start_task():
2002            # Find the next setscene to run
2003            for nexttask in self.sorted_setscene_tids:
2004                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
2005                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
2006                        if nexttask not in self.rqdata.target_tids:
2007                            logger.debug2("Skipping setscene for task %s" % nexttask)
2008                            self.sq_task_skip(nexttask)
2009                            self.scenequeue_notneeded.add(nexttask)
2010                            if nexttask in self.sq_deferred:
2011                                del self.sq_deferred[nexttask]
2012                            return True
2013                    # If covered tasks are running, need to wait for them to complete
2014                    for t in self.sqdata.sq_covered_tasks[nexttask]:
2015                        if t in self.runq_running and t not in self.runq_complete:
2016                            continue
2017                    if nexttask in self.sq_deferred:
2018                        if self.sq_deferred[nexttask] not in self.runq_complete:
2019                            continue
2020                        logger.debug("Task %s no longer deferred" % nexttask)
2021                        del self.sq_deferred[nexttask]
2022                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
2023                        if not valid:
2024                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
2025                            self.sq_task_failoutright(nexttask)
2026                            return True
2027                    if nexttask in self.sqdata.outrightfail:
2028                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
2029                        self.sq_task_failoutright(nexttask)
2030                        return True
2031                    if nexttask in self.sqdata.unskippable:
2032                        logger.debug2("Setscene task %s is unskippable" % nexttask)
2033                    task = nexttask
2034                    break
2035        if task is not None:
2036            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2037            taskname = taskname + "_setscene"
2038            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
2039                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
2040                self.sq_task_failoutright(task)
2041                return True
2042
2043            if self.cooker.configuration.force:
2044                if task in self.rqdata.target_tids:
2045                    self.sq_task_failoutright(task)
2046                    return True
2047
2048            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2049                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
2050                self.sq_task_skip(task)
2051                return True
2052
2053            if self.cooker.configuration.skipsetscene:
2054                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
2055                self.sq_task_failoutright(task)
2056                return True
2057
2058            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
2059            bb.event.fire(startevent, self.cfgData)
2060
2061            taskdepdata = self.sq_build_taskdepdata(task)
2062
2063            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2064            taskhash = self.rqdata.get_task_hash(task)
2065            unihash = self.rqdata.get_task_unihash(task)
2066            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
2067                if not mc in self.rq.fakeworker:
2068                    self.rq.start_fakeworker(self, mc)
2069                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
2070                self.rq.fakeworker[mc].process.stdin.flush()
2071            else:
2072                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
2073                self.rq.worker[mc].process.stdin.flush()
2074
2075            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
2076            self.build_stamps2.append(self.build_stamps[task])
2077            self.sq_running.add(task)
2078            self.sq_live.add(task)
2079            self.stats.updateActiveSetscene(len(self.sq_live))
2080            if self.can_start_task():
2081                return True
2082
2083        self.update_holdofftasks()
2084
2085        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
2086            hashequiv_logger.verbose("Setscene tasks completed")
2087
2088            err = self.summarise_scenequeue_errors()
2089            if err:
2090                self.rq.state = runQueueFailed
2091                return True
2092
2093            if self.cooker.configuration.setsceneonly:
2094                self.rq.state = runQueueComplete
2095                return True
2096            self.sqdone = True
2097
2098            if self.stats.total == 0:
2099                # nothing to do
2100                self.rq.state = runQueueComplete
2101                return True
2102
2103        if self.cooker.configuration.setsceneonly:
2104            task = None
2105        else:
2106            task = self.sched.next()
2107        if task is not None:
2108            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2109
2110            if self.rqdata.setscene_ignore_tasks is not None:
2111                if self.check_setscene_ignore_tasks(task):
2112                    self.task_fail(task, "setscene ignore_tasks")
2113                    return True
2114
2115            if task in self.tasks_covered:
2116                logger.debug2("Setscene covered task %s", task)
2117                self.task_skip(task, "covered")
2118                return True
2119
2120            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2121                logger.debug2("Stamp current task %s", task)
2122
2123                self.task_skip(task, "existing")
2124                self.runq_tasksrun.add(task)
2125                return True
2126
2127            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2128            if 'noexec' in taskdep and taskname in taskdep['noexec']:
2129                startevent = runQueueTaskStarted(task, self.stats, self.rq,
2130                                                 noexec=True)
2131                bb.event.fire(startevent, self.cfgData)
2132                self.runq_running.add(task)
2133                self.stats.taskActive()
2134                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2135                    bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
2136                self.task_complete(task)
2137                return True
2138            else:
2139                startevent = runQueueTaskStarted(task, self.stats, self.rq)
2140                bb.event.fire(startevent, self.cfgData)
2141
2142            taskdepdata = self.build_taskdepdata(task)
2143
2144            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2145            taskhash = self.rqdata.get_task_hash(task)
2146            unihash = self.rqdata.get_task_unihash(task)
2147            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2148                if not mc in self.rq.fakeworker:
2149                    try:
2150                        self.rq.start_fakeworker(self, mc)
2151                    except OSError as exc:
2152                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
2153                        self.rq.state = runQueueFailed
2154                        self.stats.taskFailed()
2155                        return True
2156                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
2157                self.rq.fakeworker[mc].process.stdin.flush()
2158            else:
2159                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
2160                self.rq.worker[mc].process.stdin.flush()
2161
2162            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
2163            self.build_stamps2.append(self.build_stamps[task])
2164            self.runq_running.add(task)
2165            self.stats.taskActive()
2166            if self.can_start_task():
2167                return True
2168
2169        if self.stats.active > 0 or self.sq_live:
2170            self.rq.read_workers()
2171            return self.rq.active_fds()
2172
2173        # No more tasks can be run. If we have deferred setscene tasks we should run them.
2174        if self.sq_deferred:
2175            tid = self.sq_deferred.pop(list(self.sq_deferred.keys())[0])
2176            logger.warning("Runqeueue deadlocked on deferred tasks, forcing task %s" % tid)
2177            if tid not in self.runq_complete:
2178                self.sq_task_failoutright(tid)
2179            return True
2180
2181        if self.failed_tids:
2182            self.rq.state = runQueueFailed
2183            return True
2184
2185        # Sanity Checks
2186        err = self.summarise_scenequeue_errors()
2187        for task in self.rqdata.runtaskentries:
2188            if task not in self.runq_buildable:
2189                logger.error("Task %s never buildable!", task)
2190                err = True
2191            elif task not in self.runq_running:
2192                logger.error("Task %s never ran!", task)
2193                err = True
2194            elif task not in self.runq_complete:
2195                logger.error("Task %s never completed!", task)
2196                err = True
2197
2198        if err:
2199            self.rq.state = runQueueFailed
2200        else:
2201            self.rq.state = runQueueComplete
2202
2203        return True
2204
2205    def filtermcdeps(self, task, mc, deps):
2206        ret = set()
2207        for dep in deps:
2208            thismc = mc_from_tid(dep)
2209            if thismc != mc:
2210                continue
2211            ret.add(dep)
2212        return ret
2213
2214    # We filter out multiconfig dependencies from taskdepdata we pass to the tasks
2215    # as most code can't handle them
2216    def build_taskdepdata(self, task):
2217        taskdepdata = {}
2218        mc = mc_from_tid(task)
2219        next = self.rqdata.runtaskentries[task].depends.copy()
2220        next.add(task)
2221        next = self.filtermcdeps(task, mc, next)
2222        while next:
2223            additional = []
2224            for revdep in next:
2225                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2226                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2227                deps = self.rqdata.runtaskentries[revdep].depends
2228                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2229                taskhash = self.rqdata.runtaskentries[revdep].hash
2230                unihash = self.rqdata.runtaskentries[revdep].unihash
2231                deps = self.filtermcdeps(task, mc, deps)
2232                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2233                for revdep2 in deps:
2234                    if revdep2 not in taskdepdata:
2235                        additional.append(revdep2)
2236            next = additional
2237
2238        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2239        return taskdepdata
2240
    def update_holdofftasks(self):
        """
        Recompute which ordinary tasks are covered/notcovered by setscene
        results and which setscene tasks are still 'held off' (no verdict
        yet), marking any newly-unblocked tasks as buildable. Does nothing
        unless flagged dirty via self.holdoff_need_update.
        """

        if not self.holdoff_need_update:
            return

        # Tasks which must run: everything beneath a notcovered setscene task,
        # plus tasks which can never be skipped
        notcovered = set(self.scenequeue_notcovered)
        notcovered |= self.cantskip
        for tid in self.scenequeue_notcovered:
            notcovered |= self.sqdata.sq_covered_tasks[tid]
        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
        # Only consider tasks the scenequeue has finished deciding about
        notcovered.intersection_update(self.tasks_scenequeue_done)

        covered = set(self.scenequeue_covered)
        for tid in self.scenequeue_covered:
            covered |= self.sqdata.sq_covered_tasks[tid]
        # notcovered takes precedence over covered
        covered.difference_update(notcovered)
        covered.intersection_update(self.tasks_scenequeue_done)

        # Any decided task whose dependencies are all complete can now build
        for tid in notcovered | covered:
            if not self.rqdata.runtaskentries[tid].depends:
                self.setbuildable(tid)
            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
                 self.setbuildable(tid)

        self.tasks_covered = covered
        self.tasks_notcovered = notcovered

        # Setscene tasks with no covered/notcovered verdict yet are held off,
        # along with any tasks they cover which haven't completed
        self.holdoff_tasks = set()

        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                self.holdoff_tasks.add(tid)

        for tid in self.holdoff_tasks.copy():
            for dep in self.sqdata.sq_covered_tasks[tid]:
                if dep not in self.runq_complete:
                    self.holdoff_tasks.add(dep)

        self.holdoff_need_update = False
2280
2281    def process_possible_migrations(self):
2282
2283        changed = set()
2284        toprocess = set()
2285        for tid, unihash in self.updated_taskhash_queue.copy():
2286            if tid in self.runq_running and tid not in self.runq_complete:
2287                continue
2288
2289            self.updated_taskhash_queue.remove((tid, unihash))
2290
2291            if unihash != self.rqdata.runtaskentries[tid].unihash:
2292                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2293                torehash = [tid]
2294                for deftid in self.sq_deferred:
2295                    if self.sq_deferred[deftid] == tid:
2296                        torehash.append(deftid)
2297                for hashtid in torehash:
2298                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2299                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2300                    bb.parse.siggen.set_unihash(hashtid, unihash)
2301                    toprocess.add(hashtid)
2302                if torehash:
2303                    # Need to save after set_unihash above
2304                    bb.parse.siggen.save_unitaskhashes()
2305
2306        # Work out all tasks which depend upon these
2307        total = set()
2308        next = set()
2309        for p in toprocess:
2310            next |= self.rqdata.runtaskentries[p].revdeps
2311        while next:
2312            current = next.copy()
2313            total = total | next
2314            next = set()
2315            for ntid in current:
2316                next |= self.rqdata.runtaskentries[ntid].revdeps
2317            next.difference_update(total)
2318
2319        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2320        next = set()
2321        for p in total:
2322            if not self.rqdata.runtaskentries[p].depends:
2323                next.add(p)
2324            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2325                next.add(p)
2326
2327        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2328        while next:
2329            current = next.copy()
2330            next = set()
2331            for tid in current:
2332                if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2333                    continue
2334                orighash = self.rqdata.runtaskentries[tid].hash
2335                dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid))
2336                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc)
2337                origuni = self.rqdata.runtaskentries[tid].unihash
2338                newuni = bb.parse.siggen.get_unihash(tid)
2339                # FIXME, need to check it can come from sstate at all for determinism?
2340                remapped = False
2341                if newuni == origuni:
2342                    # Nothing to do, we match, skip code below
2343                    remapped = True
2344                elif tid in self.scenequeue_covered or tid in self.sq_live:
2345                    # Already ran this setscene task or it running. Report the new taskhash
2346                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2347                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2348                    remapped = True
2349
2350                if not remapped:
2351                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2352                    self.rqdata.runtaskentries[tid].hash = newhash
2353                    self.rqdata.runtaskentries[tid].unihash = newuni
2354                    changed.add(tid)
2355
2356                next |= self.rqdata.runtaskentries[tid].revdeps
2357                total.remove(tid)
2358                next.intersection_update(total)
2359
2360        if changed:
2361            for mc in self.rq.worker:
2362                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2363            for mc in self.rq.fakeworker:
2364                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2365
2366            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2367
2368        for tid in changed:
2369            if tid not in self.rqdata.runq_setscene_tids:
2370                continue
2371            if tid not in self.pending_migrations:
2372                self.pending_migrations.add(tid)
2373
2374        update_tasks = []
2375        for tid in self.pending_migrations.copy():
2376            if tid in self.runq_running or tid in self.sq_live:
2377                # Too late, task already running, not much we can do now
2378                self.pending_migrations.remove(tid)
2379                continue
2380
2381            valid = True
2382            # Check no tasks this covers are running
2383            for dep in self.sqdata.sq_covered_tasks[tid]:
2384                if dep in self.runq_running and dep not in self.runq_complete:
2385                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2386                    valid = False
2387                    break
2388            if not valid:
2389                continue
2390
2391            self.pending_migrations.remove(tid)
2392            changed = True
2393
2394            if tid in self.tasks_scenequeue_done:
2395                self.tasks_scenequeue_done.remove(tid)
2396            for dep in self.sqdata.sq_covered_tasks[tid]:
2397                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2398                    bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
2399                    self.failed_tids.append(tid)
2400                    self.rq.state = runQueueCleanUp
2401                    return
2402
2403                if dep not in self.runq_complete:
2404                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2405                        self.tasks_scenequeue_done.remove(dep)
2406
2407            if tid in self.sq_buildable:
2408                self.sq_buildable.remove(tid)
2409            if tid in self.sq_running:
2410                self.sq_running.remove(tid)
2411            harddepfail = False
2412            for t in self.sqdata.sq_harddeps:
2413                if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
2414                    harddepfail = True
2415                    break
2416            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2417                if tid not in self.sq_buildable:
2418                    self.sq_buildable.add(tid)
2419            if not self.sqdata.sq_revdeps[tid]:
2420                self.sq_buildable.add(tid)
2421
2422            if tid in self.sqdata.outrightfail:
2423                self.sqdata.outrightfail.remove(tid)
2424            if tid in self.scenequeue_notcovered:
2425                self.scenequeue_notcovered.remove(tid)
2426            if tid in self.scenequeue_covered:
2427                self.scenequeue_covered.remove(tid)
2428            if tid in self.scenequeue_notneeded:
2429                self.scenequeue_notneeded.remove(tid)
2430
2431            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2432            self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
2433
2434            if tid in self.stampcache:
2435                del self.stampcache[tid]
2436
2437            if tid in self.build_stamps:
2438                del self.build_stamps[tid]
2439
2440            update_tasks.append((tid, harddepfail, tid in self.sqdata.valid))
2441
2442        if update_tasks:
2443            self.sqdone = False
2444            for tid in [t[0] for t in update_tasks]:
2445                h = pending_hash_index(tid, self.rqdata)
2446                if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2447                    self.sq_deferred[tid] = self.sqdata.hashes[h]
2448                    bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2449            update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2450
2451        for (tid, harddepfail, origvalid) in update_tasks:
2452            if tid in self.sqdata.valid and not origvalid:
2453                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2454            if harddepfail:
2455                self.sq_task_failoutright(tid)
2456
2457        if changed:
2458            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2459            self.holdoff_need_update = True
2460
    def scenequeue_updatecounters(self, task, fail=False):
        """
        Propagate the completion (or, if fail=True, the failure) of setscene
        *task* through the scenequeue data: make dependent setscene tasks
        buildable, fail hard-dependents where appropriate, and mark the
        non-setscene dependency chain beneath the task as 'scenequeue done'.
        """

        for dep in sorted(self.sqdata.sq_deps[task]):
            # A failed hard dependency normally forces the dependent to fail too
            if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
                    # dependency could be already processed, e.g. noexec setscene task
                    continue
                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
                if noexec or stamppresent:
                    # noexec or already-stamped tasks don't need the failed output
                    continue
                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                self.sq_task_failoutright(dep)
                continue
            # A setscene task becomes buildable once all its revdeps have a verdict
            if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                if dep not in self.sq_buildable:
                    self.sq_buildable.add(dep)

        next = set([task])
        while next:
            new = set()
            for t in sorted(next):
                self.tasks_scenequeue_done.add(t)
                # Look down the dependency chain for non-setscene things which this task depends on
                # and mark as 'done'
                for dep in self.rqdata.runtaskentries[t].depends:
                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
                        continue
                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
                        new.add(dep)
            next = new

        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
        self.holdoff_need_update = True
2494
2495    def sq_task_completeoutright(self, task):
2496        """
2497        Mark a task as completed
2498        Look at the reverse dependencies and mark any task with
2499        completed dependencies as buildable
2500        """
2501
2502        logger.debug('Found task %s which could be accelerated', task)
2503        self.scenequeue_covered.add(task)
2504        self.scenequeue_updatecounters(task)
2505
2506    def sq_check_taskfail(self, task):
2507        if self.rqdata.setscene_ignore_tasks is not None:
2508            realtask = task.split('_setscene')[0]
2509            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2510            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2511            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2512                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2513                self.rq.state = runQueueCleanUp
2514
2515    def sq_task_complete(self, task):
2516        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2517        self.sq_task_completeoutright(task)
2518
2519    def sq_task_fail(self, task, result):
2520        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
2521        self.scenequeue_notcovered.add(task)
2522        self.scenequeue_updatecounters(task, True)
2523        self.sq_check_taskfail(task)
2524
2525    def sq_task_failoutright(self, task):
2526        self.sq_running.add(task)
2527        self.sq_buildable.add(task)
2528        self.scenequeue_notcovered.add(task)
2529        self.scenequeue_updatecounters(task, True)
2530
2531    def sq_task_skip(self, task):
2532        self.sq_running.add(task)
2533        self.sq_buildable.add(task)
2534        self.sq_task_completeoutright(task)
2535
2536    def sq_build_taskdepdata(self, task):
2537        def getsetscenedeps(tid):
2538            deps = set()
2539            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2540            realtid = tid + "_setscene"
2541            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2542            for (depname, idependtask) in idepends:
2543                if depname not in self.rqdata.taskData[mc].build_targets:
2544                    continue
2545
2546                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2547                if depfn is None:
2548                     continue
2549                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2550                deps.add(deptid)
2551            return deps
2552
2553        taskdepdata = {}
2554        next = getsetscenedeps(task)
2555        next.add(task)
2556        while next:
2557            additional = []
2558            for revdep in next:
2559                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2560                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2561                deps = getsetscenedeps(revdep)
2562                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2563                taskhash = self.rqdata.runtaskentries[revdep].hash
2564                unihash = self.rqdata.runtaskentries[revdep].unihash
2565                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2566                for revdep2 in deps:
2567                    if revdep2 not in taskdepdata:
2568                        additional.append(revdep2)
2569            next = additional
2570
2571        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2572        return taskdepdata
2573
2574    def check_setscene_ignore_tasks(self, tid):
2575        # Check task that is going to run against the ignore tasks list
2576        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2577        # Ignore covered tasks
2578        if tid in self.tasks_covered:
2579            return False
2580        # Ignore stamped tasks
2581        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2582            return False
2583        # Ignore noexec tasks
2584        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2585        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2586            return False
2587
2588        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2589        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2590            if tid in self.rqdata.runq_setscene_tids:
2591                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
2592            else:
2593                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
2594            for t in self.scenequeue_notcovered:
2595                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
2596            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2597            logger.error("".join(msg))
2598            return True
2599        return False
2600
class SQData(object):
    """Container for the scenequeue (setscene) task graph data."""
    def __init__(self):
        # Forward dependencies: which setscene tasks depend upon a given one
        self.sq_deps = {}
        # Reverse dependencies of each setscene task
        self.sq_revdeps = {}
        # Injected inter-setscene task dependencies (from _setscene [depends])
        self.sq_harddeps = {}
        # Stamp file per tid, so duplicates can't run in parallel
        self.stamps = {}
        # Setscene tasks directly depended upon by the build
        self.unskippable = set()
        # Setscene tasks which aren't present/available
        self.outrightfail = set()
        # The normal tasks each setscene task covers
        self.sq_covered_tasks = {}
2617
def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
    """
    Populate *sqdata* (an SQData instance) from the full runqueue data:
    collapse the runtime task graph down to a graph containing only the
    setscene tasks, compute which normal tasks each setscene task covers,
    resolve inter-setscene hard dependencies, seed buildable/deferred state
    on *sqrq*, and fire StaleSetSceneTasks events for stale sstate stamps.
    """

    sq_revdeps = {}
    sq_revdeps_squash = {}
    sq_collated_deps = {}

    # We need to construct a dependency graph for the setscene functions. Intermediate
    # dependencies between the setscene tasks only complicate the code. This code
    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
    # only containing the setscene functions.

    rqdata.init_progress_reporter.next_stage()

    # First process the chains up to the first setscene task.
    endpoints = {}
    for tid in rqdata.runtaskentries:
        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
        sq_revdeps_squash[tid] = set()
        # Leaf tasks (no revdeps) which aren't setscene tasks seed the squash
        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint %s" % (tid))
            endpoints[tid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Secondly process the chains between setscene tasks.
    for tid in rqdata.runq_setscene_tids:
        sq_collated_deps[tid] = set()
        #bb.warn("Added endpoint 2 %s" % (tid))
        for dep in rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    def process_endpoints(endpoints):
        # Recursively walk from the endpoints towards the graph roots,
        # accumulating into sq_revdeps_squash the setscene tasks which
        # (transitively) depend on each node, and into sq_collated_deps the
        # normal tasks each setscene task covers.
        newendpoints = {}
        for point, task in endpoints.items():
            tasks = set()
            if task:
                tasks |= task
            if sq_revdeps_squash[point]:
                tasks |= sq_revdeps_squash[point]
            if point not in rqdata.runq_setscene_tids:
                for t in tasks:
                    sq_collated_deps[t].add(point)
            sq_revdeps_squash[point] = set()
            if point in rqdata.runq_setscene_tids:
                sq_revdeps_squash[point] = tasks
                continue
            for dep in rqdata.runtaskentries[point].depends:
                if point in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(point)
                if tasks:
                    sq_revdeps_squash[dep] |= tasks
                # Once all of a node's revdeps are consumed it becomes a new endpoint
                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
                    newendpoints[dep] = task
        if newendpoints:
            process_endpoints(newendpoints)

    process_endpoints(endpoints)

    rqdata.init_progress_reporter.next_stage()

    # Build a list of tasks which are "unskippable"
    # These are direct endpoints referenced by the build upto and including setscene tasks
    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
    new = True
    for tid in rqdata.runtaskentries:
        if not rqdata.runtaskentries[tid].revdeps:
            sqdata.unskippable.add(tid)
    sqdata.unskippable |= sqrq.cantskip
    # Iterate to a fixed point, walking down the dependency chains
    while new:
        new = False
        orig = sqdata.unskippable.copy()
        for tid in sorted(orig, reverse=True):
            if tid in rqdata.runq_setscene_tids:
                continue
            if not rqdata.runtaskentries[tid].depends:
                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                sqrq.setbuildable(tid)
            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
            if sqdata.unskippable != orig:
                new = True

    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)

    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))

    # Sanity check all dependencies could be changed to setscene task references
    for taskcounter, tid in enumerate(rqdata.runtaskentries):
        if tid in rqdata.runq_setscene_tids:
            pass
        elif sq_revdeps_squash[tid]:
            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
        else:
            del sq_revdeps_squash[tid]
        rqdata.init_progress_reporter.update(taskcounter)

    rqdata.init_progress_reporter.next_stage()

    # Resolve setscene inter-task dependencies
    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        realtid = tid + "_setscene"
        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
        sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True)
        for (depname, idependtask) in idepends:

            if depname not in rqdata.taskData[mc].build_targets:
                continue

            depfn = rqdata.taskData[mc].build_targets[depname][0]
            if depfn is None:
                continue
            deptid = depfn + ":" + idependtask.replace("_setscene", "")
            if deptid not in rqdata.runtaskentries:
                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

            if not deptid in sqdata.sq_harddeps:
                sqdata.sq_harddeps[deptid] = set()
            sqdata.sq_harddeps[deptid].add(tid)

            sq_revdeps_squash[tid].add(deptid)
            # Have to zero this to avoid circular dependencies
            sq_revdeps_squash[deptid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Re-add the hard dependency edges after the zeroing above
    for task in sqdata.sq_harddeps:
        for dep in sqdata.sq_harddeps[task]:
            sq_revdeps_squash[dep].add(task)

    rqdata.init_progress_reporter.next_stage()

    #for tid in sq_revdeps_squash:
    #    data = ""
    #    for dep in sq_revdeps_squash[tid]:
    #        data = data + "\n   %s" % dep
    #    bb.warn("Task %s_setscene: is %s " % (tid, data))

    sqdata.sq_revdeps = sq_revdeps_squash
    sqdata.sq_covered_tasks = sq_collated_deps

    # Build reverse version of revdeps to populate deps structure
    for tid in sqdata.sq_revdeps:
        sqdata.sq_deps[tid] = set()
    for tid in sqdata.sq_revdeps:
        for dep in sqdata.sq_revdeps[tid]:
            sqdata.sq_deps[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    # Setscene tasks with no revdeps can be run immediately
    sqdata.multiconfigs = set()
    for tid in sqdata.sq_revdeps:
        sqdata.multiconfigs.add(mc_from_tid(tid))
        if not sqdata.sq_revdeps[tid]:
            sqrq.sq_buildable.add(tid)

    rqdata.init_progress_reporter.finish()

    sqdata.noexec = set()
    sqdata.stamppresent = set()
    sqdata.valid = set()

    # Defer tasks whose pending hash index matches an earlier task - they may
    # become available from sstate once the first one runs
    sqdata.hashes = {}
    sqrq.sq_deferred = {}
    for mc in sorted(sqdata.multiconfigs):
        for tid in sorted(sqdata.sq_revdeps):
            if mc_from_tid(tid) != mc:
                continue
            h = pending_hash_index(tid, rqdata)
            if h not in sqdata.hashes:
                sqdata.hashes[h] = tid
            else:
                sqrq.sq_deferred[tid] = sqdata.hashes[h]
                bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))

    update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)

    # Compute a list of 'stale' sstate tasks where the current hash does not match the one
    # in any stamp files. Pass the list out to metadata as an event.
    found = {}
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn)
        if stamps:
            if mc not in found:
                found[mc] = {}
            found[mc][tid] = stamps
    for mc in found:
        event = bb.event.StaleSetSceneTasks(found[mc])
        bb.event.fire(event, cooker.databuilder.mcdata[mc])
2816
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
    """
    Classify the stamp state of a setscene task.

    Returns a (noexec, stamppresent) pair: noexec is True when the task is
    flagged 'noexec' (a setscene stamp is written as a side effect before
    returning), stamppresent is True when an up-to-date setscene or normal
    stamp already exists.

    NOTE(review): the noexecstamp parameter is currently unused here — the
    noexec stamp is always written; confirm whether it should gate
    make_stamp().
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)

    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]

    if 'noexec' in taskdep and taskname in taskdep['noexec']:
        # noexec tasks have nothing to run; record the setscene stamp so the
        # task is considered done.
        bb.build.make_stamp(taskname + "_setscene", rqdata.dataCaches[mc], taskfn)
        return True, False

    stamp_found = False
    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
        logger.debug2('Setscene stamp current for task %s', tid)
        stamp_found = True
    elif rq.check_stamp_task(tid, taskname, recurse=True, cache=stampcache):
        # The real task (and its dependencies) already ran to completion.
        logger.debug2('Normal stamp current for task %s', tid)
        stamp_found = True

    return False, stamp_found
2836
def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
    """
    Recompute the noexec/stamppresent/valid/outrightfail classification for
    the given setscene task ids.

    Each tid is first cleared from any previous classification, then either
    marked noexec (and skipped), marked stamppresent (and skipped), or queued
    for hash validation via rq.validate_hashes(). Any tid that ends up in
    none of the sets that would provide it is added to sqdata.outrightfail.
    """
    tocheck = set()

    for tid in sorted(tids):
        # Drop any stale classification from a previous pass.
        # (set.discard is a no-op when the element is absent.)
        sqdata.stamppresent.discard(tid)
        sqdata.valid.discard(tid)
        sqdata.outrightfail.discard(tid)

        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)

        if noexec:
            sqdata.noexec.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        if stamppresent:
            sqdata.stamppresent.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        tocheck.add(tid)

    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)

    for tid in tids:
        # A tid accounted for by none of these sets has no way to be
        # provided, so it is an outright setscene failure.
        if tid in sqdata.stamppresent:
            continue
        if tid in sqdata.valid:
            continue
        if tid in sqdata.noexec:
            continue
        if tid in sqrq.scenequeue_covered:
            continue
        if tid in sqrq.scenequeue_notcovered:
            continue
        if tid in sqrq.sq_deferred:
            continue
        sqdata.outrightfail.add(tid)
2879
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        # x is stored directly as the exception's args attribute; presumably
        # callers pass a tuple/sequence of arguments — TODO confirm at call
        # sites (a plain string would be split per-character by args handling).
        self.args = x
2886
2887
class runQueueExitWait(bb.event.Event):
    """
    Fired while the runqueue is waiting for outstanding task processes
    to terminate.
    """

    def __init__(self, remain):
        bb.event.Event.__init__(self)
        # remain is the count of still-active tasks being waited on.
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
2897
class runQueueEvent(bb.event.Event):
    """
    Common base class for runqueue task events; records the task id,
    its name/file/hash and a snapshot of the runqueue statistics.
    """
    def __init__(self, task, stats, rq):
        bb.event.Event.__init__(self)
        self.taskid = task
        self.taskstring = task
        self.taskfile = fn_from_tid(task)
        self.taskname = taskname_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        # Snapshot so later mutation of the live stats doesn't affect
        # this event.
        self.stats = stats.copy()
2910
class sceneQueueEvent(runQueueEvent):
    """
    Base sceneQueue event class

    Identical to runQueueEvent except that the task string and task name
    carry the "_setscene" suffix.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        # Only override the fields that differ for setscene variants;
        # taskfile and taskhash are already set identically by the base
        # __init__, so re-assigning them here was redundant.
        self.taskstring = task + "_setscene"
        self.taskname = taskname_from_tid(task) + "_setscene"
2921
class runQueueTaskStarted(runQueueEvent):
    """
    Fired when a runqueue task starts; noexec marks tasks that will not
    actually execute any code.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        runQueueEvent.__init__(self, task, stats, rq)
2929
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Fired when a setscene task starts; noexec marks tasks that will not
    actually execute any code.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        sceneQueueEvent.__init__(self, task, stats, rq)
2937
class runQueueTaskFailed(runQueueEvent):
    """
    Fired when a runqueue task fails; carries the exit code and, when
    available, the pseudo (fakeroot) log for the task.
    """
    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
        runQueueEvent.__init__(self, task, stats, rq)
        self.fakeroot_log = fakeroot_log
        self.exitcode = exitcode

    def __str__(self):
        message = "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
        if self.fakeroot_log:
            # Append the pseudo log when one was captured for this task.
            message += " \nPseudo log:\n%s" % self.fakeroot_log
        return message
2952
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Fired when a setscene task fails; the real task will be executed
    instead.
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        details = (self.taskstring, self.exitcode)
        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % details
2963
class sceneQueueComplete(sceneQueueEvent):
    """
    Event when all the sceneQueue tasks are complete
    """
    def __init__(self, stats, rq):
        # Deliberately does NOT call sceneQueueEvent.__init__: this event
        # describes the whole scenequeue, not a single task, so only a
        # snapshot of the stats is recorded before the plain Event init.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2971
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed; carries no extra fields beyond the
    runQueueEvent base.
    """
2976
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying a setscene task completed; carries no extra fields
    beyond the sceneQueueEvent base.
    """
2981
class runQueueTaskSkipped(runQueueEvent):
    """
    Fired when a task is skipped rather than executed; reason records
    why it was skipped.
    """
    def __init__(self, task, stats, rq, reason):
        self.reason = reason
        runQueueEvent.__init__(self, task, stats, rq)
2989
class taskUniHashUpdate(bb.event.Event):
    """
    Event carrying an updated unihash for a task. Received from worker
    processes via runQueuePipe.read(), which forwards the (taskid, unihash)
    pair onto the executor's updated_taskhash_queue.
    """
    def __init__(self, task, unihash):
        self.taskid = task
        self.unihash = unihash
        bb.event.Event.__init__(self)
2998
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server

    Reads a byte stream of pickled messages framed as <event>...</event>
    and <exitcode>...</exitcode> and dispatches them.
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
        self.input = pipein
        if pipeout:
            # We only read from this pipe; close the unused write end.
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Buffer of raw bytes received so far; complete messages are
        # consumed from its front by read().
        self.queue = b""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec
        self.fakerootlogs = fakerootlogs

    def setrunqueueexec(self, rqexec):
        # Allow the executor to be swapped (e.g. scenequeue -> runqueue).
        self.rqexec = rqexec

    def read(self):
        """
        Drain available data from the pipe and dispatch any complete
        messages. Returns True if any new bytes were read.
        """
        # First check the worker processes haven't died underneath us.
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # EAGAIN just means the non-blocking fd has no data right now.
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        # Keep extracting messages until a pass finds nothing new.
        while found and self.queue:
            found = False
            index = self.queue.find(b"</event>")
            # Offsets 7 and 8 are len("<event>") and len("</event>").
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    event = pickle.loads(self.queue[7:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
                        # The pickled data could contain "</event>" so search for the next occurance
                        # unpickling again, this should be the only way an unpickle error could occur
                        index = self.queue.find(b"</event>", index + 1)
                        continue
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                if isinstance(event, taskUniHashUpdate):
                    # Forward unihash changes to the executor for processing.
                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
                found = True
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            index = self.queue.find(b"</exitcode>")
            # Offsets 10 and 11 are len("<exitcode>") and len("</exitcode>").
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    task, status = pickle.loads(self.queue[10:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                (_, _, _, taskfn) = split_tid_mcfn(task)
                fakerootlog = None
                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
                    # Attach the captured pseudo log so failures can report it.
                    fakerootlog = self.fakerootlogs[taskfn]
                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
                found = True
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        # Drain any remaining messages before closing the read end.
        while self.read():
            continue
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()
3074
def get_setscene_enforce_ignore_tasks(d, targets):
    """
    Build the list of 'pn:taskname' patterns exempt from setscene enforcement.

    Returns None when BB_SETSCENE_ENFORCE is not enabled. Entries of the
    form '%:taskname' in BB_SETSCENE_ENFORCE_IGNORE_TASKS are expanded to
    one entry per build target; all other entries pass through unchanged.
    """
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
    outlist = []
    # Note: no defensive copy of ignore_tasks is needed; it is never
    # modified while iterating.
    for item in ignore_tasks:
        if item.startswith('%:'):
            # '%' matches any recipe: expand to every explicit build target.
            # Hoist the split out of the inner loop; it is loop-invariant.
            taskname = item.split(':')[1]
            for (mc, target, task, fn) in targets:
                outlist.append(target + ':' + taskname)
        else:
            outlist.append(item)
    return outlist
3087
def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
    """
    Return True if pn:taskname is exempt from setscene enforcement.

    ignore_tasks is a list of fnmatch patterns as produced by
    get_setscene_enforce_ignore_tasks(), or None when enforcement is
    disabled entirely (in which case everything is exempt).
    """
    import fnmatch
    if ignore_tasks is None:
        # Enforcement disabled: everything is allowed to run.
        return True
    item = '%s:%s' % (pn, taskname)
    # Fixed: the original looped with `for ignore_tasks in ignore_tasks`,
    # shadowing the parameter with the loop variable.
    return any(fnmatch.fnmatch(item, pattern) for pattern in ignore_tasks)
3097