xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 5c833f23)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import logging
18import re
19import bb
20from bb import msg, event
21from bb import monitordisk
22import subprocess
23import pickle
24from multiprocessing import Process
25import shlex
26import pprint
27import time
28
# Module loggers: the root BitBake logger, the RunQueue-specific child, and a
# dedicated child for hash-equivalence related messages.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")

# Matches a standalone 64-character hex string (a sha256 digest), case
# insensitive, with lookarounds ensuring it is not part of a longer hex run.
__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
34
def fn_from_tid(tid):
    """Return the filename part of a task id (everything before the last ':')."""
    parts = tid.rsplit(":", 1)
    return parts[0]
37
def taskname_from_tid(tid):
    """Return the task name part of a task id (everything after the last ':')."""
    parts = tid.rsplit(":", 1)
    return parts[1]
40
def mc_from_tid(tid):
    """Return the multiconfig name from a tid, or "" for the default config."""
    if not (tid.startswith('mc:') and tid.count(':') >= 2):
        return ""
    return tid.split(':')[1]
45
def split_tid(tid):
    """Split a tid into (mc, fn, taskname), discarding the trailing mcfn."""
    return split_tid_mcfn(tid)[:3]
49
def split_mc(n):
    """Split a possibly "mc:<mc>:" prefixed name into (mc, name)."""
    if not n.startswith("mc:") or n.count(':') < 2:
        return ('', n)
    _prefix, mc, remainder = n.split(":", 2)
    return (mc, remainder)
55
def split_tid_mcfn(tid):
    """Decompose a task id into (mc, fn, taskname, mcfn).

    tids look like "fn:taskname" or, for multiconfig,
    "mc:<mc>:fn:taskname". mcfn is the filename with the multiconfig
    prefix retained (fn alone for the default configuration).
    """
    if tid.startswith('mc:') and tid.count(':') >= 2:
        _prefix, mc, rest = tid.split(':', 2)
        # rpartition yields fn == "" when rest has no further ':' -
        # matching the original ":".join(elems[2:-1]) behaviour.
        fn, _sep, taskname = rest.rpartition(':')
        return (mc, fn, taskname, "mc:" + mc + ":" + fn)

    parts = tid.rsplit(":", 1)
    return ("", parts[0], parts[1], parts[0])
71
def build_tid(mc, fn, taskname):
    """Assemble a tid string from its components (inverse of split_tid_mcfn)."""
    if not mc:
        return fn + ":" + taskname
    return ":".join(("mc", mc, fn, taskname))
76
# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    """
    Return the key used to pair up potentially matching multiconfig tasks.

    Two tasks are considered a potential match when their PN, task name
    and unihash are all equal, so all three go into the key.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    # Fix: use the taskname variable, not the literal string "taskname".
    # With the literal, tasks with different names but equal PN and hash
    # would wrongly share an index entry, contrary to the comment above.
    return pn + ":" + taskname + ":" + h
84
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        # Overall task counts for this queue.
        self.total = total
        self.setscene_total = setscene_total
        # Per-state counters, all starting from zero.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.setscene_active = 0
        self.setscene_covered = 0
        self.setscene_notcovered = 0

    def copy(self):
        """Return an independent snapshot of the current statistics."""
        snapshot = type(self)(self.total, self.setscene_total)
        snapshot.__dict__.update(self.__dict__)
        return snapshot

    def taskFailed(self):
        """Record one active task finishing in failure."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self):
        """Record one active task finishing successfully."""
        self.active -= 1
        self.completed += 1

    def taskSkipped(self):
        """Record a task being started as skipped (it becomes active too)."""
        self.active += 1
        self.skipped += 1

    def taskActive(self):
        """Record a task being started."""
        self.active += 1

    def updateCovered(self, covered, notcovered):
        """Set the setscene covered/notcovered counts."""
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        """Set the number of currently active setscene tasks."""
        self.setscene_active = active
126
# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2       # build the runqueue data from taskData
runQueueSceneInit = 3     # initialise setscene handling
runQueueRunning = 6       # executing tasks
runQueueFailed = 7        # a task failed; stopping/cleaning up
runQueueCleanUp = 8       # waiting for remaining active tasks to finish
runQueueComplete = 9      # all tasks finished successfully
135
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        # Fix: materialise the tids into a flat list. The original wrapped
        # the dict_keys view in a single-element list, which would break
        # self.prio_map.index(tid) in next_buildable_task() and the
        # enumerate() in dump_prio() for this base scheduler.
        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            if tid in self.rq.runq_buildable:
                # Fix: self.buildable is a set; the original called append(),
                # which sets do not have, raising AttributeError when this
                # branch was reached. newbuildable() below already uses add().
                self.buildable.add(tid)

        self.rev_prio_map = None
        self.is_pressure_usable()

    def is_pressure_usable(self):
        """
        If monitoring pressure, return True if pressure files can be open and read. For example
        openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported)
        is returned.
        """
        if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
            try:
                with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                    open("/proc/pressure/io") as io_pressure_fds, \
                    open("/proc/pressure/memory") as memory_pressure_fds:

                    # The first line of each file looks like
                    # "some avg10=0.00 avg60=0.00 avg300=0.00 total=0";
                    # field 4 carries the cumulative "total=" counter.
                    self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_pressure_time = time.time()
                self.check_pressure = True
            except Exception:
                # Fix: narrowed from a bare "except:" so system-exiting
                # exceptions (e.g. KeyboardInterrupt) are not swallowed; any
                # read failure still just disables pressure monitoring.
                bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
                self.check_pressure = False
        else:
            self.check_pressure = False

    def exceeds_max_pressure(self):
        """
        Monitor the difference in total pressure at least once per second, if
        BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold.
        """
        if self.check_pressure:
            with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                open("/proc/pressure/io") as io_pressure_fds, \
                open("/proc/pressure/memory") as memory_pressure_fds:
                # extract "total" from /proc/pressure/{cpu|io}
                curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                exceeds_cpu_pressure =  self.rq.max_cpu_pressure and (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) > self.rq.max_cpu_pressure
                exceeds_io_pressure =  self.rq.max_io_pressure and (float(curr_io_pressure) - float(self.prev_io_pressure)) > self.rq.max_io_pressure
                exceeds_memory_pressure = self.rq.max_memory_pressure and (float(curr_memory_pressure) - float(self.prev_memory_pressure)) > self.rq.max_memory_pressure
                now = time.time()
                # Re-baseline at most once per second so the comparison above
                # is a delta over roughly the last second, not since startup.
                if now - self.prev_pressure_time > 1.0:
                    self.prev_cpu_pressure = curr_cpu_pressure
                    self.prev_io_pressure = curr_io_pressure
                    self.prev_memory_pressure = curr_memory_pressure
                    self.prev_pressure_time = now
            return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
        return False

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Bitbake requires that at least one task be active. Only check for pressure if
        # this is the case, otherwise the pressure limitation could result in no tasks
        # being active and no new tasks started thereby, at times, breaking the scheduler.
        if self.rq.stats.active and self.exceeds_max_pressure():
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            # Fast path: single candidate, no priority comparison needed.
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            # Lazily invert prio_map into a tid -> priority lookup table.
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        best = None
        bestprio = None
        for tid in buildable:
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                continue
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                # Skip tasks whose stamp is already in flight under another tid.
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        """Mark *task* as buildable (its dependencies have all completed)."""
        self.buildable.add(task)

    def removebuildable(self, task):
        """Remove *task* from the buildable set."""
        self.buildable.remove(task)

    def describe_task(self, taskid):
        """Return a human-readable description of *taskid* for debug output."""
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        """Log the current priority map (most important first) at debug level 3."""
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))
298
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        Sort the priority map by task weight, heaviest first.
        """
        super().__init__(runqueue, rqdata)

        # Bucket the tids by their weight.
        buckets = {}
        for tid in self.rqdata.runtaskentries:
            w = self.rqdata.runtaskentries[tid].weight
            buckets.setdefault(w, []).append(tid)

        # Heaviest weight first. The original appended buckets in ascending
        # weight order and reversed the whole list afterwards, so each
        # bucket's contents are emitted in reverse insertion order here to
        # preserve the exact same final ordering.
        self.prio_map = [tid
                         for w in sorted(buckets, reverse=True)
                         for tid in reversed(buckets[w])]
325
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        """Build the speed-sorted map, then regroup it by task kind."""
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s'  % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runnable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's task that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            # Stable-partition: pull every tid of this task kind (in current
            # priority order) up to the front section at task_index.
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')
422
class RunTaskEntry(object):
    """Per-task node in the run queue's dependency graph."""

    def __init__(self):
        # Default weight; recalculated later by calculate_task_weights().
        self.weight = 1
        # Task identity and its signature hashes (filled in during prepare).
        self.task = None
        self.hash = None
        self.unihash = None
        # Forward and reverse dependency edges (sets of tids).
        self.depends = set()
        self.revdeps = set()
431
432class RunQueueData:
433    """
434    BitBake Run Queue implementation
435    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        """
        Store the collaborators and configuration needed to build the
        runqueue data, then reset to an empty task entry set.
        """
        # Core collaborators handed in by the owning RunQueue.
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        # Configuration pulled from the datastore.
        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
        self.setscene_ignore_tasks_checked = False
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()
451
452    def reset(self):
453        self.runtaskentries = {}
454
455    def runq_depends_names(self, ids):
456        import re
457        ret = []
458        for id in ids:
459            nam = os.path.basename(id)
460            nam = re.sub("_[^,]*,", ",", nam)
461            ret.extend([nam])
462        return ret
463
464    def get_task_hash(self, tid):
465        return self.runtaskentries[tid].hash
466
467    def get_task_unihash(self, tid):
468        return self.runtaskentries[tid].unihash
469
470    def get_user_idstring(self, tid, task_name_suffix = ""):
471        return tid + task_name_suffix
472
473    def get_short_user_idstring(self, task, task_name_suffix = ""):
474        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
475        pn = self.dataCaches[mc].pkg_fn[taskfn]
476        taskname = taskname_from_tid(task) + task_name_suffix
477        return "%s:%s" % (pn, taskname)
478
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.

        Returns a list of message strings describing up to 10 dependency loops.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        # Raised to abort the (potentially expensive) search once enough
        # loops have been reported.
        class TooManyLoops(Exception):
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            # Depth-first walk over reverse dependencies, recording any
            # cycle (a revdep already on the current path) as a message.
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Halted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                # Decide whether revdep needs to be (re)explored: it hasn't
                # been visited, or its explored set could intersect the
                # current path (a potential new loop through it).
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    # Deep-copy so the recursion does not mutate our path.
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            # Memoize the transitive reverse-dependency set for tid.
            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs
571
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list finding tasks that are not
        possible to execute due to circular dependencies.

        Returns the tid -> weight mapping; weights are also stored on each
        runtaskentry. Calls bb.msg.fatal() if unbuildable tasks are found.
        """

        numTasks = len(self.runtaskentries)
        weight = {}
        deps_left = {}
        task_done = {}

        # Every task starts with weight 1 and a count of unresolved reverse
        # dependencies (tasks that depend on it).
        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        # Endpoints (tasks nothing depends on) seed the propagation.
        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        # Propagate weights from the endpoints back towards the roots: a
        # task's weight accumulates the weights of everything that
        # (transitively) depends on it. A task joins the next frontier once
        # all of its reverse dependencies have been accounted for.
        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if not next_points:
                break

        # Circular dependency sanity check
        # Any task never reached by the propagation above is part of (or
        # blocked by) a dependency cycle.
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug2("Task %s is not buildable", tid)
                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight
631
632    def prepare(self):
633        """
634        Turn a set of taskData into a RunQueue and compute data needed
635        to optimise the execution order.
636        """
637
638        runq_build = {}
639        recursivetasks = {}
640        recursiveitasks = {}
641        recursivetasksselfref = set()
642
643        taskData = self.taskData
644
645        found = False
646        for mc in self.taskData:
647            if taskData[mc].taskentries:
648                found = True
649                break
650        if not found:
651            # Nothing to do
652            return 0
653
654        self.init_progress_reporter.start()
655        self.init_progress_reporter.next_stage()
656
657        # Step A - Work out a list of tasks to run
658        #
659        # Taskdata gives us a list of possible providers for every build and run
660        # target ordered by priority. It also gives information on each of those
661        # providers.
662        #
663        # To create the actual list of tasks to execute we fix the list of
664        # providers and then resolve the dependencies into task IDs. This
665        # process is repeated for each type of dependency (tdepends, deptask,
666        # rdeptast, recrdeptask, idepends).
667
668        def add_build_dependencies(depids, tasknames, depends, mc):
669            for depname in depids:
670                # Won't be in build_targets if ASSUME_PROVIDED
671                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
672                    continue
673                depdata = taskData[mc].build_targets[depname][0]
674                if depdata is None:
675                    continue
676                for taskname in tasknames:
677                    t = depdata + ":" + taskname
678                    if t in taskData[mc].taskentries:
679                        depends.add(t)
680
681        def add_runtime_dependencies(depids, tasknames, depends, mc):
682            for depname in depids:
683                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
684                    continue
685                depdata = taskData[mc].run_targets[depname][0]
686                if depdata is None:
687                    continue
688                for taskname in tasknames:
689                    t = depdata + ":" + taskname
690                    if t in taskData[mc].taskentries:
691                        depends.add(t)
692
693        def add_mc_dependencies(mc, tid):
694            mcdeps = taskData[mc].get_mcdepends()
695            for dep in mcdeps:
696                mcdependency = dep.split(':')
697                pn = mcdependency[3]
698                frommc = mcdependency[1]
699                mcdep = mcdependency[2]
700                deptask = mcdependency[4]
701                if mc == frommc:
702                    fn = taskData[mcdep].build_targets[pn][0]
703                    newdep = '%s:%s' % (fn,deptask)
704                    taskData[mc].taskentries[tid].tdepends.append(newdep)
705
706        for mc in taskData:
707            for tid in taskData[mc].taskentries:
708
709                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
710                #runtid = build_tid(mc, fn, taskname)
711
712                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)
713
714                depends = set()
715                task_deps = self.dataCaches[mc].task_deps[taskfn]
716
717                self.runtaskentries[tid] = RunTaskEntry()
718
719                if fn in taskData[mc].failed_fns:
720                    continue
721
722                # We add multiconfig dependencies before processing internal task deps (tdepends)
723                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
724                    add_mc_dependencies(mc, tid)
725
726                # Resolve task internal dependencies
727                #
728                # e.g. addtask before X after Y
729                for t in taskData[mc].taskentries[tid].tdepends:
730                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
731                    depends.add(build_tid(depmc, depfn, deptaskname))
732
733                # Resolve 'deptask' dependencies
734                #
735                # e.g. do_sometask[deptask] = "do_someothertask"
736                # (makes sure sometask runs after someothertask of all DEPENDS)
737                if 'deptask' in task_deps and taskname in task_deps['deptask']:
738                    tasknames = task_deps['deptask'][taskname].split()
739                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
740
741                # Resolve 'rdeptask' dependencies
742                #
743                # e.g. do_sometask[rdeptask] = "do_someothertask"
744                # (makes sure sometask runs after someothertask of all RDEPENDS)
745                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
746                    tasknames = task_deps['rdeptask'][taskname].split()
747                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
748
749                # Resolve inter-task dependencies
750                #
751                # e.g. do_sometask[depends] = "targetname:do_someothertask"
752                # (makes sure sometask runs after targetname's someothertask)
753                idepends = taskData[mc].taskentries[tid].idepends
754                for (depname, idependtask) in idepends:
755                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps:
756                        # Won't be in build_targets if ASSUME_PROVIDED
757                        depdata = taskData[mc].build_targets[depname][0]
758                        if depdata is not None:
759                            t = depdata + ":" + idependtask
760                            depends.add(t)
761                            if t not in taskData[mc].taskentries:
762                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
763                irdepends = taskData[mc].taskentries[tid].irdepends
764                for (depname, idependtask) in irdepends:
765                    if depname in taskData[mc].run_targets:
766                        # Won't be in run_targets if ASSUME_PROVIDED
767                        if not taskData[mc].run_targets[depname]:
768                            continue
769                        depdata = taskData[mc].run_targets[depname][0]
770                        if depdata is not None:
771                            t = depdata + ":" + idependtask
772                            depends.add(t)
773                            if t not in taskData[mc].taskentries:
774                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
775
776                # Resolve recursive 'recrdeptask' dependencies (Part A)
777                #
778                # e.g. do_sometask[recrdeptask] = "do_someothertask"
779                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
780                # We cover the recursive part of the dependencies below
781                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
782                    tasknames = task_deps['recrdeptask'][taskname].split()
783                    recursivetasks[tid] = tasknames
784                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
785                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
786                    if taskname in tasknames:
787                        recursivetasksselfref.add(tid)
788
789                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
790                        recursiveitasks[tid] = []
791                        for t in task_deps['recideptask'][taskname].split():
792                            newdep = build_tid(mc, fn, t)
793                            recursiveitasks[tid].append(newdep)
794
795                self.runtaskentries[tid].depends = depends
796                # Remove all self references
797                self.runtaskentries[tid].depends.discard(tid)
798
799        #self.dump_data()
800
801        self.init_progress_reporter.next_stage()
802
803        # Resolve recursive 'recrdeptask' dependencies (Part B)
804        #
805        # e.g. do_sometask[recrdeptask] = "do_someothertask"
806        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
807        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed
808
        # Generating/iterating recursive lists of dependencies is painful and potentially slow
810        # Precompute recursive task dependencies here by:
811        #     a) create a temp list of reverse dependencies (revdeps)
812        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
813        #     c) combine the total list of dependencies in cumulativedeps
814        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)
815
816
817        revdeps = {}
818        deps = {}
819        cumulativedeps = {}
820        for tid in self.runtaskentries:
821            deps[tid] = set(self.runtaskentries[tid].depends)
822            revdeps[tid] = set()
823            cumulativedeps[tid] = set()
824        # Generate a temp list of reverse dependencies
825        for tid in self.runtaskentries:
826            for dep in self.runtaskentries[tid].depends:
827                revdeps[dep].add(tid)
828        # Find the dependency chain endpoints
829        endpoints = set()
830        for tid in self.runtaskentries:
831            if not deps[tid]:
832                endpoints.add(tid)
833        # Iterate the chains collating dependencies
834        while endpoints:
835            next = set()
836            for tid in endpoints:
837                for dep in revdeps[tid]:
838                    cumulativedeps[dep].add(fn_from_tid(tid))
839                    cumulativedeps[dep].update(cumulativedeps[tid])
840                    if tid in deps[dep]:
841                        deps[dep].remove(tid)
842                    if not deps[dep]:
843                        next.add(dep)
844            endpoints = next
845        #for tid in deps:
846        #    if deps[tid]:
847        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
848
849        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
850        # resolve these recursively until we aren't adding any further extra dependencies
851        extradeps = True
852        while extradeps:
853            extradeps = 0
854            for tid in recursivetasks:
855                tasknames = recursivetasks[tid]
856
857                totaldeps = set(self.runtaskentries[tid].depends)
858                if tid in recursiveitasks:
859                    totaldeps.update(recursiveitasks[tid])
860                    for dep in recursiveitasks[tid]:
861                        if dep not in self.runtaskentries:
862                            continue
863                        totaldeps.update(self.runtaskentries[dep].depends)
864
865                deps = set()
866                for dep in totaldeps:
867                    if dep in cumulativedeps:
868                        deps.update(cumulativedeps[dep])
869
870                for t in deps:
871                    for taskname in tasknames:
872                        newtid = t + ":" + taskname
873                        if newtid == tid:
874                            continue
875                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
876                            extradeps += 1
877                            self.runtaskentries[tid].depends.add(newtid)
878
879                # Handle recursive tasks which depend upon other recursive tasks
880                deps = set()
881                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
882                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
883                for newtid in deps:
884                    for taskname in tasknames:
885                        if not newtid.endswith(":" + taskname):
886                            continue
887                        if newtid in self.runtaskentries:
888                            extradeps += 1
889                            self.runtaskentries[tid].depends.add(newtid)
890
891            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
892
893        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
894        for tid in recursivetasksselfref:
895            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
896
897        self.init_progress_reporter.next_stage()
898
899        #self.dump_data()
900
901        # Step B - Mark all active tasks
902        #
903        # Start with the tasks we were asked to run and mark all dependencies
904        # as active too. If the task is to be 'forced', clear its stamp. Once
905        # all active tasks are marked, prune the ones we don't need.
906
907        logger.verbose("Marking Active Tasks")
908
        def mark_active(tid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            # Already marked - nothing to do. This also terminates the
            # recursion if the dependency graph were ever to contain a cycle.
            if tid in runq_build:
                return

            runq_build[tid] = 1

            # Recurse into every direct dependency of this task
            depends = self.runtaskentries[tid].depends
            for depend in depends:
                mark_active(depend, depth+1)
923
        def invalidate_task(tid, error_nostamp):
            # Invalidate a task's stamp via the signature generator so it will
            # re-run. 'nostamp' tasks have no stamp to invalidate: that is
            # fatal when error_nostamp is set, otherwise just logged.
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.dataCaches[mc].task_deps[taskfn]
            if fn + ":" + taskname not in taskData[mc].taskentries:
                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCaches[mc], taskfn)
937
938        self.target_tids = []
939        for (mc, target, task, fn) in self.targets:
940
941            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
942                continue
943
944            if target in taskData[mc].failed_deps:
945                continue
946
947            parents = False
948            if task.endswith('-'):
949                parents = True
950                task = task[:-1]
951
952            if fn in taskData[mc].failed_fns:
953                continue
954
955            # fn already has mc prefix
956            tid = fn + ":" + task
957            self.target_tids.append(tid)
958            if tid not in taskData[mc].taskentries:
959                import difflib
960                tasks = []
961                for x in taskData[mc].taskentries:
962                    if x.startswith(fn + ":"):
963                        tasks.append(taskname_from_tid(x))
964                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
965                if close_matches:
966                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
967                else:
968                    extra = ""
969                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
970
            # For tasks called "XXXX-", only run their dependencies
972            if parents:
973                for i in self.runtaskentries[tid].depends:
974                    mark_active(i, 1)
975            else:
976                mark_active(tid, 1)
977
978        self.init_progress_reporter.next_stage()
979
980        # Step C - Prune all inactive tasks
981        #
982        # Once all active tasks are marked, prune the ones we don't need.
983
984        # Handle --runall
985        if self.cooker.configuration.runall:
986            # re-run the mark_active and then drop unused tasks from new list
987            reduced_tasklist = set(self.runtaskentries.keys())
988            for tid in list(self.runtaskentries.keys()):
989                if tid not in runq_build:
990                   reduced_tasklist.remove(tid)
991            runq_build = {}
992
993            for task in self.cooker.configuration.runall:
994                if not task.startswith("do_"):
995                    task = "do_{0}".format(task)
996                runall_tids = set()
997                for tid in reduced_tasklist:
998                    wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
999                    if wanttid in self.runtaskentries:
1000                        runall_tids.add(wanttid)
1001
1002                for tid in list(runall_tids):
1003                    mark_active(tid, 1)
1004                    if self.cooker.configuration.force:
1005                        invalidate_task(tid, False)
1006
1007        delcount = set()
1008        for tid in list(self.runtaskentries.keys()):
1009            if tid not in runq_build:
1010                delcount.add(tid)
1011                del self.runtaskentries[tid]
1012
1013        if self.cooker.configuration.runall:
1014            if not self.runtaskentries:
1015                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
1016
1017        self.init_progress_reporter.next_stage()
1018
1019        # Handle runonly
1020        if self.cooker.configuration.runonly:
1021            # re-run the mark_active and then drop unused tasks from new list
1022            runq_build = {}
1023
1024            for task in self.cooker.configuration.runonly:
1025                if not task.startswith("do_"):
1026                    task = "do_{0}".format(task)
1027                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
1028
1029                for tid in runonly_tids:
1030                    mark_active(tid, 1)
1031                    if self.cooker.configuration.force:
1032                        invalidate_task(tid, False)
1033
1034            for tid in list(self.runtaskentries.keys()):
1035                if tid not in runq_build:
1036                    delcount.add(tid)
1037                    del self.runtaskentries[tid]
1038
1039            if not self.runtaskentries:
1040                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
1041
1042        #
1043        # Step D - Sanity checks and computation
1044        #
1045
1046        # Check to make sure we still have tasks to run
1047        if not self.runtaskentries:
1048            if not taskData[''].halt:
1049                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
1050            else:
1051                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
1052
1053        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
1054
1055        logger.verbose("Assign Weightings")
1056
1057        self.init_progress_reporter.next_stage()
1058
1059        # Generate a list of reverse dependencies to ease future calculations
1060        for tid in self.runtaskentries:
1061            for dep in self.runtaskentries[tid].depends:
1062                self.runtaskentries[dep].revdeps.add(tid)
1063
1064        self.init_progress_reporter.next_stage()
1065
1066        # Identify tasks at the end of dependency chains
1067        # Error on circular dependency loops (length two)
1068        endpoints = []
1069        for tid in self.runtaskentries:
1070            revdeps = self.runtaskentries[tid].revdeps
1071            if not revdeps:
1072                endpoints.append(tid)
1073            for dep in revdeps:
1074                if dep in self.runtaskentries[tid].depends:
1075                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1076
1077
1078        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1079
1080        self.init_progress_reporter.next_stage()
1081
1082        # Calculate task weights
        # Check for higher length circular dependencies
1084        self.runq_weight = self.calculate_task_weights(endpoints)
1085
1086        self.init_progress_reporter.next_stage()
1087
1088        # Sanity Check - Check for multiple tasks building the same provider
1089        for mc in self.dataCaches:
1090            prov_list = {}
1091            seen_fn = []
1092            for tid in self.runtaskentries:
1093                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1094                if taskfn in seen_fn:
1095                    continue
1096                if mc != tidmc:
1097                    continue
1098                seen_fn.append(taskfn)
1099                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1100                    if prov not in prov_list:
1101                        prov_list[prov] = [taskfn]
1102                    elif taskfn not in prov_list[prov]:
1103                        prov_list[prov].append(taskfn)
1104            for prov in prov_list:
1105                if len(prov_list[prov]) < 2:
1106                    continue
1107                if prov in self.multi_provider_allowed:
1108                    continue
1109                seen_pn = []
1110                # If two versions of the same PN are being built its fatal, we don't support it.
1111                for fn in prov_list[prov]:
1112                    pn = self.dataCaches[mc].pkg_fn[fn]
1113                    if pn not in seen_pn:
1114                        seen_pn.append(pn)
1115                    else:
1116                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1117                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
1118                #
1119                # Construct a list of things which uniquely depend on each provider
1120                # since this may help the user figure out which dependency is triggering this warning
1121                #
1122                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
1123                deplist = {}
1124                commondeps = None
1125                for provfn in prov_list[prov]:
1126                    deps = set()
1127                    for tid in self.runtaskentries:
1128                        fn = fn_from_tid(tid)
1129                        if fn != provfn:
1130                            continue
1131                        for dep in self.runtaskentries[tid].revdeps:
1132                            fn = fn_from_tid(dep)
1133                            if fn == provfn:
1134                                continue
1135                            deps.add(dep)
1136                    if not commondeps:
1137                        commondeps = set(deps)
1138                    else:
1139                        commondeps &= deps
1140                    deplist[provfn] = deps
1141                for provfn in deplist:
1142                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
1143                #
1144                # Construct a list of provides and runtime providers for each recipe
1145                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1146                #
1147                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
1148                provide_results = {}
1149                rprovide_results = {}
1150                commonprovs = None
1151                commonrprovs = None
1152                for provfn in prov_list[prov]:
1153                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1154                    rprovides = set()
1155                    for rprovide in self.dataCaches[mc].rproviders:
1156                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1157                            rprovides.add(rprovide)
1158                    for package in self.dataCaches[mc].packages:
1159                        if provfn in self.dataCaches[mc].packages[package]:
1160                            rprovides.add(package)
1161                    for package in self.dataCaches[mc].packages_dynamic:
1162                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1163                            rprovides.add(package)
1164                    if not commonprovs:
1165                        commonprovs = set(provides)
1166                    else:
1167                        commonprovs &= provides
1168                    provide_results[provfn] = provides
1169                    if not commonrprovs:
1170                        commonrprovs = set(rprovides)
1171                    else:
1172                        commonrprovs &= rprovides
1173                    rprovide_results[provfn] = rprovides
1174                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
1175                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
1176                for provfn in prov_list[prov]:
1177                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
1178                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))
1179
1180                if self.warn_multi_bb:
1181                    logger.verbnote("".join(msgs))
1182                else:
1183                    logger.error("".join(msgs))
1184
1185        self.init_progress_reporter.next_stage()
1186        self.init_progress_reporter.next_stage()
1187
1188        # Iterate over the task list looking for tasks with a 'setscene' function
1189        self.runq_setscene_tids = set()
1190        if not self.cooker.configuration.nosetscene:
1191            for tid in self.runtaskentries:
1192                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1193                setscenetid = tid + "_setscene"
1194                if setscenetid not in taskData[mc].taskentries:
1195                    continue
1196                self.runq_setscene_tids.add(tid)
1197
1198        self.init_progress_reporter.next_stage()
1199
1200        # Invalidate task if force mode active
1201        if self.cooker.configuration.force:
1202            for tid in self.target_tids:
1203                invalidate_task(tid, False)
1204
1205        # Invalidate task if invalidate mode active
1206        if self.cooker.configuration.invalidate_stamp:
1207            for tid in self.target_tids:
1208                fn = fn_from_tid(tid)
1209                for st in self.cooker.configuration.invalidate_stamp.split(','):
1210                    if not st.startswith("do_"):
1211                        st = "do_%s" % st
1212                    invalidate_task(fn + ":" + st, True)
1213
1214        self.init_progress_reporter.next_stage()
1215
1216        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1217        for mc in taskData:
1218            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1219            virtpnmap = {}
1220            for v in virtmap:
1221                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1222                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1223            if hasattr(bb.parse.siggen, "tasks_resolved"):
1224                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1225
1226        self.init_progress_reporter.next_stage()
1227
1228        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1229
1230        # Iterate over the task list and call into the siggen code
1231        dealtwith = set()
1232        todeal = set(self.runtaskentries)
1233        while todeal:
1234            for tid in todeal.copy():
1235                if not (self.runtaskentries[tid].depends - dealtwith):
1236                    dealtwith.add(tid)
1237                    todeal.remove(tid)
1238                    self.prepare_task_hash(tid)
1239
1240        bb.parse.siggen.writeout_file_checksum_cache()
1241
1242        #self.dump_data()
1243        return len(self.runtaskentries)
1244
1245    def prepare_task_hash(self, tid):
1246        dc = bb.parse.siggen.get_data_caches(self.dataCaches, mc_from_tid(tid))
1247        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, dc)
1248        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, dc)
1249        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
1250
1251    def dump_data(self):
1252        """
1253        Dump some debug information on the internal data structures
1254        """
1255        logger.debug3("run_tasks:")
1256        for tid in self.runtaskentries:
1257            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1258                         self.runtaskentries[tid].weight,
1259                         self.runtaskentries[tid].depends,
1260                         self.runtaskentries[tid].revdeps)
1261
class RunQueueWorker():
    """Pairs a bitbake-worker subprocess handle with the pipe used to read it."""
    def __init__(self, process, pipe):
        # process: the worker's Popen handle; pipe: its runQueuePipe wrapper
        self.process, self.pipe = process, pipe
1266
1267class RunQueue:
    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):

        # Core state handed in by the cooker
        self.cooker = cooker
        self.cfgData = cfgData
        # Builds the runnable task graph (dependencies, hashes, weights)
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)

        # Optional user hooks configured through bitbake variables
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None

        # Initial position in the runqueue state machine
        self.state = runQueuePrepare

        # For disk space monitor
        # Invoked at regular time intervals via the bitbake heartbeat event
        # while the build is running. We generate a unique name for the handler
        # here, just in case that there ever is more than one RunQueue instance,
        # start the handler when reaching runQueueSceneInit, and stop it when
        # done with the build.
        self.dm = monitordisk.diskMonitor(cfgData)
        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
        self.dm_event_handler_registered = False
        self.rqexe = None
        # Per-multiconfig bitbake-worker handles (regular and fakeroot variants)
        self.worker = {}
        self.fakeworker = {}
1291
1292    def _start_worker(self, mc, fakeroot = False, rqexec = None):
1293        logger.debug("Starting bitbake-worker")
1294        magic = "decafbad"
1295        if self.cooker.configuration.profile:
1296            magic = "decafbadbad"
1297        fakerootlogs = None
1298        if fakeroot:
1299            magic = magic + "beef"
1300            mcdata = self.cooker.databuilder.mcdata[mc]
1301            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
1302            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
1303            env = os.environ.copy()
1304            for key, value in (var.split('=') for var in fakerootenv):
1305                env[key] = value
1306            worker = subprocess.Popen(fakerootcmd + ["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
1307            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
1308        else:
1309            worker = subprocess.Popen(["bitbake-worker", magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1310        bb.utils.nonblockingfd(worker.stdout)
1311        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
1312
1313        workerdata = {
1314            "taskdeps" : self.rqdata.dataCaches[mc].task_deps,
1315            "fakerootenv" : self.rqdata.dataCaches[mc].fakerootenv,
1316            "fakerootdirs" : self.rqdata.dataCaches[mc].fakerootdirs,
1317            "fakerootnoenv" : self.rqdata.dataCaches[mc].fakerootnoenv,
1318            "sigdata" : bb.parse.siggen.get_taskdata(),
1319            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
1320            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
1321            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
1322            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1323            "prhost" : self.cooker.prhost,
1324            "buildname" : self.cfgData.getVar("BUILDNAME"),
1325            "date" : self.cfgData.getVar("DATE"),
1326            "time" : self.cfgData.getVar("TIME"),
1327            "hashservaddr" : self.cooker.hashservaddr,
1328            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1329        }
1330
1331        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
1332        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
1333        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
1334        worker.stdin.flush()
1335
1336        return RunQueueWorker(worker, workerpipe)
1337
1338    def _teardown_worker(self, worker):
1339        if not worker:
1340            return
1341        logger.debug("Teardown for bitbake-worker")
1342        try:
1343           worker.process.stdin.write(b"<quit></quit>")
1344           worker.process.stdin.flush()
1345           worker.process.stdin.close()
1346        except IOError:
1347           pass
1348        while worker.process.returncode is None:
1349            worker.pipe.read()
1350            worker.process.poll()
1351        while worker.pipe.read():
1352            continue
1353        worker.pipe.close()
1354
1355    def start_worker(self):
1356        if self.worker:
1357            self.teardown_workers()
1358        self.teardown = False
1359        for mc in self.rqdata.dataCaches:
1360            self.worker[mc] = self._start_worker(mc)
1361
1362    def start_fakeworker(self, rqexec, mc):
1363        if not mc in self.fakeworker:
1364            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1365
1366    def teardown_workers(self):
1367        self.teardown = True
1368        for mc in self.worker:
1369            self._teardown_worker(self.worker[mc])
1370        self.worker = {}
1371        for mc in self.fakeworker:
1372            self._teardown_worker(self.fakeworker[mc])
1373        self.fakeworker = {}
1374
1375    def read_workers(self):
1376        for mc in self.worker:
1377            self.worker[mc].pipe.read()
1378        for mc in self.fakeworker:
1379            self.fakeworker[mc].pipe.read()
1380
1381    def active_fds(self):
1382        fds = []
1383        for mc in self.worker:
1384            fds.append(self.worker[mc].pipe.input)
1385        for mc in self.fakeworker:
1386            fds.append(self.fakeworker[mc].pipe.input)
1387        return fds
1388
1389    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
1390        def get_timestamp(f):
1391            try:
1392                if not os.access(f, os.F_OK):
1393                    return None
1394                return os.stat(f)[stat.ST_MTIME]
1395            except:
1396                return None
1397
1398        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
1399        if taskname is None:
1400            taskname = tn
1401
1402        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn)
1403
1404        # If the stamp is missing, it's not current
1405        if not os.access(stampfile, os.F_OK):
1406            logger.debug2("Stampfile %s not available", stampfile)
1407            return False
1408        # If it's a 'nostamp' task, it's not current
1409        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1410        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
1411            logger.debug2("%s.%s is nostamp\n", fn, taskname)
1412            return False
1413
1414        if taskname != "do_setscene" and taskname.endswith("_setscene"):
1415            return True
1416
1417        if cache is None:
1418            cache = {}
1419
1420        iscurrent = True
1421        t1 = get_timestamp(stampfile)
1422        for dep in self.rqdata.runtaskentries[tid].depends:
1423            if iscurrent:
1424                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
1425                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCaches[mc2], taskfn2)
1426                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCaches[mc2], taskfn2)
1427                t2 = get_timestamp(stampfile2)
1428                t3 = get_timestamp(stampfile3)
1429                if t3 and not t2:
1430                    continue
1431                if t3 and t3 > t2:
1432                    continue
1433                if fn == fn2:
1434                    if not t2:
1435                        logger.debug2('Stampfile %s does not exist', stampfile2)
1436                        iscurrent = False
1437                        break
1438                    if t1 < t2:
1439                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
1440                        iscurrent = False
1441                        break
1442                    if recurse and iscurrent:
1443                        if dep in cache:
1444                            iscurrent = cache[dep]
1445                            if not iscurrent:
1446                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1447                        else:
1448                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1449                            cache[dep] = iscurrent
1450        if recurse:
1451            cache[tid] = iscurrent
1452        return iscurrent
1453
1454    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1455        valid = set()
1456        if self.hashvalidate:
1457            sq_data = {}
1458            sq_data['hash'] = {}
1459            sq_data['hashfn'] = {}
1460            sq_data['unihash'] = {}
1461            for tid in tocheck:
1462                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1463                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1464                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1465                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1466
1467            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1468
1469        return valid
1470
1471    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
1472        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1473
1474        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
1475        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1476
1477        return bb.utils.better_eval(call, locs)
1478
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the halt on failure configuration option isn't set)
        """

        retval = True

        # State machine: each call advances the queue one step; the caller
        # loops while we return a truthy value and stops on False.
        if self.state is runQueuePrepare:
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            # prepare() returning 0 means there is nothing to do
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                bb.parse.siggen.save_unitaskhashes()

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()

            # we are ready to run,  emit dependency info to any UI or class which
            # needs it
            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
            self.rqdata.init_progress_reporter.next_stage()
            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

            # Hook the disk monitor check onto heartbeat events (once only)
            if not self.dm_event_handler_registered:
                 res = bb.event.register(self.dm_event_handler_name,
                                         lambda x: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                         ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

            # --dump-signatures mode: write signature data then finish
            dump = self.cooker.configuration.dump_signatures
            if dump:
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete

        # Re-check the state: the dump_signatures branch above may have
        # moved us to runQueueComplete
        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()
            self.start_worker()
            self.rqdata.init_progress_reporter.next_stage()
            self.rqexe = RunQueueExecute(self)

            # If we don't have any setscene functions, skip execution
            if not self.rqdata.runq_setscene_tids:
                logger.info('No setscene tasks')
                for tid in self.rqdata.runtaskentries:
                    if not self.rqdata.runtaskentries[tid].depends:
                        self.rqexe.setbuildable(tid)
                    self.rqexe.tasks_notcovered.add(tid)
                self.rqexe.sqdone = True
            logger.info('Executing Tasks')
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        # Unhook the disk monitor once the build has finished
        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            bb.parse.siggen.save_unitaskhashes()
            self.teardown_workers()
            if self.rqexe:
                if self.rqexe.stats.failed:
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
                else:
                    # Let's avoid the word "failed" if nothing actually did
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1577
1578    def execute_runqueue(self):
1579        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1580        try:
1581            return self._execute_runqueue()
1582        except bb.runqueue.TaskFailure:
1583            raise
1584        except SystemExit:
1585            raise
1586        except bb.BBHandledException:
1587            try:
1588                self.teardown_workers()
1589            except:
1590                pass
1591            self.state = runQueueComplete
1592            raise
1593        except Exception as err:
1594            logger.exception("An uncaught exception occurred in runqueue")
1595            try:
1596                self.teardown_workers()
1597            except:
1598                pass
1599            self.state = runQueueComplete
1600            raise
1601
1602    def finish_runqueue(self, now = False):
1603        if not self.rqexe:
1604            self.state = runQueueComplete
1605            return
1606
1607        if now:
1608            self.rqexe.finish_now()
1609        else:
1610            self.rqexe.finish()
1611
1612    def rq_dump_sigfn(self, fn, options):
1613        bb_cache = bb.cache.NoCache(self.cooker.databuilder)
1614        mc = bb.runqueue.mc_from_tid(fn)
1615        the_data = bb_cache.loadDataFull(fn, self.cooker.collections[mc].get_file_appends(fn))
1616        siggen = bb.parse.siggen
1617        dataCaches = self.rqdata.dataCaches
1618        siggen.dump_sigfn(fn, dataCaches, options)
1619
1620    def dump_signatures(self, options):
1621        fns = set()
1622        bb.note("Reparsing files to collect dependency data")
1623
1624        for tid in self.rqdata.runtaskentries:
1625            fn = fn_from_tid(tid)
1626            fns.add(fn)
1627
1628        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1629        # We cannot use the real multiprocessing.Pool easily due to some local data
1630        # that can't be pickled. This is a cheap multi-process solution.
1631        launched = []
1632        while fns:
1633            if len(launched) < max_process:
1634                p = Process(target=self.rq_dump_sigfn, args=(fns.pop(), options))
1635                p.start()
1636                launched.append(p)
1637            for q in launched:
1638                # The finished processes are joined when calling is_alive()
1639                if not q.is_alive():
1640                    launched.remove(q)
1641        for p in launched:
1642                p.join()
1643
1644        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1645
1646        return
1647
1648    def print_diffscenetasks(self):
1649
1650        noexec = []
1651        tocheck = set()
1652
1653        for tid in self.rqdata.runtaskentries:
1654            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1655            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1656
1657            if 'noexec' in taskdep and taskname in taskdep['noexec']:
1658                noexec.append(tid)
1659                continue
1660
1661            tocheck.add(tid)
1662
1663        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)
1664
1665        # Tasks which are both setscene and noexec never care about dependencies
1666        # We therefore find tasks which are setscene and noexec and mark their
1667        # unique dependencies as valid.
1668        for tid in noexec:
1669            if tid not in self.rqdata.runq_setscene_tids:
1670                continue
1671            for dep in self.rqdata.runtaskentries[tid].depends:
1672                hasnoexecparents = True
1673                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
1674                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
1675                        continue
1676                    hasnoexecparents = False
1677                    break
1678                if hasnoexecparents:
1679                    valid_new.add(dep)
1680
1681        invalidtasks = set()
1682        for tid in self.rqdata.runtaskentries:
1683            if tid not in valid_new and tid not in noexec:
1684                invalidtasks.add(tid)
1685
1686        found = set()
1687        processed = set()
1688        for tid in invalidtasks:
1689            toprocess = set([tid])
1690            while toprocess:
1691                next = set()
1692                for t in toprocess:
1693                    for dep in self.rqdata.runtaskentries[t].depends:
1694                        if dep in invalidtasks:
1695                            found.add(tid)
1696                        if dep not in processed:
1697                            processed.add(dep)
1698                            next.add(dep)
1699                toprocess = next
1700                if tid in found:
1701                    toprocess = set()
1702
1703        tasklist = []
1704        for tid in invalidtasks.difference(found):
1705            tasklist.append(tid)
1706
1707        if tasklist:
1708            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
1709
1710        return invalidtasks.difference(found)
1711
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, locate its written siginfo file and print a
        signature comparison against the closest previously-cached variant.
        """

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            # Compare the two sigdata files for *key*, recursing via this same
            # callback; returns indented diff lines
            hashes = [hash1, hash2]
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
                recout.extend(list('    ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for tid in invalidtasks:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            h = self.rqdata.runtaskentries[tid].hash
            # All siginfo files written for this pn/taskname, any hash
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
            match = None
            for m in matches:
                if h in m:
                    match = m
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h)
            # Drop our own file; the remainder are candidate previous builds
            matches = {k : v for k, v in iter(matches.items()) if h not in k}
            if matches:
                # The newest remaining file is the closest previous state
                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
                prevh = __find_sha256__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1746
1747
1748class RunQueueExecute:
1749
1750    def __init__(self, rq):
1751        self.rq = rq
1752        self.cooker = rq.cooker
1753        self.cfgData = rq.cfgData
1754        self.rqdata = rq.rqdata
1755
1756        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
1757        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
1758        self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
1759        self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
1760        self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
1761
1762        self.sq_buildable = set()
1763        self.sq_running = set()
1764        self.sq_live = set()
1765
1766        self.updated_taskhash_queue = []
1767        self.pending_migrations = set()
1768
1769        self.runq_buildable = set()
1770        self.runq_running = set()
1771        self.runq_complete = set()
1772        self.runq_tasksrun = set()
1773
1774        self.build_stamps = {}
1775        self.build_stamps2 = []
1776        self.failed_tids = []
1777        self.sq_deferred = {}
1778
1779        self.stampcache = {}
1780
1781        self.holdoff_tasks = set()
1782        self.holdoff_need_update = True
1783        self.sqdone = False
1784
1785        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
1786
1787        for mc in rq.worker:
1788            rq.worker[mc].pipe.setrunqueueexec(self)
1789        for mc in rq.fakeworker:
1790            rq.fakeworker[mc].pipe.setrunqueueexec(self)
1791
1792        if self.number_tasks <= 0:
1793             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1794
1795        lower_limit = 1.0
1796        upper_limit = 1000000.0
1797        if self.max_cpu_pressure:
1798            self.max_cpu_pressure = float(self.max_cpu_pressure)
1799            if self.max_cpu_pressure < lower_limit:
1800                bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
1801            if self.max_cpu_pressure > upper_limit:
1802                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
1803
1804        if self.max_io_pressure:
1805            self.max_io_pressure = float(self.max_io_pressure)
1806            if self.max_io_pressure < lower_limit:
1807                bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
1808            if self.max_io_pressure > upper_limit:
1809                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1810
1811        if self.max_memory_pressure:
1812            self.max_memory_pressure = float(self.max_memory_pressure)
1813            if self.max_memory_pressure < lower_limit:
1814                bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
1815            if self.max_memory_pressure > upper_limit:
1816                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1817
1818        # List of setscene tasks which we've covered
1819        self.scenequeue_covered = set()
1820        # List of tasks which are covered (including setscene ones)
1821        self.tasks_covered = set()
1822        self.tasks_scenequeue_done = set()
1823        self.scenequeue_notcovered = set()
1824        self.tasks_notcovered = set()
1825        self.scenequeue_notneeded = set()
1826
1827        # We can't skip specified target tasks which aren't setscene tasks
1828        self.cantskip = set(self.rqdata.target_tids)
1829        self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
1830        self.cantskip.intersection_update(self.rqdata.runtaskentries)
1831
1832        schedulers = self.get_schedulers()
1833        for scheduler in schedulers:
1834            if self.scheduler == scheduler.name:
1835                self.sched = scheduler(self, self.rqdata)
1836                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
1837                break
1838        else:
1839            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1840                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1841
1842        #if self.rqdata.runq_setscene_tids:
1843        self.sqdata = SQData()
1844        build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1845
1846    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1847
1848        # self.build_stamps[pid] may not exist when use shared work directory.
1849        if task in self.build_stamps:
1850            self.build_stamps2.remove(self.build_stamps[task])
1851            del self.build_stamps[task]
1852
1853        if task in self.sq_live:
1854            if status != 0:
1855                self.sq_task_fail(task, status)
1856            else:
1857                self.sq_task_complete(task)
1858            self.sq_live.remove(task)
1859            self.stats.updateActiveSetscene(len(self.sq_live))
1860        else:
1861            if status != 0:
1862                self.task_fail(task, status, fakerootlog=fakerootlog)
1863            else:
1864                self.task_complete(task)
1865        return True
1866
1867    def finish_now(self):
1868        for mc in self.rq.worker:
1869            try:
1870                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
1871                self.rq.worker[mc].process.stdin.flush()
1872            except IOError:
1873                # worker must have died?
1874                pass
1875        for mc in self.rq.fakeworker:
1876            try:
1877                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
1878                self.rq.fakeworker[mc].process.stdin.flush()
1879            except IOError:
1880                # worker must have died?
1881                pass
1882
1883        if self.failed_tids:
1884            self.rq.state = runQueueFailed
1885            return
1886
1887        self.rq.state = runQueueComplete
1888        return
1889
1890    def finish(self):
1891        self.rq.state = runQueueCleanUp
1892
1893        active = self.stats.active + len(self.sq_live)
1894        if active > 0:
1895            bb.event.fire(runQueueExitWait(active), self.cfgData)
1896            self.rq.read_workers()
1897            return self.rq.active_fds()
1898
1899        if self.failed_tids:
1900            self.rq.state = runQueueFailed
1901            return True
1902
1903        self.rq.state = runQueueComplete
1904        return True
1905
1906    # Used by setscene only
1907    def check_dependencies(self, task, taskdeps):
1908        if not self.rq.depvalidate:
1909            return False
1910
1911        # Must not edit parent data
1912        taskdeps = set(taskdeps)
1913
1914        taskdata = {}
1915        taskdeps.add(task)
1916        for dep in taskdeps:
1917            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
1918            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1919            taskdata[dep] = [pn, taskname, fn]
1920        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1921        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1922        valid = bb.utils.better_eval(call, locs)
1923        return valid
1924
1925    def can_start_task(self):
1926        active = self.stats.active + len(self.sq_live)
1927        can_start = active < self.number_tasks
1928        return can_start
1929
1930    def get_schedulers(self):
1931        schedulers = set(obj for obj in globals().values()
1932                             if type(obj) is type and
1933                                issubclass(obj, RunQueueScheduler))
1934
1935        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
1936        if user_schedulers:
1937            for sched in user_schedulers.split():
1938                if not "." in sched:
1939                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1940                    continue
1941
1942                modname, name = sched.rsplit(".", 1)
1943                try:
1944                    module = __import__(modname, fromlist=(name,))
1945                except ImportError as exc:
1946                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1947                    raise SystemExit(1)
1948                else:
1949                    schedulers.add(getattr(module, name))
1950        return schedulers
1951
    def setbuildable(self, task):
        # Mark *task* as ready to run and notify the scheduler
        self.runq_buildable.add(task)
        self.sched.newbuildable(task)
1955
1956    def task_completeoutright(self, task):
1957        """
1958        Mark a task as completed
1959        Look at the reverse dependencies and mark any task with
1960        completed dependencies as buildable
1961        """
1962        self.runq_complete.add(task)
1963        for revdep in self.rqdata.runtaskentries[task].revdeps:
1964            if revdep in self.runq_running:
1965                continue
1966            if revdep in self.runq_buildable:
1967                continue
1968            alldeps = True
1969            for dep in self.rqdata.runtaskentries[revdep].depends:
1970                if dep not in self.runq_complete:
1971                    alldeps = False
1972                    break
1973            if alldeps:
1974                self.setbuildable(revdep)
1975                logger.debug("Marking task %s as buildable", revdep)
1976
1977        for t in self.sq_deferred.copy():
1978            if self.sq_deferred[t] == task:
1979                logger.debug2("Deferred task %s now buildable" % t)
1980                del self.sq_deferred[t]
1981                update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
1982
    def task_complete(self, task):
        # Successful real task: update stats, notify listeners, then
        # propagate buildability to reverse dependencies
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)
        self.runq_tasksrun.add(task)
1988
1989    def task_fail(self, task, exitcode, fakerootlog=None):
1990        """
1991        Called when a task has failed
1992        Updates the state engine with the failure
1993        """
1994        self.stats.taskFailed()
1995        self.failed_tids.append(task)
1996
1997        fakeroot_log = []
1998        if fakerootlog and os.path.exists(fakerootlog):
1999            with open(fakerootlog) as fakeroot_log_file:
2000                fakeroot_failed = False
2001                for line in reversed(fakeroot_log_file.readlines()):
2002                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
2003                        if fakeroot_error in line.lower():
2004                            fakeroot_failed = True
2005                    if 'doing new pid setup and server start' in line:
2006                        break
2007                    fakeroot_log.append(line)
2008
2009            if not fakeroot_failed:
2010                fakeroot_log = []
2011
2012        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
2013
2014        if self.rqdata.taskData[''].halt:
2015            self.rq.state = runQueueCleanUp
2016
    def task_skip(self, task, reason):
        # Record *task* as done without executing it: it counts as run,
        # buildable and completed, and a skip event is fired with *reason*
        self.runq_running.add(task)
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskSkipped()
        self.stats.taskCompleted()
2024
    def summarise_scenequeue_errors(self):
        """
        Sanity-check the final scenequeue state, logging any inconsistencies.
        Returns True if an error was found.
        """
        err = False
        if not self.sqdone:
            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
            completeevent = sceneQueueComplete(self.stats, self.rq)
            bb.event.fire(completeevent, self.cfgData)
        if self.sq_deferred:
            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
            err = True
        if self.updated_taskhash_queue:
            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
            err = True
        if self.holdoff_tasks:
            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
            err = True

        # NOTE(review): unlike the other checks here, this loop logs an error
        # but does not set err = True — confirm whether that is intentional.
        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
            # No task should end up in both covered and uncovered, that is a bug.
            logger.error("Setscene task %s in both covered and notcovered." % tid)

        # Every setscene task must have been fully processed
        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                err = True
                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
            if tid not in self.sq_buildable:
                err = True
                logger.error("Setscene Task %s was never marked as buildable" % tid)
            if tid not in self.sq_running:
                err = True
                logger.error("Setscene Task %s was never marked as running" % tid)

        # Every runqueue task must have left the setscene machinery
        for x in self.rqdata.runtaskentries:
            if x not in self.tasks_covered and x not in self.tasks_notcovered:
                logger.error("Task %s was never moved from the setscene queue" % x)
                err = True
            if x not in self.tasks_scenequeue_done:
                logger.error("Task %s was never processed by the setscene code" % x)
                err = True
            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
                logger.error("Task %s was never marked as buildable by the setscene code" % x)
                err = True
        return err
2067
2068
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Called repeatedly from the main loop. Each call attempts to start at
        most one setscene task and/or one real task, then returns True while
        work remains (or the result of active_fds() when only waiting on
        worker processes). Sets self.rq.state to a terminal state when done.
        """

        # Drain worker messages and apply any queued unihash migrations
        # before making scheduling decisions.
        self.rq.read_workers()
        if self.updated_taskhash_queue or self.pending_migrations:
            self.process_possible_migrations()

        if not hasattr(self, "sorted_setscene_tids"):
            # Don't want to sort this set every execution
            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)

        task = None
        if not self.sqdone and self.can_start_task():
            # Find the next setscene to run
            for nexttask in self.sorted_setscene_tids:
                # Candidate must be buildable, not already running, and not share
                # a stamp file with anything currently executing.
                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
                    # A skippable setscene task whose reverse dependencies are all
                    # already covered doesn't need to run at all (unless it is an
                    # explicit build target).
                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
                        if nexttask not in self.rqdata.target_tids:
                            logger.debug2("Skipping setscene for task %s" % nexttask)
                            self.sq_task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            if nexttask in self.sq_deferred:
                                del self.sq_deferred[nexttask]
                            return True
                    # If covered tasks are running, need to wait for them to complete
                    # NOTE(review): this loop appears to be a no-op - 'continue'
                    # only advances this inner for loop, so nothing is actually
                    # waited for or skipped here. Presumably it was meant to skip
                    # nexttask; confirm against upstream history.
                    for t in self.sqdata.sq_covered_tasks[nexttask]:
                        if t in self.runq_running and t not in self.runq_complete:
                            continue
                    # Deferred tasks only become eligible once the task they were
                    # deferred behind has completed and their hashes revalidate.
                    if nexttask in self.sq_deferred:
                        if self.sq_deferred[nexttask] not in self.runq_complete:
                            continue
                        logger.debug("Task %s no longer deferred" % nexttask)
                        del self.sq_deferred[nexttask]
                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
                        if not valid:
                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
                            self.sq_task_failoutright(nexttask)
                            return True
                    if nexttask in self.sqdata.outrightfail:
                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
                        self.sq_task_failoutright(nexttask)
                        return True
                    if nexttask in self.sqdata.unskippable:
                        logger.debug2("Setscene task %s is unskippable" % nexttask)
                    task = nexttask
                    break
        if task is not None:
            # We have a setscene task to dispatch
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
            taskname = taskname + "_setscene"
            # If the real task's stamp is already valid, the setscene variant is
            # pointless; fail it outright so the covered tasks get resolved.
            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
                self.sq_task_failoutright(task)
                return True

            # Forced targets must execute for real, not from setscene
            if self.cooker.configuration.force:
                if task in self.rqdata.target_tids:
                    self.sq_task_failoutright(task)
                    return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
                self.sq_task_skip(task)
                return True

            if self.cooker.configuration.skipsetscene:
                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
                self.sq_task_failoutright(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            taskdepdata = self.sq_build_taskdepdata(task)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            taskhash = self.rqdata.get_task_hash(task)
            unihash = self.rqdata.get_task_unihash(task)
            # Hand the task to the fakeroot worker when required (and not a dry
            # run), otherwise to the normal worker for this multiconfig.
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if not mc in self.rq.fakeworker:
                    self.rq.start_fakeworker(self, mc)
                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, True, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, False)) + b"</runtask>")
                self.rq.worker[mc].process.stdin.flush()

            # Track the stamp so duplicate-stamp tasks can't run in parallel
            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            self.build_stamps2.append(self.build_stamps[task])
            self.sq_running.add(task)
            self.sq_live.add(task)
            self.stats.updateActiveSetscene(len(self.sq_live))
            if self.can_start_task():
                return True

        self.update_holdofftasks()

        # All setscene processing finished?
        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
            hashequiv_logger.verbose("Setscene tasks completed")

            err = self.summarise_scenequeue_errors()
            if err:
                self.rq.state = runQueueFailed
                return True

            if self.cooker.configuration.setsceneonly:
                self.rq.state = runQueueComplete
                return True
            self.sqdone = True

            if self.stats.total == 0:
                # nothing to do
                self.rq.state = runQueueComplete
                return True

        # Now try to dispatch a real (non-setscene) task from the scheduler
        if self.cooker.configuration.setsceneonly:
            task = None
        else:
            task = self.sched.next()
        if task is not None:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)

            if self.rqdata.setscene_ignore_tasks is not None:
                if self.check_setscene_ignore_tasks(task):
                    self.task_fail(task, "setscene ignore_tasks")
                    return True

            if task in self.tasks_covered:
                logger.debug2("Setscene covered task %s", task)
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2("Stamp current task %s", task)

                self.task_skip(task, "existing")
                self.runq_tasksrun.add(task)
                return True

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            # noexec tasks only need their stamp written; no worker involvement
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running.add(task)
                self.stats.taskActive()
                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                    bb.build.make_stamp(taskname, self.rqdata.dataCaches[mc], taskfn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdepdata = self.build_taskdepdata(task)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            taskhash = self.rqdata.get_task_hash(task)
            unihash = self.rqdata.get_task_unihash(task)
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                if not mc in self.rq.fakeworker:
                    try:
                        self.rq.start_fakeworker(self, mc)
                    except OSError as exc:
                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
                        self.rq.state = runQueueFailed
                        self.stats.taskFailed()
                        return True
                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps((taskfn, task, taskname, taskhash, unihash, False, self.cooker.collections[mc].get_file_appends(taskfn), taskdepdata, self.rqdata.setscene_enforce)) + b"</runtask>")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCaches[mc], taskfn, noextra=True)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running.add(task)
            self.stats.taskActive()
            if self.can_start_task():
                return True

        # Nothing dispatched this call; wait on any running tasks
        if self.stats.active > 0 or self.sq_live:
            self.rq.read_workers()
            return self.rq.active_fds()

        # No more tasks can be run. If we have deferred setscene tasks we should run them.
        if self.sq_deferred:
            deferred_tid = list(self.sq_deferred.keys())[0]
            blocking_tid = self.sq_deferred.pop(deferred_tid)
            logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
            return True

        if self.failed_tids:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        err = self.summarise_scenequeue_errors()
        for task in self.rqdata.runtaskentries:
            if task not in self.runq_buildable:
                logger.error("Task %s never buildable!", task)
                err = True
            elif task not in self.runq_running:
                logger.error("Task %s never ran!", task)
                err = True
            elif task not in self.runq_complete:
                logger.error("Task %s never completed!", task)
                err = True

        if err:
            self.rq.state = runQueueFailed
        else:
            self.rq.state = runQueueComplete

        return True
2285
2286    def filtermcdeps(self, task, mc, deps):
2287        ret = set()
2288        for dep in deps:
2289            thismc = mc_from_tid(dep)
2290            if thismc != mc:
2291                continue
2292            ret.add(dep)
2293        return ret
2294
2295    # We filter out multiconfig dependencies from taskdepdata we pass to the tasks
2296    # as most code can't handle them
2297    def build_taskdepdata(self, task):
2298        taskdepdata = {}
2299        mc = mc_from_tid(task)
2300        next = self.rqdata.runtaskentries[task].depends.copy()
2301        next.add(task)
2302        next = self.filtermcdeps(task, mc, next)
2303        while next:
2304            additional = []
2305            for revdep in next:
2306                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2307                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2308                deps = self.rqdata.runtaskentries[revdep].depends
2309                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2310                taskhash = self.rqdata.runtaskentries[revdep].hash
2311                unihash = self.rqdata.runtaskentries[revdep].unihash
2312                deps = self.filtermcdeps(task, mc, deps)
2313                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2314                for revdep2 in deps:
2315                    if revdep2 not in taskdepdata:
2316                        additional.append(revdep2)
2317            next = additional
2318
2319        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2320        return taskdepdata
2321
    def update_holdofftasks(self):
        """
        Recompute the covered/notcovered task sets from the current scenequeue
        results and rebuild the set of tasks the scheduler must hold off
        running. Only does work when holdoff_need_update has been flagged.
        """

        if not self.holdoff_need_update:
            return

        # Not covered: explicitly notcovered setscene tasks, their covered
        # (real) tasks, tasks we can never skip, and unskippable non-setscene
        # tasks - restricted to tasks the scenequeue has finished deciding on.
        notcovered = set(self.scenequeue_notcovered)
        notcovered |= self.cantskip
        for tid in self.scenequeue_notcovered:
            notcovered |= self.sqdata.sq_covered_tasks[tid]
        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
        notcovered.intersection_update(self.tasks_scenequeue_done)

        # Covered: tasks under a covered setscene task, minus anything claimed
        # by the notcovered set above, again restricted to decided tasks.
        covered = set(self.scenequeue_covered)
        for tid in self.scenequeue_covered:
            covered |= self.sqdata.sq_covered_tasks[tid]
        covered.difference_update(notcovered)
        covered.intersection_update(self.tasks_scenequeue_done)

        # Anything decided either way whose dependencies are satisfied can now
        # be marked buildable.
        for tid in notcovered | covered:
            if not self.rqdata.runtaskentries[tid].depends:
                self.setbuildable(tid)
            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
                 self.setbuildable(tid)

        self.tasks_covered = covered
        self.tasks_notcovered = notcovered

        self.holdoff_tasks = set()

        # Hold off any setscene task which is still undecided...
        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                self.holdoff_tasks.add(tid)

        # ...along with any incomplete task such a setscene task covers.
        for tid in self.holdoff_tasks.copy():
            for dep in self.sqdata.sq_covered_tasks[tid]:
                if dep not in self.runq_complete:
                    self.holdoff_tasks.add(dep)

        self.holdoff_need_update = False
2361
2362    def process_possible_migrations(self):
2363
2364        changed = set()
2365        toprocess = set()
2366        for tid, unihash in self.updated_taskhash_queue.copy():
2367            if tid in self.runq_running and tid not in self.runq_complete:
2368                continue
2369
2370            self.updated_taskhash_queue.remove((tid, unihash))
2371
2372            if unihash != self.rqdata.runtaskentries[tid].unihash:
2373                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2374                torehash = [tid]
2375                for deftid in self.sq_deferred:
2376                    if self.sq_deferred[deftid] == tid:
2377                        torehash.append(deftid)
2378                for hashtid in torehash:
2379                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2380                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2381                    bb.parse.siggen.set_unihash(hashtid, unihash)
2382                    toprocess.add(hashtid)
2383                if torehash:
2384                    # Need to save after set_unihash above
2385                    bb.parse.siggen.save_unitaskhashes()
2386
2387        # Work out all tasks which depend upon these
2388        total = set()
2389        next = set()
2390        for p in toprocess:
2391            next |= self.rqdata.runtaskentries[p].revdeps
2392        while next:
2393            current = next.copy()
2394            total = total | next
2395            next = set()
2396            for ntid in current:
2397                next |= self.rqdata.runtaskentries[ntid].revdeps
2398            next.difference_update(total)
2399
2400        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2401        next = set()
2402        for p in total:
2403            if not self.rqdata.runtaskentries[p].depends:
2404                next.add(p)
2405            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2406                next.add(p)
2407
2408        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2409        while next:
2410            current = next.copy()
2411            next = set()
2412            for tid in current:
2413                if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2414                    continue
2415                orighash = self.rqdata.runtaskentries[tid].hash
2416                dc = bb.parse.siggen.get_data_caches(self.rqdata.dataCaches, mc_from_tid(tid))
2417                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, dc)
2418                origuni = self.rqdata.runtaskentries[tid].unihash
2419                newuni = bb.parse.siggen.get_unihash(tid)
2420                # FIXME, need to check it can come from sstate at all for determinism?
2421                remapped = False
2422                if newuni == origuni:
2423                    # Nothing to do, we match, skip code below
2424                    remapped = True
2425                elif tid in self.scenequeue_covered or tid in self.sq_live:
2426                    # Already ran this setscene task or it running. Report the new taskhash
2427                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2428                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2429                    remapped = True
2430
2431                if not remapped:
2432                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2433                    self.rqdata.runtaskentries[tid].hash = newhash
2434                    self.rqdata.runtaskentries[tid].unihash = newuni
2435                    changed.add(tid)
2436
2437                next |= self.rqdata.runtaskentries[tid].revdeps
2438                total.remove(tid)
2439                next.intersection_update(total)
2440
2441        if changed:
2442            for mc in self.rq.worker:
2443                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2444            for mc in self.rq.fakeworker:
2445                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2446
2447            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2448
2449        for tid in changed:
2450            if tid not in self.rqdata.runq_setscene_tids:
2451                continue
2452            if tid not in self.pending_migrations:
2453                self.pending_migrations.add(tid)
2454
2455        update_tasks = []
2456        for tid in self.pending_migrations.copy():
2457            if tid in self.runq_running or tid in self.sq_live:
2458                # Too late, task already running, not much we can do now
2459                self.pending_migrations.remove(tid)
2460                continue
2461
2462            valid = True
2463            # Check no tasks this covers are running
2464            for dep in self.sqdata.sq_covered_tasks[tid]:
2465                if dep in self.runq_running and dep not in self.runq_complete:
2466                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2467                    valid = False
2468                    break
2469            if not valid:
2470                continue
2471
2472            self.pending_migrations.remove(tid)
2473            changed = True
2474
2475            if tid in self.tasks_scenequeue_done:
2476                self.tasks_scenequeue_done.remove(tid)
2477            for dep in self.sqdata.sq_covered_tasks[tid]:
2478                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2479                    bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
2480                    self.failed_tids.append(tid)
2481                    self.rq.state = runQueueCleanUp
2482                    return
2483
2484                if dep not in self.runq_complete:
2485                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2486                        self.tasks_scenequeue_done.remove(dep)
2487
2488            if tid in self.sq_buildable:
2489                self.sq_buildable.remove(tid)
2490            if tid in self.sq_running:
2491                self.sq_running.remove(tid)
2492            harddepfail = False
2493            for t in self.sqdata.sq_harddeps:
2494                if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
2495                    harddepfail = True
2496                    break
2497            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2498                if tid not in self.sq_buildable:
2499                    self.sq_buildable.add(tid)
2500            if not self.sqdata.sq_revdeps[tid]:
2501                self.sq_buildable.add(tid)
2502
2503            if tid in self.sqdata.outrightfail:
2504                self.sqdata.outrightfail.remove(tid)
2505            if tid in self.scenequeue_notcovered:
2506                self.scenequeue_notcovered.remove(tid)
2507            if tid in self.scenequeue_covered:
2508                self.scenequeue_covered.remove(tid)
2509            if tid in self.scenequeue_notneeded:
2510                self.scenequeue_notneeded.remove(tid)
2511
2512            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2513            self.sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", self.rqdata.dataCaches[mc], taskfn, noextra=True)
2514
2515            if tid in self.stampcache:
2516                del self.stampcache[tid]
2517
2518            if tid in self.build_stamps:
2519                del self.build_stamps[tid]
2520
2521            update_tasks.append((tid, harddepfail, tid in self.sqdata.valid))
2522
2523        if update_tasks:
2524            self.sqdone = False
2525            for mc in sorted(self.sqdata.multiconfigs):
2526                for tid in sorted([t[0] for t in update_tasks]):
2527                    if mc_from_tid(tid) != mc:
2528                        continue
2529                    h = pending_hash_index(tid, self.rqdata)
2530                    if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2531                        self.sq_deferred[tid] = self.sqdata.hashes[h]
2532                        bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2533            update_scenequeue_data([t[0] for t in update_tasks], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2534
2535        for (tid, harddepfail, origvalid) in update_tasks:
2536            if tid in self.sqdata.valid and not origvalid:
2537                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2538            if harddepfail:
2539                self.sq_task_failoutright(tid)
2540
2541        if changed:
2542            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2543            self.holdoff_need_update = True
2544
    def scenequeue_updatecounters(self, task, fail=False):
        """
        Update scenequeue state after task has been decided (covered or not).

        Propagates hard-dependency failures to dependent setscene tasks, makes
        newly satisfied setscene tasks buildable, marks the decided dependency
        chain as done, and refreshes the covered/notcovered statistics.

        task: the setscene tid which was just resolved
        fail: True when the task was unavailable/failed
        """

        for dep in sorted(self.sqdata.sq_deps[task]):
            # On failure, any setscene task hard-depending on this one must be
            # failed too (unless already decided or satisfied by a stamp).
            if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
                    # dependency could be already processed, e.g. noexec setscene task
                    continue
                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
                if noexec or stamppresent:
                    continue
                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                self.sq_task_failoutright(dep)
                continue
            # A dependent becomes buildable once all of its reverse deps have
            # been decided one way or the other.
            if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                if dep not in self.sq_buildable:
                    self.sq_buildable.add(dep)

        # Walk down the dependency chain marking non-setscene tasks whose
        # reverse dependencies are all decided as scenequeue-done.
        # NOTE: adding to tasks_scenequeue_done inside the loop intentionally
        # affects the issubset() checks for later tasks in the same batch.
        next = set([task])
        while next:
            new = set()
            for t in sorted(next):
                self.tasks_scenequeue_done.add(t)
                # Look down the dependency chain for non-setscene things which this task depends on
                # and mark as 'done'
                for dep in self.rqdata.runtaskentries[t].depends:
                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
                        continue
                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
                        new.add(dep)
            next = new

        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
        self.holdoff_need_update = True
2578
2579    def sq_task_completeoutright(self, task):
2580        """
2581        Mark a task as completed
2582        Look at the reverse dependencies and mark any task with
2583        completed dependencies as buildable
2584        """
2585
2586        logger.debug('Found task %s which could be accelerated', task)
2587        self.scenequeue_covered.add(task)
2588        self.scenequeue_updatecounters(task)
2589
2590    def sq_check_taskfail(self, task):
2591        if self.rqdata.setscene_ignore_tasks is not None:
2592            realtask = task.split('_setscene')[0]
2593            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2594            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2595            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2596                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2597                self.rq.state = runQueueCleanUp
2598
2599    def sq_task_complete(self, task):
2600        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2601        self.sq_task_completeoutright(task)
2602
2603    def sq_task_fail(self, task, result):
2604        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
2605        self.scenequeue_notcovered.add(task)
2606        self.scenequeue_updatecounters(task, True)
2607        self.sq_check_taskfail(task)
2608
2609    def sq_task_failoutright(self, task):
2610        self.sq_running.add(task)
2611        self.sq_buildable.add(task)
2612        self.scenequeue_notcovered.add(task)
2613        self.scenequeue_updatecounters(task, True)
2614
2615    def sq_task_skip(self, task):
2616        self.sq_running.add(task)
2617        self.sq_buildable.add(task)
2618        self.sq_task_completeoutright(task)
2619
2620    def sq_build_taskdepdata(self, task):
2621        def getsetscenedeps(tid):
2622            deps = set()
2623            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2624            realtid = tid + "_setscene"
2625            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2626            for (depname, idependtask) in idepends:
2627                if depname not in self.rqdata.taskData[mc].build_targets:
2628                    continue
2629
2630                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2631                if depfn is None:
2632                     continue
2633                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2634                deps.add(deptid)
2635            return deps
2636
2637        taskdepdata = {}
2638        next = getsetscenedeps(task)
2639        next.add(task)
2640        while next:
2641            additional = []
2642            for revdep in next:
2643                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2644                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2645                deps = getsetscenedeps(revdep)
2646                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2647                taskhash = self.rqdata.runtaskentries[revdep].hash
2648                unihash = self.rqdata.runtaskentries[revdep].unihash
2649                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash]
2650                for revdep2 in deps:
2651                    if revdep2 not in taskdepdata:
2652                        additional.append(revdep2)
2653            next = additional
2654
2655        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2656        return taskdepdata
2657
2658    def check_setscene_ignore_tasks(self, tid):
2659        # Check task that is going to run against the ignore tasks list
2660        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2661        # Ignore covered tasks
2662        if tid in self.tasks_covered:
2663            return False
2664        # Ignore stamped tasks
2665        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2666            return False
2667        # Ignore noexec tasks
2668        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2669        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2670            return False
2671
2672        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2673        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2674            if tid in self.rqdata.runq_setscene_tids:
2675                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
2676            else:
2677                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
2678            for t in self.scenequeue_notcovered:
2679                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
2680            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2681            logger.error("".join(msg))
2682            return True
2683        return False
2684
class SQData(object):
    """
    Container for the state describing the setscene (scenequeue) task graph.
    Populated by build_scenequeue_data().
    """
    def __init__(self):
        # Forward dependencies between setscene tasks
        self.sq_deps = {}
        # Reverse dependencies between setscene tasks
        self.sq_revdeps = {}
        # Extra inter-setscene task dependencies injected into the graph
        self.sq_harddeps = {}
        # Stamp file cache, used to stop duplicates running in parallel
        self.stamps = {}
        # Setscene tasks the build depends upon directly
        self.unskippable = set()
        # Setscene tasks known to have no package available
        self.outrightfail = set()
        # Maps each setscene task to the normal tasks it covers
        self.sq_covered_tasks = {}
2701
def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
    """Populate sqdata with the scenequeue (setscene) dependency data.

    Collapses the full runqueue dependency tree into one containing only
    setscene tasks (sqdata.sq_revdeps/sq_deps), records which normal tasks
    each setscene task covers (sqdata.sq_covered_tasks), resolves
    inter-setscene [depends] (sqdata.sq_harddeps), computes the
    "unskippable" set, defers multiconfig tasks sharing an identical hash
    and fires a StaleSetSceneTasks event for stale stamp files.
    """

    sq_revdeps = {}          # working copy of revdeps, consumed as chains collapse
    sq_revdeps_squash = {}   # collapsed revdeps containing setscene tasks only
    sq_collated_deps = {}    # setscene task -> set of normal tasks it covers

    # We need to construct a dependency graph for the setscene functions. Intermediate
    # dependencies between the setscene tasks only complicate the code. This code
    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
    # only containing the setscene functions.

    rqdata.init_progress_reporter.next_stage()

    # First process the chains up to the first setscene task.
    endpoints = {}
    for tid in rqdata.runtaskentries:
        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
        sq_revdeps_squash[tid] = set()
        # Tasks with no reverse dependencies are the leaves the walk starts from
        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint %s" % (tid))
            endpoints[tid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Secondly process the chains between setscene tasks.
    for tid in rqdata.runq_setscene_tids:
        sq_collated_deps[tid] = set()
        #bb.warn("Added endpoint 2 %s" % (tid))
        for dep in rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    def process_endpoints(endpoints):
        # Walk from the endpoints towards their dependencies, accumulating
        # the set of setscene tasks ("tasks") each point ultimately feeds,
        # recursing until only setscene tasks remain in the squashed graph.
        newendpoints = {}
        for point, task in endpoints.items():
            tasks = set()
            if task:
                tasks |= task
            if sq_revdeps_squash[point]:
                tasks |= sq_revdeps_squash[point]
            # Record this normal task as covered by the setscene tasks above it
            if point not in rqdata.runq_setscene_tids:
                for t in tasks:
                    sq_collated_deps[t].add(point)
            sq_revdeps_squash[point] = set()
            if point in rqdata.runq_setscene_tids:
                sq_revdeps_squash[point] = tasks
                continue
            for dep in rqdata.runtaskentries[point].depends:
                if point in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(point)
                if tasks:
                    sq_revdeps_squash[dep] |= tasks
                # Once all of a dependency's revdeps are consumed it becomes a new endpoint
                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
                    newendpoints[dep] = task
        if newendpoints:
            process_endpoints(newendpoints)

    process_endpoints(endpoints)

    rqdata.init_progress_reporter.next_stage()

    # Build a list of tasks which are "unskippable"
    # These are direct endpoints referenced by the build upto and including setscene tasks
    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
    new = True
    for tid in rqdata.runtaskentries:
        if not rqdata.runtaskentries[tid].revdeps:
            sqdata.unskippable.add(tid)
    sqdata.unskippable |= sqrq.cantskip
    # Iterate to a fixed point, pulling in dependencies until the set stops growing
    while new:
        new = False
        orig = sqdata.unskippable.copy()
        for tid in sorted(orig, reverse=True):
            if tid in rqdata.runq_setscene_tids:
                continue
            if not rqdata.runtaskentries[tid].depends:
                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                sqrq.setbuildable(tid)
            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
            if sqdata.unskippable != orig:
                new = True

    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)

    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))

    # Sanity check all dependencies could be changed to setscene task references
    for taskcounter, tid in enumerate(rqdata.runtaskentries):
        if tid in rqdata.runq_setscene_tids:
            pass
        elif sq_revdeps_squash[tid]:
            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
        else:
            del sq_revdeps_squash[tid]
        rqdata.init_progress_reporter.update(taskcounter)

    rqdata.init_progress_reporter.next_stage()

    # Resolve setscene inter-task dependencies
    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        realtid = tid + "_setscene"
        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
        sqdata.stamps[tid] = bb.build.stampfile(taskname + "_setscene", rqdata.dataCaches[mc], taskfn, noextra=True)
        for (depname, idependtask) in idepends:

            if depname not in rqdata.taskData[mc].build_targets:
                continue

            depfn = rqdata.taskData[mc].build_targets[depname][0]
            if depfn is None:
                continue
            deptid = depfn + ":" + idependtask.replace("_setscene", "")
            if deptid not in rqdata.runtaskentries:
                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

            if not deptid in sqdata.sq_harddeps:
                sqdata.sq_harddeps[deptid] = set()
            sqdata.sq_harddeps[deptid].add(tid)

            sq_revdeps_squash[tid].add(deptid)
            # Have to zero this to avoid circular dependencies
            sq_revdeps_squash[deptid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Mirror the harddeps back into the squashed revdeps graph
    for task in sqdata.sq_harddeps:
        for dep in sqdata.sq_harddeps[task]:
            sq_revdeps_squash[dep].add(task)

    rqdata.init_progress_reporter.next_stage()

    #for tid in sq_revdeps_squash:
    #    data = ""
    #    for dep in sq_revdeps_squash[tid]:
    #        data = data + "\n   %s" % dep
    #    bb.warn("Task %s_setscene: is %s " % (tid, data))

    sqdata.sq_revdeps = sq_revdeps_squash
    sqdata.sq_covered_tasks = sq_collated_deps

    # Build reverse version of revdeps to populate deps structure
    for tid in sqdata.sq_revdeps:
        sqdata.sq_deps[tid] = set()
    for tid in sqdata.sq_revdeps:
        for dep in sqdata.sq_revdeps[tid]:
            sqdata.sq_deps[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    sqdata.multiconfigs = set()
    for tid in sqdata.sq_revdeps:
        sqdata.multiconfigs.add(mc_from_tid(tid))
        # Setscene tasks with no remaining revdeps can be scheduled immediately
        if not sqdata.sq_revdeps[tid]:
            sqrq.sq_buildable.add(tid)

    rqdata.init_progress_reporter.finish()

    sqdata.noexec = set()
    sqdata.stamppresent = set()
    sqdata.valid = set()

    sqdata.hashes = {}
    sqrq.sq_deferred = {}
    # Where tasks (across multiconfigs) share the same hash index, only the
    # first runs; the rest are deferred behind it
    for mc in sorted(sqdata.multiconfigs):
        for tid in sorted(sqdata.sq_revdeps):
            if mc_from_tid(tid) != mc:
                continue
            h = pending_hash_index(tid, rqdata)
            if h not in sqdata.hashes:
                sqdata.hashes[h] = tid
            else:
                sqrq.sq_deferred[tid] = sqdata.hashes[h]
                bb.note("Deferring %s after %s" % (tid, sqdata.hashes[h]))

    update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)

    # Compute a list of 'stale' sstate tasks where the current hash does not match the one
    # in any stamp files. Pass the list out to metadata as an event.
    found = {}
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        stamps = bb.build.find_stale_stamps(taskname, rqdata.dataCaches[mc], taskfn)
        if stamps:
            if mc not in found:
                found[mc] = {}
            found[mc][tid] = stamps
    for mc in found:
        event = bb.event.StaleSetSceneTasks(found[mc])
        bb.event.fire(event, cooker.databuilder.mcdata[mc])
2900
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
    """Classify the stamp state of setscene task *tid*.

    Returns a (noexec, stamppresent) pair: noexec is True when the task is
    marked 'noexec' in the task dependency data (a setscene stamp is then
    written as a side effect); stamppresent is True when either the
    setscene stamp or the normal task stamp is already current.

    NOTE(review): noexecstamp is currently unused here - the noexec stamp
    is always written; confirm against callers before relying on it.
    """
    mc, fn, taskname, taskfn = split_tid_mcfn(tid)
    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]
    setscene_name = taskname + "_setscene"

    noexec_tasks = taskdep['noexec'] if 'noexec' in taskdep else None
    if noexec_tasks and taskname in noexec_tasks:
        # noexec tasks never run; stamp the setscene variant directly
        bb.build.make_stamp(setscene_name, rqdata.dataCaches[mc], taskfn)
        return True, False

    if rq.check_stamp_task(tid, setscene_name, cache=stampcache):
        logger.debug2('Setscene stamp current for task %s', tid)
        return False, True

    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
        logger.debug2('Normal stamp current for task %s', tid)
        return False, True

    return False, False
2920
def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
    """Recompute the stamp/hash validity state for the given setscene tids.

    Each tid ends up in exactly one of sqdata.noexec, sqdata.stamppresent,
    sqdata.valid or sqdata.outrightfail, unless it is already covered,
    notcovered or deferred in sqrq. noexec and stamppresent tasks are
    skipped immediately via sqrq.sq_task_skip().
    """

    tocheck = set()

    for tid in sorted(tids):
        # Clear any previous classification for this tid before re-evaluating
        if tid in sqdata.stamppresent:
            sqdata.stamppresent.remove(tid)
        if tid in sqdata.valid:
            sqdata.valid.remove(tid)
        if tid in sqdata.outrightfail:
            sqdata.outrightfail.remove(tid)

        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)

        if noexec:
            sqdata.noexec.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        if stamppresent:
            sqdata.stamppresent.add(tid)
            sqrq.sq_task_skip(tid)
            continue

        tocheck.add(tid)

    # Ask the hash validation backend which remaining tasks have usable
    # sstate objects
    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)

    for tid in tids:
        if tid in sqdata.stamppresent:
            continue
        if tid in sqdata.valid:
            continue
        if tid in sqdata.noexec:
            continue
        if tid in sqrq.scenequeue_covered:
            continue
        if tid in sqrq.scenequeue_notcovered:
            continue
        if tid in sqrq.sq_deferred:
            continue
        # No stamp and no valid sstate object: the setscene task cannot succeed
        sqdata.outrightfail.add(tid)
2963
class TaskFailure(Exception):
    """
    Raised to signal that one or more tasks in a runqueue failed.
    The failing task id(s) are carried in self.args.
    """
    def __init__(self, x):
        Exception.__init__(self)
        # BaseException's args setter stores tuple(x)
        self.args = x
2970
2971
class runQueueExitWait(bb.event.Event):
    """
    Fired while the runqueue is waiting for its remaining active task
    processes to exit; 'remain' is the number still running.
    """

    def __init__(self, remain):
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)
2981
class runQueueEvent(bb.event.Event):
    """
    Common base class for runqueue task events; records the task id,
    name, file, hash and a copy of the current stats.
    """
    def __init__(self, task, stats, rq):
        # taskid and taskstring start out identical; subclasses may
        # override taskstring (e.g. with a "_setscene" suffix)
        self.taskid = self.taskstring = task
        self.taskname = taskname_from_tid(task)
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
2994
class sceneQueueEvent(runQueueEvent):
    """
    Common base class for setscene task events; the task string and name
    carry a "_setscene" suffix.
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        suffix = "_setscene"
        self.taskstring = task + suffix
        self.taskname = taskname_from_tid(task) + suffix
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
3005
class runQueueTaskStarted(runQueueEvent):
    """
    Fired when a runqueue task starts; 'noexec' marks tasks that will not
    actually execute.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        runQueueEvent.__init__(self, task, stats, rq)
3013
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Fired when a setscene task starts; 'noexec' marks tasks that will not
    actually execute.
    """
    def __init__(self, task, stats, rq, noexec=False):
        self.noexec = noexec
        sceneQueueEvent.__init__(self, task, stats, rq)
3021
class runQueueTaskFailed(runQueueEvent):
    """
    Fired when a runqueue task fails; carries the exit code and, when
    available, the pseudo (fakeroot) log text.
    """
    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode
        self.fakeroot_log = fakeroot_log

    def __str__(self):
        msg = "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
        if self.fakeroot_log:
            # Append the pseudo log when one was captured
            msg += " \nPseudo log:\n%s" % self.fakeroot_log
        return msg
3036
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Fired when a setscene task fails; the real task will be executed
    instead, so this is informational rather than fatal.
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        return ("Setscene task (%s) failed with exit code '%s' "
                "- real task will be run instead" % (self.taskstring, self.exitcode))
3047
class sceneQueueComplete(sceneQueueEvent):
    """
    Fired once every sceneQueue (setscene) task has completed.
    """
    def __init__(self, stats, rq):
        # Deliberately bypasses sceneQueueEvent.__init__: this event covers
        # the whole queue, so there is no single associated task.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
3055
class runQueueTaskCompleted(runQueueEvent):
    """
    Fired when a runqueue task completes.
    """
3060
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Fired when a setscene task completes.
    """
3065
class runQueueTaskSkipped(runQueueEvent):
    """
    Fired when a runqueue task is skipped; 'reason' records why.
    """
    def __init__(self, task, stats, rq, reason):
        self.reason = reason
        runQueueEvent.__init__(self, task, stats, rq)
3073
class taskUniHashUpdate(bb.event.Event):
    """
    Event notifying that a task's unihash was updated; carries the task id
    and the new unihash. (Consumed by runQueuePipe.read(), which forwards
    it to the executor's updated_taskhash_queue.)
    """
    def __init__(self, task, unihash):
        self.taskid = task
        self.unihash = unihash
        bb.event.Event.__init__(self)
3082
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server.

    Buffers bytes from the worker and parses two framed, pickled message
    types: <event>...</event> payloads (fired as events) and
    <exitcode>...</exitcode> payloads (task exit statuses).
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
        # We keep the read end; the write end belongs to the worker process
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Bytes received from the worker but not yet parsed
        self.queue = b""
        self.d = d
        self.rq = rq
        self.rqexec = rqexec
        self.fakerootlogs = fakerootlogs

    def setrunqueueexec(self, rqexec):
        """Switch the executor object that receives parsed worker messages."""
        self.rqexec = rqexec

    def read(self):
        """Drain available worker output and dispatch complete messages.

        Returns True if any new bytes were read, so callers can loop
        until the pipe is fully drained.
        """
        # First check that no worker process has died unexpectedly
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # Non-blocking fd: EAGAIN just means no data is available yet
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and self.queue:
            found = False
            # Offsets 7 and 8 below are len(b"<event>") and len(b"</event>")
            index = self.queue.find(b"</event>")
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    event = pickle.loads(self.queue[7:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
                        # The pickled data could contain "</event>" so search for the next occurance
                        # unpickling again, this should be the only way an unpickle error could occur
                        index = self.queue.find(b"</event>", index + 1)
                        continue
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                if isinstance(event, taskUniHashUpdate):
                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
                found = True
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            # Offsets 10 and 11 below are len(b"<exitcode>") and len(b"</exitcode>")
            index = self.queue.find(b"</exitcode>")
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    task, status = pickle.loads(self.queue[10:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                (_, _, _, taskfn) = split_tid_mcfn(task)
                fakerootlog = None
                # Attach the captured pseudo log for this recipe, if any
                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
                    fakerootlog = self.fakerootlogs[taskfn]
                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
                found = True
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        """Drain any remaining messages, then close the read end."""
        while self.read():
            continue
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()
3158
def get_setscene_enforce_ignore_tasks(d, targets):
    """Return the list of pn:task entries exempt from setscene enforcement.

    Returns None when BB_SETSCENE_ENFORCE is not enabled. Entries of the
    form "%:taskname" in BB_SETSCENE_ENFORCE_IGNORE_TASKS expand to one
    "target:taskname" entry per build target.
    """
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    entries = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
    outlist = []
    for entry in entries:
        if not entry.startswith('%:'):
            outlist.append(entry)
            continue
        # Wildcard recipe: expand to every build target
        wildcard_task = entry.split(':')[1]
        for (mc, target, task, fn) in targets:
            outlist.append(target + ':' + wildcard_task)
    return outlist
3171
def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
    """Return True if pn:taskname may run as a real task under setscene
    enforcement.

    ignore_tasks is the list produced by get_setscene_enforce_ignore_tasks();
    None means enforcement is disabled, so everything is permitted. Each
    entry is an fnmatch-style pattern matched against "pn:taskname".
    """
    import fnmatch

    # Enforcement disabled -> all tasks permitted
    if ignore_tasks is None:
        return True
    item = '%s:%s' % (pn, taskname)
    # Fix: the original reused 'ignore_tasks' as the loop variable,
    # shadowing the parameter mid-iteration; use a distinct name.
    return any(fnmatch.fnmatch(item, pattern) for pattern in ignore_tasks)
3181