xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 220dafdb)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import logging
18import re
19import bb
20from bb import msg, event
21from bb import monitordisk
22import subprocess
23import pickle
24from multiprocessing import Process
25import shlex
26import pprint
27import time
28
29bblogger = logging.getLogger("BitBake")
30logger = logging.getLogger("BitBake.RunQueue")
31hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")
32
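# Matches a bare 64-character hex string, e.g. a sha256 such as the empty-input
# digest e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, while
# rejecting hex runs embedded inside longer alphanumeric tokens.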
__find_sha256__ = re.compile(r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])')

def fn_from_tid(tid):
    return tid.rsplit(":", 1)[0]

def taskname_from_tid(tid):
    return tid.rsplit(":", 1)[1]

def mc_from_tid(tid):
    if tid.startswith('mc:') and tid.count(':') >= 2:
        return tid.split(':')[1]
    return ""

def split_tid(tid):
    (mc, fn, taskname, _) = split_tid_mcfn(tid)
    return (mc, fn, taskname)

def split_mc(n):
    if n.startswith("mc:") and n.count(':') >= 2:
        _, mc, n = n.split(":", 2)
        return (mc, n)
    return ('', n)

def split_tid_mcfn(tid):
    if tid.startswith('mc:') and tid.count(':') >= 2:
        elems = tid.split(':')
        mc = elems[1]
        fn = ":".join(elems[2:-1])
        taskname = elems[-1]
        mcfn = "mc:" + mc + ":" + fn
    else:
        tid = tid.rsplit(":", 1)
        mc = ""
        fn = tid[0]
        taskname = tid[1]
        mcfn = fn

    return (mc, fn, taskname, mcfn)

def build_tid(mc, fn, taskname):
    if mc:
        return "mc:" + mc + ":" + fn + ":" + taskname
    return fn + ":" + taskname
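
# Illustrative examples of the tid formats handled by the helpers above
# (the recipe path is hypothetical):
#   split_tid_mcfn("/meta/recipes/foo.bb:do_compile")
#       -> ("", "/meta/recipes/foo.bb", "do_compile", "/meta/recipes/foo.bb")
#   split_tid_mcfn("mc:mc1:/meta/recipes/foo.bb:do_compile")
#       -> ("mc1", "/meta/recipes/foo.bb", "do_compile", "mc:mc1:/meta/recipes/foo.bb")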

# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    return pn + ":" + taskname + ":" + h

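# For example (hypothetical recipe and unihash), a do_compile task for busybox
# with unihash "abc..." yields the index key "busybox:do_compile:abc...".
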
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.setscene_active = 0
        self.setscene_covered = 0
        self.setscene_notcovered = 0
        self.setscene_total = setscene_total
        self.total = total

    def copy(self):
        obj = self.__class__(self.total, self.setscene_total)
        obj.__dict__.update(self.__dict__)
        return obj

    def taskFailed(self):
        self.active = self.active - 1
        self.failed = self.failed + 1

    def taskCompleted(self):
        self.active = self.active - 1
        self.completed = self.completed + 1

    def taskSkipped(self):
        self.active = self.active + 1
        self.skipped = self.skipped + 1

    def taskActive(self):
        self.active = self.active + 1

    def updateCovered(self, covered, notcovered):
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        self.setscene_active = active

# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9

class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            if tid in self.rq.runq_buildable:
                self.buildable.add(tid)

        self.rev_prio_map = None
        self.is_pressure_usable()

    def is_pressure_usable(self):
        """
        If pressure monitoring is enabled, check whether the pressure files can be
        opened and read, and record the result in self.check_pressure. For example,
        openSUSE's /proc/pressure/* files have readable file permissions, but reading
        them fails with EOPNOTSUPP (Operation not supported).
        """
        if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
            try:
                with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                    open("/proc/pressure/io") as io_pressure_fds, \
                    open("/proc/pressure/memory") as memory_pressure_fds:

                    self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_pressure_time = time.time()
                self.check_pressure = True
            except Exception:
                bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
                self.check_pressure = False
        else:
            self.check_pressure = False

    def exceeds_max_pressure(self):
        """
        Monitor the difference in total pressure at least once per second; if
        BB_PRESSURE_MAX_{CPU|IO|MEMORY} is set, return True when the threshold
        is exceeded.
        """
        if self.check_pressure:
            with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                open("/proc/pressure/io") as io_pressure_fds, \
                open("/proc/pressure/memory") as memory_pressure_fds:
                # extract "total" from /proc/pressure/{cpu|io|memory}
                curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                now = time.time()
                tdiff = now - self.prev_pressure_time
                psi_accumulation_interval = 1.0
                cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
                io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
                memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
                exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
                exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
                exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure

                if tdiff > psi_accumulation_interval:
                    self.prev_cpu_pressure = curr_cpu_pressure
                    self.prev_io_pressure = curr_io_pressure
                    self.prev_memory_pressure = curr_memory_pressure
                    self.prev_pressure_time = now

            pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
            pressure_values = (round(cpu_pressure, 1), self.rq.max_cpu_pressure, round(io_pressure, 1), self.rq.max_io_pressure, round(memory_pressure, 1), self.rq.max_memory_pressure)
            if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
                bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
            self.pressure_state = pressure_state
            return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
        return False

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # BitBake requires that at least one task be active. Only check for
        # pressure when this is the case; otherwise the pressure limitation
        # could leave no tasks active and prevent new tasks from starting,
        # at times breaking the scheduler.
        if self.rq.stats.active and self.exceeds_max_pressure():
            return None

        # Filter out tasks whose maximum number of concurrent instances has been reached
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        best = None
        bestprio = None
        for tid in buildable:
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                continue
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

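    # The per-task concurrency cap consulted above comes from a task flag in
    # the metadata (recipe and value illustrative), e.g.:
    #   do_fetch[number_threads] = "4"
    # which caps how many do_fetch tasks may run at once, independent of
    # BB_NUMBER_THREADS.
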
    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        self.buildable.add(task)

    def removebuildable(self, task):
        self.buildable.remove(task)

    def describe_task(self, taskid):
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))

class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight;
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        weights = {}
        for tid in self.rqdata.runtaskentries:
            weight = self.rqdata.runtaskentries[tid].weight
            if weight not in weights:
                weights[weight] = []
            weights[weight].append(tid)

        self.prio_map = []
        for weight in sorted(weights):
            for w in weights[weight]:
                self.prio_map.append(w)

        self.prio_map.reverse()

class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it is completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task orderings; see the example after this loop.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronize by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s' % all_tasks)
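
        # Illustrative example (hypothetical recipes): merging
        #   task_lists = {"a.bb": ["do_fetch", "do_compile", "do_build"],
        #                 "b.bb": ["do_fetch", "do_build"]}
        # yields all_tasks == ["do_fetch", "do_compile", "do_build"]; b.bb's
        # do_build is found via the index() re-synchronization, so no
        # duplicate gets inserted.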

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runnable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's tasks that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')

class RunTaskEntry(object):
    def __init__(self):
        self.depends = set()
        self.revdeps = set()
        self.hash = None
        self.unihash = None
        self.task = None
        self.weight = 1

class RunQueueData:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
        self.setscene_ignore_tasks_checked = False
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()

    def reset(self):
        self.runtaskentries = {}

    def runq_depends_names(self, ids):
        ret = []
        for id in ids:
            nam = os.path.basename(id)
            nam = re.sub("_[^,]*,", ",", nam)
            ret.append(nam)
        return ret

    def get_task_hash(self, tid):
        return self.runtaskentries[tid].hash

    def get_task_unihash(self, tid):
        return self.runtaskentries[tid].unihash

    def get_user_idstring(self, tid, task_name_suffix=""):
        return tid + task_name_suffix

    def get_short_user_idstring(self, task, task_name_suffix=""):
        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
        pn = self.dataCaches[mc].pkg_fn[taskfn]
        taskname = taskname_from_tid(task) + task_name_suffix
        return "%s:%s" % (pn, taskname)

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user-readable format.
        """
        valid_chains = []
        explored_deps = {}
        msgs = []

        class TooManyLoops(Exception):
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain
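
        # For example (illustrative entries), chain_reorder(["b", "a", "c"])
        # returns ["a", "c", "b"]: the chain is rotated to start at its lowest
        # entry so that equivalent loops compare equal regardless of where the
        # walk entered them.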

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Halted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs

    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity-checks the task list, finding tasks that are not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runtaskentries)
        weight = {}
        deps_left = {}
        task_done = {}

        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if not next_points:
                break

        # Circular dependency sanity check
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug2("Task %s is not buildable", tid)
                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight

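    # Worked example (hypothetical chain): for a -> b -> c, where "->" means
    # "is depended on by" and c is the endpoint, the initial weights are
    # a=1, b=1, c=10. Walking back from c gives b = 1 + 10 = 11 and then
    # a = 1 + 11 = 12, so the task with the most dependents weighs the most
    # and is scheduled first by the speed scheduler.
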
    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """

        runq_build = {}
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        found = False
        for mc in self.taskData:
            if taskData[mc].taskentries:
                found = True
                break
        if not found:
            # Nothing to do
            return 0

        bb.parse.siggen.setup_datacache(self.dataCaches)

        self.init_progress_reporter.start()
        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
                    continue
                depdata = taskData[mc].build_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

        def add_runtime_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
                    continue
                depdata = taskData[mc].run_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

        def add_mc_dependencies(mc, tid):
            mcdeps = taskData[mc].get_mcdepends()
            for dep in mcdeps:
                mcdependency = dep.split(':')
                pn = mcdependency[3]
                frommc = mcdependency[1]
                mcdep = mcdependency[2]
                deptask = mcdependency[4]
                if mcdep not in taskData:
                    bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep))
                if mc == frommc:
                    fn = taskData[mcdep].build_targets[pn][0]
                    newdep = '%s:%s' % (fn, deptask)
                    taskData[mc].taskentries[tid].tdepends.append(newdep)

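        # Based on the indices unpacked above, mcdepends entries have the form
        # (pn and task illustrative):
        #   mc:FROM_MC:TO_MC:pn:do_task
        # i.e. a task in multiconfig FROM_MC depends on pn:do_task being built
        # in multiconfig TO_MC.
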
        for mc in taskData:
            for tid in taskData[mc].taskentries:

                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                #runtid = build_tid(mc, fn, taskname)

                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)

                depends = set()
                task_deps = self.dataCaches[mc].task_deps[taskfn]

                self.runtaskentries[tid] = RunTaskEntry()

                if fn in taskData[mc].failed_fns:
                    continue

                # We add multiconfig dependencies before processing internal task deps (tdepends)
                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
                    add_mc_dependencies(mc, tid)

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                for t in taskData[mc].taskentries[tid].tdepends:
                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
                    depends.add(build_tid(depmc, depfn, deptaskname))

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskname in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskname].split()
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskname].split()
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData[mc].taskentries[tid].idepends
                for (depname, idependtask) in idepends:
                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and depname not in taskData[mc].failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData[mc].build_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
                irdepends = taskData[mc].taskentries[tid].irdepends
                for (depname, idependtask) in irdepends:
                    if depname in taskData[mc].run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        if not taskData[mc].run_targets[depname]:
                            continue
                        depdata = taskData[mc].run_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskname].split()
                    recursivetasks[tid] = tasknames
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
                    if taskname in tasknames:
                        recursivetasksselfref.add(tid)

                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
                        recursiveitasks[tid] = []
                        for t in task_deps['recideptask'][taskname].split():
                            newdep = build_tid(mc, fn, t)
                            recursiveitasks[tid].append(newdep)

                self.runtaskentries[tid].depends = depends
                # Remove all self references
                self.runtaskentries[tid].depends.discard(tid)

        #self.dump_data()

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed

        # Generating/iterating recursive lists of dependencies is painful and potentially slow
        # Precompute recursive task dependencies here by:
        #     a) creating a temp list of reverse dependencies (revdeps)
        #     b) walking up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
        #     c) combining the total list of dependencies in cumulativedeps
        #     d) optimising by pre-truncating 'task' off the items in cumulativedeps (keeps the sets smaller)

        revdeps = {}
        deps = {}
        cumulativedeps = {}
        for tid in self.runtaskentries:
            deps[tid] = set(self.runtaskentries[tid].depends)
            revdeps[tid] = set()
            cumulativedeps[tid] = set()
        # Generate a temp list of reverse dependencies
        for tid in self.runtaskentries:
            for dep in self.runtaskentries[tid].depends:
                revdeps[dep].add(tid)
        # Find the dependency chain endpoints
        endpoints = set()
        for tid in self.runtaskentries:
            if not deps[tid]:
                endpoints.add(tid)
        # Iterate the chains collating dependencies
        while endpoints:
            next = set()
            for tid in endpoints:
                for dep in revdeps[tid]:
                    cumulativedeps[dep].add(fn_from_tid(tid))
                    cumulativedeps[dep].update(cumulativedeps[tid])
                    if tid in deps[dep]:
                        deps[dep].remove(tid)
                    if not deps[dep]:
                        next.add(dep)
            endpoints = next
        #for tid in deps:
        #    if deps[tid]:
        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))

        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
        # resolve these recursively until we aren't adding any further extra dependencies
        extradeps = True
        while extradeps:
            extradeps = 0
            for tid in recursivetasks:
                tasknames = recursivetasks[tid]

                totaldeps = set(self.runtaskentries[tid].depends)
                if tid in recursiveitasks:
                    totaldeps.update(recursiveitasks[tid])
                    for dep in recursiveitasks[tid]:
                        if dep not in self.runtaskentries:
                            continue
                        totaldeps.update(self.runtaskentries[dep].depends)

                deps = set()
                for dep in totaldeps:
                    if dep in cumulativedeps:
                        deps.update(cumulativedeps[dep])

                for t in deps:
                    for taskname in tasknames:
                        newtid = t + ":" + taskname
                        if newtid == tid:
                            continue
                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
                            extradeps += 1
                            self.runtaskentries[tid].depends.add(newtid)

                # Handle recursive tasks which depend upon other recursive tasks
                deps = set()
                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
                for newtid in deps:
                    for taskname in tasknames:
                        if not newtid.endswith(":" + taskname):
                            continue
                        if newtid in self.runtaskentries:
                            extradeps += 1
                            self.runtaskentries[tid].depends.add(newtid)

            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)

        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for tid in recursivetasksselfref:
            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        #self.dump_data()

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(tid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """

            if tid in runq_build:
                return

            runq_build[tid] = 1

            depends = self.runtaskentries[tid].depends
            for depend in depends:
                mark_active(depend, depth + 1)

        def invalidate_task(tid, error_nostamp):
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.dataCaches[mc].task_deps[taskfn]
            if fn + ":" + taskname not in taskData[mc].taskentries:
                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, taskfn)

        self.target_tids = []
        for (mc, target, task, fn) in self.targets:

            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
                continue

            if target in taskData[mc].failed_deps:
                continue

            parents = False
            if task.endswith('-'):
                parents = True
                task = task[:-1]

            if fn in taskData[mc].failed_fns:
                continue

            # fn already has the mc prefix
            tid = fn + ":" + task
            self.target_tids.append(tid)
            if tid not in taskData[mc].taskentries:
                import difflib
                tasks = []
                for x in taskData[mc].taskentries:
                    if x.startswith(fn + ":"):
                        tasks.append(taskname_from_tid(x))
                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
                if close_matches:
                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
                else:
                    extra = ""
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))

            # For tasks called "XXXX-", only run their dependencies
            if parents:
                for i in self.runtaskentries[tid].depends:
                    mark_active(i, 1)
            else:
                mark_active(tid, 1)

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        # Handle --runall
        if self.cooker.configuration.runall:
            # re-run mark_active and then drop unused tasks from the new list
            reduced_tasklist = set(self.runtaskentries.keys())
            for tid in list(self.runtaskentries.keys()):
                if tid not in runq_build:
                    reduced_tasklist.remove(tid)
            runq_build = {}

            for task in self.cooker.configuration.runall:
                if not task.startswith("do_"):
                    task = "do_{0}".format(task)
                runall_tids = set()
                for tid in reduced_tasklist:
                    wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
                    if wanttid in self.runtaskentries:
                        runall_tids.add(wanttid)

                for tid in list(runall_tids):
                    mark_active(tid, 1)
                    if self.cooker.configuration.force:
                        invalidate_task(tid, False)

        delcount = set()
        for tid in list(self.runtaskentries.keys()):
            if tid not in runq_build:
                delcount.add(tid)
                del self.runtaskentries[tid]

        if self.cooker.configuration.runall:
            if not self.runtaskentries:
                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Handle --runonly
        if self.cooker.configuration.runonly:
            # re-run mark_active and then drop unused tasks from the new list
            runq_build = {}

            for task in self.cooker.configuration.runonly:
                if not task.startswith("do_"):
                    task = "do_{0}".format(task)
                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]

                for tid in runonly_tids:
                    mark_active(tid, 1)
                    if self.cooker.configuration.force:
                        invalidate_task(tid, False)

            for tid in list(self.runtaskentries.keys()):
                if tid not in runq_build:
                    delcount.add(tid)
                    del self.runtaskentries[tid]

            if not self.runtaskentries:
                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if not self.runtaskentries:
            if not taskData[''].halt:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))

        logger.verbose("Assign Weightings")

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Generate a list of reverse dependencies to ease future calculations
        for tid in self.runtaskentries:
            for dep in self.runtaskentries[tid].depends:
                self.runtaskentries[dep].revdeps.add(tid)

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for tid in self.runtaskentries:
            revdeps = self.runtaskentries[tid].revdeps
            if not revdeps:
                endpoints.append(tid)
            for dep in revdeps:
                if dep in self.runtaskentries[tid].depends:
                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Calculate task weights
        # Check for longer circular dependency chains
        self.runq_weight = self.calculate_task_weights(endpoints)

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Sanity Check - Check for multiple tasks building the same provider
        for mc in self.dataCaches:
            prov_list = {}
            seen_fn = []
            for tid in self.runtaskentries:
                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                if taskfn in seen_fn:
                    continue
                if mc != tidmc:
                    continue
                seen_fn.append(taskfn)
                for prov in self.dataCaches[mc].fn_provides[taskfn]:
                    if prov not in prov_list:
                        prov_list[prov] = [taskfn]
                    elif taskfn not in prov_list[prov]:
                        prov_list[prov].append(taskfn)
            for prov in prov_list:
                if len(prov_list[prov]) < 2:
                    continue
                if prov in self.multi_provider_allowed:
                    continue
                seen_pn = []
                # If two versions of the same PN are being built it's fatal, we don't support it.
                for fn in prov_list[prov]:
                    pn = self.dataCaches[mc].pkg_fn[fn]
                    if pn not in seen_pn:
                        seen_pn.append(pn)
                    else:
                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
                #
                # Construct a list of things which uniquely depend on each provider
                # since this may help the user figure out which dependency is triggering this warning
                #
                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
                deplist = {}
                commondeps = None
                for provfn in prov_list[prov]:
                    deps = set()
                    for tid in self.runtaskentries:
                        fn = fn_from_tid(tid)
                        if fn != provfn:
                            continue
                        for dep in self.runtaskentries[tid].revdeps:
                            fn = fn_from_tid(dep)
                            if fn == provfn:
                                continue
                            deps.add(dep)
                    if not commondeps:
                        commondeps = set(deps)
                    else:
                        commondeps &= deps
                    deplist[provfn] = deps
                for provfn in deplist:
                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
                #
                # Construct a list of provides and runtime providers for each recipe
                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
                #
                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
                provide_results = {}
                rprovide_results = {}
                commonprovs = None
                commonrprovs = None
                for provfn in prov_list[prov]:
                    provides = set(self.dataCaches[mc].fn_provides[provfn])
                    rprovides = set()
                    for rprovide in self.dataCaches[mc].rproviders:
                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
                            rprovides.add(rprovide)
                    for package in self.dataCaches[mc].packages:
                        if provfn in self.dataCaches[mc].packages[package]:
                            rprovides.add(package)
                    for package in self.dataCaches[mc].packages_dynamic:
                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
                            rprovides.add(package)
                    if not commonprovs:
                        commonprovs = set(provides)
                    else:
                        commonprovs &= provides
                    provide_results[provfn] = provides
                    if not commonrprovs:
                        commonrprovs = set(rprovides)
                    else:
                        commonrprovs &= rprovides
                    rprovide_results[provfn] = rprovides
                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
                for provfn in prov_list[prov]:
                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))

                if self.warn_multi_bb:
                    logger.verbnote("".join(msgs))
                else:
                    logger.error("".join(msgs))

1210        self.init_progress_reporter.next_stage()
1211        self.init_progress_reporter.next_stage()
1212        bb.event.check_for_interrupts(self.cooker.data)
1213
1214        # Iterate over the task list looking for tasks with a 'setscene' function
1215        self.runq_setscene_tids = set()
1216        if not self.cooker.configuration.nosetscene:
1217            for tid in self.runtaskentries:
1218                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1219                setscenetid = tid + "_setscene"
1220                if setscenetid not in taskData[mc].taskentries:
1221                    continue
1222                self.runq_setscene_tids.add(tid)
1223
1224        self.init_progress_reporter.next_stage()
1225        bb.event.check_for_interrupts(self.cooker.data)
1226
1227        # Invalidate task if force mode active
1228        if self.cooker.configuration.force:
1229            for tid in self.target_tids:
1230                invalidate_task(tid, False)
1231
1232        # Invalidate task if invalidate mode active
1233        if self.cooker.configuration.invalidate_stamp:
1234            for tid in self.target_tids:
1235                fn = fn_from_tid(tid)
1236                for st in self.cooker.configuration.invalidate_stamp.split(','):
1237                    if not st.startswith("do_"):
1238                        st = "do_%s" % st
1239                    invalidate_task(fn + ":" + st, True)
1240
1241        self.init_progress_reporter.next_stage()
1242        bb.event.check_for_interrupts(self.cooker.data)
1243
1244        # Build a virtual/xxxx -> PN (fn) table and print it to the logs
1245        for mc in taskData:
1246            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1247            virtpnmap = {}
1248            for v in virtmap:
1249                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1250                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1251            if hasattr(bb.parse.siggen, "tasks_resolved"):
1252                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1253
1254        self.init_progress_reporter.next_stage()
1255        bb.event.check_for_interrupts(self.cooker.data)
1256
1257        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1258
1259        # Iterate over the task list and call into the siggen code
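        # Hashes have to be generated in dependency order: a task's hash folds
        # in the hashes of everything it depends on, so a tid is only processed
        # once all of its dependencies have been dealt with.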
1260        dealtwith = set()
1261        todeal = set(self.runtaskentries)
1262        while todeal:
1263            for tid in todeal.copy():
1264                if not (self.runtaskentries[tid].depends - dealtwith):
1265                    dealtwith.add(tid)
1266                    todeal.remove(tid)
1267                    self.prepare_task_hash(tid)
1268                bb.event.check_for_interrupts(self.cooker.data)
1269
1270        bb.parse.siggen.writeout_file_checksum_cache()
1271
1272        #self.dump_data()
1273        return len(self.runtaskentries)
1274
1275    def prepare_task_hash(self, tid):
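        # The taskhash is derived from the task's inputs and its dependencies'
        # hashes; the unihash starts out the same but may later be remapped by
        # hash equivalence to match an existing sstate object.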
1276        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1277        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1278        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
1279
1280    def dump_data(self):
1281        """
1282        Dump some debug information on the internal data structures
1283        """
1284        logger.debug3("run_tasks:")
1285        for tid in self.runtaskentries:
1286            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1287                         self.runtaskentries[tid].weight,
1288                         self.runtaskentries[tid].depends,
1289                         self.runtaskentries[tid].revdeps)
1290
1291class RunQueueWorker:
1292    def __init__(self, process, pipe):
1293        self.process = process
1294        self.pipe = pipe
1295
1296class RunQueue:
1297    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
1298
1299        self.cooker = cooker
1300        self.cfgData = cfgData
1301        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)
1302
1303        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
1304        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None
1305
1306        self.state = runQueuePrepare
1307
1308        # For disk space monitor
1309        # Invoked at regular time intervals via the bitbake heartbeat event
1310        # while the build is running. We generate a unique name for the handler
1311        # here, just in case there is ever more than one RunQueue instance,
1312        # start the handler when reaching runQueueSceneInit, and stop it when
1313        # done with the build.
1314        self.dm = monitordisk.diskMonitor(cfgData)
1315        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
1316        self.dm_event_handler_registered = False
1317        self.rqexe = None
1318        self.worker = {}
1319        self.fakeworker = {}
1320
1321    def _start_worker(self, mc, fakeroot = False, rqexec = None):
1322        logger.debug("Starting bitbake-worker")
1323        magic = "decafbad"
1324        if self.cooker.configuration.profile:
1325            magic = "decafbadbad"
1326        fakerootlogs = None
1327
1328        workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
1329        if fakeroot:
1330            magic = magic + "beef"
1331            mcdata = self.cooker.databuilder.mcdata[mc]
1332            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
1333            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
1334            env = os.environ.copy()
1335            for key, value in (var.split('=', 1) for var in fakerootenv):
1336                env[key] = value
1337            worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
1338            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
1339        else:
1340            worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1341        bb.utils.nonblockingfd(worker.stdout)
1342        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
1343
1344        workerdata = {
1345            "sigdata" : bb.parse.siggen.get_taskdata(),
1346            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
1347            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
1348            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
1349            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1350            "prhost" : self.cooker.prhost,
1351            "buildname" : self.cfgData.getVar("BUILDNAME"),
1352            "date" : self.cfgData.getVar("DATE"),
1353            "time" : self.cfgData.getVar("TIME"),
1354            "hashservaddr" : self.cooker.hashservaddr,
1355            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1356        }
1357
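        # The worker reads pickled messages from its stdin; each payload is
        # framed by matching <tag>...</tag> markers so the worker can locate
        # message boundaries within the byte stream.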
1358        worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>")
1359        worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>")
1360        worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>")
1361        worker.stdin.flush()
1362
1363        return RunQueueWorker(worker, workerpipe)
1364
1365    def _teardown_worker(self, worker):
1366        if not worker:
1367            return
1368        logger.debug("Teardown for bitbake-worker")
1369        try:
1370            worker.process.stdin.write(b"<quit></quit>")
1371            worker.process.stdin.flush()
1372            worker.process.stdin.close()
1373        except IOError:
1374            pass
1375        while worker.process.returncode is None:
1376            worker.pipe.read()
1377            worker.process.poll()
1378        while worker.pipe.read():
1379            continue
1380        worker.pipe.close()
1381
1382    def start_worker(self):
1383        if self.worker:
1384            self.teardown_workers()
1385        self.teardown = False
1386        for mc in self.rqdata.dataCaches:
1387            self.worker[mc] = self._start_worker(mc)
1388
1389    def start_fakeworker(self, rqexec, mc):
1390        if mc not in self.fakeworker:
1391            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1392
1393    def teardown_workers(self):
1394        self.teardown = True
1395        for mc in self.worker:
1396            self._teardown_worker(self.worker[mc])
1397        self.worker = {}
1398        for mc in self.fakeworker:
1399            self._teardown_worker(self.fakeworker[mc])
1400        self.fakeworker = {}
1401
1402    def read_workers(self):
1403        for mc in self.worker:
1404            self.worker[mc].pipe.read()
1405        for mc in self.fakeworker:
1406            self.fakeworker[mc].pipe.read()
1407
1408    def active_fds(self):
1409        fds = []
1410        for mc in self.worker:
1411            fds.append(self.worker[mc].pipe.input)
1412        for mc in self.fakeworker:
1413            fds.append(self.fakeworker[mc].pipe.input)
1414        return fds
1415
1416    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
1417        def get_timestamp(f):
1418            try:
1419                if not os.access(f, os.F_OK):
1420                    return None
1421                return os.stat(f)[stat.ST_MTIME]
1422            except OSError:
1423                return None
1424
1425        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
1426        if taskname is None:
1427            taskname = tn
1428
1429        stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn)
1430
1431        # If the stamp is missing, it's not current
1432        if not os.access(stampfile, os.F_OK):
1433            logger.debug2("Stampfile %s not available", stampfile)
1434            return False
1435        # If it's a 'nostamp' task, it's not current
1436        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1437        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
1438            logger.debug2("%s.%s is nostamp", fn, taskname)
1439            return False
1440
1441        if taskname.endswith("_setscene"):
1442            return True
1443
1444        if cache is None:
1445            cache = {}
1446
1447        iscurrent = True
1448        t1 = get_timestamp(stampfile)
1449        for dep in self.rqdata.runtaskentries[tid].depends:
1450            if iscurrent:
1451                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
1452                stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2)
1453                stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2)
1454                t2 = get_timestamp(stampfile2)
1455                t3 = get_timestamp(stampfile3)
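                # t2 is the stamp of the dependency's real task, t3 that of its
                # setscene variant. A setscene stamp with no (or an older) real
                # stamp means the dependency was restored from sstate, so the
                # timestamp comparison below doesn't apply.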
1456                if t3 and not t2:
1457                    continue
1458                if t3 and t3 > t2:
1459                    continue
1460                if fn == fn2:
1461                    if not t2:
1462                        logger.debug2('Stampfile %s does not exist', stampfile2)
1463                        iscurrent = False
1464                        break
1465                    if t1 < t2:
1466                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
1467                        iscurrent = False
1468                        break
1469                    if recurse and iscurrent:
1470                        if dep in cache:
1471                            iscurrent = cache[dep]
1472                            if not iscurrent:
1473                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1474                        else:
1475                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1476                            cache[dep] = iscurrent
1477        if recurse:
1478            cache[tid] = iscurrent
1479        return iscurrent
1480
1481    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1482        valid = set()
1483        if self.hashvalidate:
1484            sq_data = {}
1485            sq_data['hash'] = {}
1486            sq_data['hashfn'] = {}
1487            sq_data['unihash'] = {}
1488            for tid in tocheck:
1489                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1490                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1491                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1492                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1493
1494            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1495
1496        return valid
1497
1498    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
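        # BB_HASHCHECK_FUNCTION names a metadata function (typically
        # sstate_checkhashes in OpenEmbedded) which returns the subset of tids
        # whose output can be fetched from sstate; it is invoked by name via
        # better_eval below.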
1499        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1500
1501        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
1502        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1503
1504        return bb.utils.better_eval(call, locs)
1505
1506    def _execute_runqueue(self):
1507        """
1508        Run the tasks in a queue prepared by rqdata.prepare()
1509        Upon failure, optionally try to recover the build using any alternate providers
1510        (if the halt on failure configuration option isn't set)
1511        """
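        # Rough state flow: runQueuePrepare -> runQueueSceneInit ->
        # runQueueRunning -> runQueueCleanUp, ending in runQueueComplete or
        # runQueueFailed. The caller keeps invoking this method until it
        # returns False (build done) or raises.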
1512
1513        retval = True
1514        bb.event.check_for_interrupts(self.cooker.data)
1515
1516        if self.state is runQueuePrepare:
1517            # NOTE: if you add, remove or significantly refactor the stages of this
1518            # process then you should recalculate the weightings here. This is quite
1519            # easy to do - just change the next line temporarily to pass debug=True as
1520            # the last parameter and you'll get a printout of the weightings as well
1521            # as a map to the lines where next_stage() was called. Of course this isn't
1522            # critical, but it helps to keep the progress reporting accurate.
1523            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
1524                                                            "Initialising tasks",
1525                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
1526            if self.rqdata.prepare() == 0:
1527                self.state = runQueueComplete
1528            else:
1529                self.state = runQueueSceneInit
1530                bb.parse.siggen.save_unitaskhashes()
1531
1532        if self.state is runQueueSceneInit:
1533            self.rqdata.init_progress_reporter.next_stage()
1534
1535            # We are ready to run; emit dependency info to any UI or class
1536            # which needs it
1537            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
1538            self.rqdata.init_progress_reporter.next_stage()
1539            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)
1540
1541            if not self.dm_event_handler_registered:
1542                bb.event.register(self.dm_event_handler_name,
1543                                  lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
1544                                  ('bb.event.HeartbeatEvent',), data=self.cfgData)
1545                self.dm_event_handler_registered = True
1546
1547            dump = self.cooker.configuration.dump_signatures
1548            if dump:
1549                self.rqdata.init_progress_reporter.finish()
1550                if 'printdiff' in dump:
1551                    invalidtasks = self.print_diffscenetasks()
1552                self.dump_signatures(dump)
1553                if 'printdiff' in dump:
1554                    self.write_diffscenetasks(invalidtasks)
1555                self.state = runQueueComplete
1556
1557        if self.state is runQueueSceneInit:
1558            self.rqdata.init_progress_reporter.next_stage()
1559            self.start_worker()
1560            self.rqdata.init_progress_reporter.next_stage()
1561            self.rqexe = RunQueueExecute(self)
1562
1563            # If we don't have any setscene functions, skip execution
1564            if not self.rqdata.runq_setscene_tids:
1565                logger.info('No setscene tasks')
1566                for tid in self.rqdata.runtaskentries:
1567                    if not self.rqdata.runtaskentries[tid].depends:
1568                        self.rqexe.setbuildable(tid)
1569                    self.rqexe.tasks_notcovered.add(tid)
1570                self.rqexe.sqdone = True
1571            logger.info('Executing Tasks')
1572            self.state = runQueueRunning
1573
1574        if self.state is runQueueRunning:
1575            retval = self.rqexe.execute()
1576
1577        if self.state is runQueueCleanUp:
1578            retval = self.rqexe.finish()
1579
1580        build_done = self.state is runQueueComplete or self.state is runQueueFailed
1581
1582        if build_done and self.dm_event_handler_registered:
1583            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
1584            self.dm_event_handler_registered = False
1585
1586        if build_done and self.rqexe:
1587            bb.parse.siggen.save_unitaskhashes()
1588            self.teardown_workers()
1589            if self.rqexe:
1590                if self.rqexe.stats.failed:
1591                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
1592                else:
1593                    # Let's avoid the word "failed" if nothing actually did
1594                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)
1595
1596        if self.state is runQueueFailed:
1597            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)
1598
1599        if self.state is runQueueComplete:
1600            # All done
1601            return False
1602
1603        # Loop
1604        return retval
1605
1606    def execute_runqueue(self):
1607        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1608        try:
1609            return self._execute_runqueue()
1610        except bb.runqueue.TaskFailure:
1611            raise
1612        except SystemExit:
1613            raise
1614        except bb.BBHandledException:
1615            try:
1616                self.teardown_workers()
1617            except:
1618                pass
1619            self.state = runQueueComplete
1620            raise
1621        except Exception:
1622            logger.exception("An uncaught exception occurred in runqueue")
1623            try:
1624                self.teardown_workers()
1625            except:
1626                pass
1627            self.state = runQueueComplete
1628            raise
1629
1630    def finish_runqueue(self, now = False):
1631        if not self.rqexe:
1632            self.state = runQueueComplete
1633            return
1634
1635        if now:
1636            self.rqexe.finish_now()
1637        else:
1638            self.rqexe.finish()
1639
1640    def _rq_dump_sigtid(self, tids):
1641        for tid in tids:
1642            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1643            dataCaches = self.rqdata.dataCaches
1644            bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True)
1645
1646    def dump_signatures(self, options):
1647        if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset:
1648            bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled")
1649
1650        bb.note("Writing task signature files")
1651
1652        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1653        def chunkify(l, n):
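            # Round-robin split, e.g. chunkify([1, 2, 3, 4, 5], 2) -> [[1, 3, 5], [2, 4]]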
1654            return [l[i::n] for i in range(n)]
1655        tids = chunkify(list(self.rqdata.runtaskentries), max_process)
1656        # We cannot use the real multiprocessing.Pool easily due to some local data
1657        # that can't be pickled. This is a cheap multi-process solution.
1658        launched = []
1659        while tids:
1660            if len(launched) < max_process:
1661                p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), ))
1662                p.start()
1663                launched.append(p)
1664            for q in launched:
1665                # The finished processes are joined when calling is_alive()
1666                if not q.is_alive():
1667                    launched.remove(q)
1668        for p in launched:
1669            p.join()
1670
1671        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1672
1673        return
1674
1675    def print_diffscenetasks(self):
1676
1677        noexec = []
1678        tocheck = set()
1679
1680        for tid in self.rqdata.runtaskentries:
1681            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1682            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1683
1684            if 'noexec' in taskdep and taskname in taskdep['noexec']:
1685                noexec.append(tid)
1686                continue
1687
1688            tocheck.add(tid)
1689
1690        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)
1691
1692        # Tasks which are both setscene and noexec never care about dependencies.
1693        # We therefore find tasks which are setscene and noexec and mark their
1694        # unique dependencies as valid.
1695        for tid in noexec:
1696            if tid not in self.rqdata.runq_setscene_tids:
1697                continue
1698            for dep in self.rqdata.runtaskentries[tid].depends:
1699                hasnoexecparents = True
1700                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
1701                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
1702                        continue
1703                    hasnoexecparents = False
1704                    break
1705                if hasnoexecparents:
1706                    valid_new.add(dep)
1707
1708        invalidtasks = set()
1709        for tid in self.rqdata.runtaskentries:
1710            if tid not in valid_new and tid not in noexec:
1711                invalidtasks.add(tid)
1712
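        # Walk each invalid task's dependencies; if any dependency is itself
        # invalid then this task's difference is explained by that dependency,
        # leaving only the "root" invalid tasks to be reported.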
1713        found = set()
1714        processed = set()
1715        for tid in invalidtasks:
1716            toprocess = set([tid])
1717            while toprocess:
1718                next = set()
1719                for t in toprocess:
1720                    for dep in self.rqdata.runtaskentries[t].depends:
1721                        if dep in invalidtasks:
1722                            found.add(tid)
1723                        if dep not in processed:
1724                            processed.add(dep)
1725                            next.add(dep)
1726                toprocess = next
1727                if tid in found:
1728                    toprocess = set()
1729
1730        tasklist = []
1731        for tid in invalidtasks.difference(found):
1732            tasklist.append(tid)
1733
1734        if tasklist:
1735            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
1736
1737        return invalidtasks.difference(found)
1738
1739    def write_diffscenetasks(self, invalidtasks):
1740
1741        # Define recursion callback
1742        def recursecb(key, hash1, hash2):
1743            hashes = [hash1, hash2]
1744            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
1745
1746            recout = []
1747            if len(hashfiles) == 2:
1748                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1], hashfiles[hash2], recursecb)
1749                recout.extend(list('    ' + l for l in out2))
1750            else:
1751                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
1752
1753            return recout
1754
1755
1756        for tid in invalidtasks:
1757            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1758            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1759            h = self.rqdata.runtaskentries[tid].hash
1760            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
1761            match = None
1762            for m in matches:
1763                if h in m:
1764                    match = m
1765            if match is None:
1766                bb.fatal("Can't find a task we're supposed to have written out (hash: %s)" % h)
1767            matches = {k : v for k, v in iter(matches.items()) if h not in k}
1768            if matches:
1769                latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1]
1770                prevh = __find_sha256__.search(latestmatch).group(0)
1771                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
1772                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, closest matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1773
1774
1775class RunQueueExecute:
1776
1777    def __init__(self, rq):
1778        self.rq = rq
1779        self.cooker = rq.cooker
1780        self.cfgData = rq.cfgData
1781        self.rqdata = rq.rqdata
1782
1783        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
1784        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
1785        self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
1786        self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
1787        self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
1788
1789        self.sq_buildable = set()
1790        self.sq_running = set()
1791        self.sq_live = set()
1792
1793        self.updated_taskhash_queue = []
1794        self.pending_migrations = set()
1795
1796        self.runq_buildable = set()
1797        self.runq_running = set()
1798        self.runq_complete = set()
1799        self.runq_tasksrun = set()
1800
1801        self.build_stamps = {}
1802        self.build_stamps2 = []
1803        self.failed_tids = []
1804        self.sq_deferred = {}
1805
1806        self.stampcache = {}
1807
1808        self.holdoff_tasks = set()
1809        self.holdoff_need_update = True
1810        self.sqdone = False
1811
1812        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
1813
1814        for mc in rq.worker:
1815            rq.worker[mc].pipe.setrunqueueexec(self)
1816        for mc in rq.fakeworker:
1817            rq.fakeworker[mc].pipe.setrunqueueexec(self)
1818
1819        if self.number_tasks <= 0:
1820            bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1821
1822        lower_limit = 1.0
1823        upper_limit = 1000000.0
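        # The BB_PRESSURE_MAX_* thresholds are compared against Linux pressure
        # stall information (PSI) readings from /proc/pressure/ when the
        # scheduler decides whether another task may be started.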
1824        if self.max_cpu_pressure:
1825            self.max_cpu_pressure = float(self.max_cpu_pressure)
1826            if self.max_cpu_pressure < lower_limit:
1827                bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
1828            if self.max_cpu_pressure > upper_limit:
1829                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
1830
1831        if self.max_io_pressure:
1832            self.max_io_pressure = float(self.max_io_pressure)
1833            if self.max_io_pressure < lower_limit:
1834                bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
1835            if self.max_io_pressure > upper_limit:
1836                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1837
1838        if self.max_memory_pressure:
1839            self.max_memory_pressure = float(self.max_memory_pressure)
1840            if self.max_memory_pressure < lower_limit:
1841                bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
1842            if self.max_memory_pressure > upper_limit:
1843                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_memory_pressure))
1844
1845        # Set of setscene tasks which we've covered
1846        self.scenequeue_covered = set()
1847        # Set of tasks which are covered (including setscene ones)
1848        self.tasks_covered = set()
1849        self.tasks_scenequeue_done = set()
1850        self.scenequeue_notcovered = set()
1851        self.tasks_notcovered = set()
1852        self.scenequeue_notneeded = set()
1853
1854        # We can't skip specified target tasks which aren't setscene tasks
1855        self.cantskip = set(self.rqdata.target_tids)
1856        self.cantskip.difference_update(self.rqdata.runq_setscene_tids)
1857        self.cantskip.intersection_update(self.rqdata.runtaskentries)
1858
1859        schedulers = self.get_schedulers()
1860        for scheduler in schedulers:
1861            if self.scheduler == scheduler.name:
1862                self.sched = scheduler(self, self.rqdata)
1863                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
1864                break
1865        else:
1866            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1867                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1868
1869        #if self.rqdata.runq_setscene_tids:
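        # sqdata holds the setscene ("scenequeue") view of the task graph,
        # computed up front by build_scenequeue_data().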
1870        self.sqdata = SQData()
1871        build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self)
1872
1873    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1874
1875        # self.build_stamps[task] may not exist when using a shared work directory.
1876        if task in self.build_stamps:
1877            self.build_stamps2.remove(self.build_stamps[task])
1878            del self.build_stamps[task]
1879
1880        if task in self.sq_live:
1881            if status != 0:
1882                self.sq_task_fail(task, status)
1883            else:
1884                self.sq_task_complete(task)
1885            self.sq_live.remove(task)
1886            self.stats.updateActiveSetscene(len(self.sq_live))
1887        else:
1888            if status != 0:
1889                self.task_fail(task, status, fakerootlog=fakerootlog)
1890            else:
1891                self.task_complete(task)
1892        return True
1893
1894    def finish_now(self):
1895        for mc in self.rq.worker:
1896            try:
1897                self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>")
1898                self.rq.worker[mc].process.stdin.flush()
1899            except IOError:
1900                # worker must have died?
1901                pass
1902        for mc in self.rq.fakeworker:
1903            try:
1904                self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>")
1905                self.rq.fakeworker[mc].process.stdin.flush()
1906            except IOError:
1907                # worker must have died?
1908                pass
1909
1910        if self.failed_tids:
1911            self.rq.state = runQueueFailed
1912            return
1913
1914        self.rq.state = runQueueComplete
1915        return
1916
1917    def finish(self):
1918        self.rq.state = runQueueCleanUp
1919
1920        active = self.stats.active + len(self.sq_live)
1921        if active > 0:
1922            bb.event.fire(runQueueExitWait(active), self.cfgData)
1923            self.rq.read_workers()
1924            return self.rq.active_fds()
1925
1926        if self.failed_tids:
1927            self.rq.state = runQueueFailed
1928            return True
1929
1930        self.rq.state = runQueueComplete
1931        return True
1932
1933    # Used by setscene only
1934    def check_dependencies(self, task, taskdeps):
1935        if not self.rq.depvalidate:
1936            return False
1937
1938        # Must not edit parent data
1939        taskdeps = set(taskdeps)
1940
1941        taskdata = {}
1942        taskdeps.add(task)
1943        for dep in taskdeps:
1944            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
1945            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1946            taskdata[dep] = [pn, taskname, fn]
1947        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
1948        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
1949        valid = bb.utils.better_eval(call, locs)
1950        return valid
1951
1952    def can_start_task(self):
1953        active = self.stats.active + len(self.sq_live)
1954        can_start = active < self.number_tasks
1955        return can_start
1956
1957    def get_schedulers(self):
1958        schedulers = set(obj for obj in globals().values()
1959                             if type(obj) is type and
1960                                issubclass(obj, RunQueueScheduler))
1961
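        # BB_SCHEDULERS may list additional scheduler classes as dotted
        # "module.ClassName" entries, which are imported here.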
1962        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
1963        if user_schedulers:
1964            for sched in user_schedulers.split():
1965                if "." not in sched:
1966                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
1967                    continue
1968
1969                modname, name = sched.rsplit(".", 1)
1970                try:
1971                    module = __import__(modname, fromlist=(name,))
1972                except ImportError as exc:
1973                    bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
1974                else:
1975                    schedulers.add(getattr(module, name))
1976        return schedulers
1977
1978    def setbuildable(self, task):
1979        self.runq_buildable.add(task)
1980        self.sched.newbuildable(task)
1981
1982    def task_completeoutright(self, task):
1983        """
1984        Mark a task as completed
1985        Look at the reverse dependencies and mark any task with
1986        completed dependencies as buildable
1987        """
1988        self.runq_complete.add(task)
1989        for revdep in self.rqdata.runtaskentries[task].revdeps:
1990            if revdep in self.runq_running:
1991                continue
1992            if revdep in self.runq_buildable:
1993                continue
1994            alldeps = True
1995            for dep in self.rqdata.runtaskentries[revdep].depends:
1996                if dep not in self.runq_complete:
1997                    alldeps = False
1998                    break
1999            if alldeps:
2000                self.setbuildable(revdep)
2001                logger.debug("Marking task %s as buildable", revdep)
2002
2003        found = None
2004        for t in sorted(self.sq_deferred.copy()):
2005            if self.sq_deferred[t] == task:
2006                # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
2007                # We shouldn't allow all to run at once as it is prone to races.
2008                if not found:
2009                    bb.debug(1, "Deferred task %s now buildable" % t)
2010                    del self.sq_deferred[t]
2011                    update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2012                    found = t
2013                else:
2014                    bb.debug(1, "Deferring %s after %s" % (t, found))
2015                    self.sq_deferred[t] = found
2016
2017    def task_complete(self, task):
2018        self.stats.taskCompleted()
2019        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2020        self.task_completeoutright(task)
2021        self.runq_tasksrun.add(task)
2022
2023    def task_fail(self, task, exitcode, fakerootlog=None):
2024        """
2025        Called when a task has failed
2026        Updates the state engine with the failure
2027        """
2028        self.stats.taskFailed()
2029        self.failed_tids.append(task)
2030
2031        fakeroot_log = []
2032        if fakerootlog and os.path.exists(fakerootlog):
2033            with open(fakerootlog) as fakeroot_log_file:
2034                fakeroot_failed = False
2035                for line in reversed(fakeroot_log_file.readlines()):
2036                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
2037                        if fakeroot_error in line.lower():
2038                            fakeroot_failed = True
2039                    if 'doing new pid setup and server start' in line:
2040                        break
2041                    fakeroot_log.append(line)
2042
2043            if not fakeroot_failed:
2044                fakeroot_log = []
2045
2046        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
2047
2048        if self.rqdata.taskData[''].halt:
2049            self.rq.state = runQueueCleanUp
2050
2051    def task_skip(self, task, reason):
2052        self.runq_running.add(task)
2053        self.setbuildable(task)
2054        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
2055        self.task_completeoutright(task)
2056        self.stats.taskSkipped()
2057        self.stats.taskCompleted()
2058
2059    def summarise_scenequeue_errors(self):
2060        err = False
2061        if not self.sqdone:
2062            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
2063            completeevent = sceneQueueComplete(self.stats, self.rq)
2064            bb.event.fire(completeevent, self.cfgData)
2065        if self.sq_deferred:
2066            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
2067            err = True
2068        if self.updated_taskhash_queue:
2069            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
2070            err = True
2071        if self.holdoff_tasks:
2072            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
2073            err = True
2074
2075        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
2076            # No task should end up in both covered and uncovered, that is a bug.
2077            logger.error("Setscene task %s in both covered and notcovered." % tid)
2078
2079        for tid in self.rqdata.runq_setscene_tids:
2080            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2081                err = True
2082                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
2083            if tid not in self.sq_buildable:
2084                err = True
2085                logger.error("Setscene Task %s was never marked as buildable" % tid)
2086            if tid not in self.sq_running:
2087                err = True
2088                logger.error("Setscene Task %s was never marked as running" % tid)
2089
2090        for x in self.rqdata.runtaskentries:
2091            if x not in self.tasks_covered and x not in self.tasks_notcovered:
2092                logger.error("Task %s was never moved from the setscene queue" % x)
2093                err = True
2094            if x not in self.tasks_scenequeue_done:
2095                logger.error("Task %s was never processed by the setscene code" % x)
2096                err = True
2097            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
2098                logger.error("Task %s was never marked as buildable by the setscene code" % x)
2099                err = True
2100        return err
2101
2102
2103    def execute(self):
2104        """
2105        Run the tasks in a queue prepared by prepare_runqueue
2106        """
2107
2108        self.rq.read_workers()
2109        if self.updated_taskhash_queue or self.pending_migrations:
2110            self.process_possible_migrations()
2111
2112        if not hasattr(self, "sorted_setscene_tids"):
2113            # Don't want to sort this set every execution
2114            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)
2115
2116        task = None
2117        if not self.sqdone and self.can_start_task():
2118            # Find the next setscene to run
2119            for nexttask in self.sorted_setscene_tids:
2120                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values():
2121                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
2122                        if nexttask not in self.rqdata.target_tids:
2123                            logger.debug2("Skipping setscene for task %s" % nexttask)
2124                            self.sq_task_skip(nexttask)
2125                            self.scenequeue_notneeded.add(nexttask)
2126                            if nexttask in self.sq_deferred:
2127                                del self.sq_deferred[nexttask]
2128                            return True
2129                    # If covered tasks are running, need to wait for them to complete
2130                    if any(t in self.runq_running and t not in self.runq_complete
2131                           for t in self.sqdata.sq_covered_tasks[nexttask]):
2132                        continue
2133                    if nexttask in self.sq_deferred:
2134                        if self.sq_deferred[nexttask] not in self.runq_complete:
2135                            continue
2136                        logger.debug("Task %s no longer deferred" % nexttask)
2137                        del self.sq_deferred[nexttask]
2138                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
2139                        if not valid:
2140                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
2141                            self.sq_task_failoutright(nexttask)
2142                            return True
2143                    if nexttask in self.sqdata.outrightfail:
2144                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
2145                        self.sq_task_failoutright(nexttask)
2146                        return True
2147                    if nexttask in self.sqdata.unskippable:
2148                        logger.debug2("Setscene task %s is unskippable" % nexttask)
2149                    task = nexttask
2150                    break
2151        if task is not None:
2152            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2153            taskname = taskname + "_setscene"
2154            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
2155                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
2156                self.sq_task_failoutright(task)
2157                return True
2158
2159            if self.cooker.configuration.force:
2160                if task in self.rqdata.target_tids:
2161                    self.sq_task_failoutright(task)
2162                    return True
2163
2164            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2165                logger.debug2('Setscene stamp is current for task %s, so skipping it and its dependencies', task)
2166                self.sq_task_skip(task)
2167                return True
2168
2169            if self.cooker.configuration.skipsetscene:
2170                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
2171                self.sq_task_failoutright(task)
2172                return True
2173
2174            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
2175            bb.event.fire(startevent, self.cfgData)
2176
2177            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2178            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
2179            runtask = {
2180                'fn' : taskfn,
2181                'task' : task,
2182                'taskname' : taskname,
2183                'taskhash' : self.rqdata.get_task_hash(task),
2184                'unihash' : self.rqdata.get_task_unihash(task),
2185                'quieterrors' : True,
2186                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
2187                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
2188                'taskdepdata' : self.sq_build_taskdepdata(task),
2189                'dry_run' : False,
2190                'taskdep': taskdep,
2191                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
2192                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
2193                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
2194            }
2195
2196            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
2197                if mc not in self.rq.fakeworker:
2198                    self.rq.start_fakeworker(self, mc)
2199                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
2200                self.rq.fakeworker[mc].process.stdin.flush()
2201            else:
2202                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
2203                self.rq.worker[mc].process.stdin.flush()
2204
2205            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2206            self.build_stamps2.append(self.build_stamps[task])
2207            self.sq_running.add(task)
2208            self.sq_live.add(task)
2209            self.stats.updateActiveSetscene(len(self.sq_live))
2210            if self.can_start_task():
2211                return True
2212
2213        self.update_holdofftasks()
2214
2215        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
2216            hashequiv_logger.verbose("Setscene tasks completed")
2217
2218            err = self.summarise_scenequeue_errors()
2219            if err:
2220                self.rq.state = runQueueFailed
2221                return True
2222
2223            if self.cooker.configuration.setsceneonly:
2224                self.rq.state = runQueueComplete
2225                return True
2226            self.sqdone = True
2227
2228            if self.stats.total == 0:
2229                # nothing to do
2230                self.rq.state = runQueueComplete
2231                return True
2232
2233        if self.cooker.configuration.setsceneonly:
2234            task = None
2235        else:
2236            task = self.sched.next()
2237        if task is not None:
2238            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2239
2240            if self.rqdata.setscene_ignore_tasks is not None:
2241                if self.check_setscene_ignore_tasks(task):
2242                    self.task_fail(task, "setscene ignore_tasks")
2243                    return True
2244
2245            if task in self.tasks_covered:
2246                logger.debug2("Setscene covered task %s", task)
2247                self.task_skip(task, "covered")
2248                return True
2249
2250            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2251                logger.debug2("Stamp current task %s", task)
2252
2253                self.task_skip(task, "existing")
2254                self.runq_tasksrun.add(task)
2255                return True
2256
2257            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2258            if 'noexec' in taskdep and taskname in taskdep['noexec']:
2259                startevent = runQueueTaskStarted(task, self.stats, self.rq,
2260                                                 noexec=True)
2261                bb.event.fire(startevent, self.cfgData)
2262                self.runq_running.add(task)
2263                self.stats.taskActive()
2264                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2265                    bb.build.make_stamp_mcfn(taskname, taskfn)
2266                self.task_complete(task)
2267                return True
2268            else:
2269                startevent = runQueueTaskStarted(task, self.stats, self.rq)
2270                bb.event.fire(startevent, self.cfgData)
2271
2272            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2273            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
2274            runtask = {
2275                'fn' : taskfn,
2276                'task' : task,
2277                'taskname' : taskname,
2278                'taskhash' : self.rqdata.get_task_hash(task),
2279                'unihash' : self.rqdata.get_task_unihash(task),
2280                'quieterrors' : False,
2281                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
2282                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
2283                'taskdepdata' : self.build_taskdepdata(task),
2284                'dry_run' : self.rqdata.setscene_enforce,
2285                'taskdep': taskdep,
2286                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
2287                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
2288                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
2289            }
2290
2291            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2292                if mc not in self.rq.fakeworker:
2293                    try:
2294                        self.rq.start_fakeworker(self, mc)
2295                    except OSError as exc:
2296                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
2297                        self.rq.state = runQueueFailed
2298                        self.stats.taskFailed()
2299                        return True
2300                self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
2301                self.rq.fakeworker[mc].process.stdin.flush()
2302            else:
2303                self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>")
2304                self.rq.worker[mc].process.stdin.flush()
2305
2306            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2307            self.build_stamps2.append(self.build_stamps[task])
2308            self.runq_running.add(task)
2309            self.stats.taskActive()
2310            if self.can_start_task():
2311                return True
2312
2313        if self.stats.active > 0 or self.sq_live:
2314            self.rq.read_workers()
2315            return self.rq.active_fds()
2316
2317        # No more tasks can be run. If we have deferred setscene tasks we should run them.
2318        if self.sq_deferred:
2319            deferred_tid = list(self.sq_deferred.keys())[0]
2320            blocking_tid = self.sq_deferred.pop(deferred_tid)
2321            logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
2322            return True
2323
2324        if self.failed_tids:
2325            self.rq.state = runQueueFailed
2326            return True
2327
2328        # Sanity Checks
2329        err = self.summarise_scenequeue_errors()
2330        for task in self.rqdata.runtaskentries:
2331            if task not in self.runq_buildable:
2332                logger.error("Task %s never buildable!", task)
2333                err = True
2334            elif task not in self.runq_running:
2335                logger.error("Task %s never ran!", task)
2336                err = True
2337            elif task not in self.runq_complete:
2338                logger.error("Task %s never completed!", task)
2339                err = True
2340
2341        if err:
2342            self.rq.state = runQueueFailed
2343        else:
2344            self.rq.state = runQueueComplete
2345
2346        return True
2347
2348    def filtermcdeps(self, task, mc, deps):
2349        ret = set()
2350        for dep in deps:
2351            thismc = mc_from_tid(dep)
2352            if thismc != mc:
2353                continue
2354            ret.add(dep)
2355        return ret
2356
2357    # We filter out multiconfig dependencies from the taskdepdata we pass to the
2358    # tasks, as most code can't handle them.
2359    def build_taskdepdata(self, task):
2360        taskdepdata = {}
2361        mc = mc_from_tid(task)
2362        next = self.rqdata.runtaskentries[task].depends.copy()
2363        next.add(task)
2364        next = self.filtermcdeps(task, mc, next)
2365        while next:
2366            additional = []
2367            for revdep in next:
2368                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2369                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2370                deps = self.rqdata.runtaskentries[revdep].depends
2371                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2372                taskhash = self.rqdata.runtaskentries[revdep].hash
2373                unihash = self.rqdata.runtaskentries[revdep].unihash
2374                deps = self.filtermcdeps(task, mc, deps)
2375                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
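                # Consumers index into this list positionally, so the field
                # order (pn, taskname, fn, deps, provides, taskhash, unihash,
                # hashfn) is part of the interface.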
2376                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
2377                for revdep2 in deps:
2378                    if revdep2 not in taskdepdata:
2379                        additional.append(revdep2)
2380            next = additional
2381
2382        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2383        return taskdepdata
2384
2385    def update_holdofftasks(self):
2386
2387        if not self.holdoff_need_update:
2388            return
2389
2390        notcovered = set(self.scenequeue_notcovered)
2391        notcovered |= self.cantskip
2392        for tid in self.scenequeue_notcovered:
2393            notcovered |= self.sqdata.sq_covered_tasks[tid]
2394        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
2395        notcovered.intersection_update(self.tasks_scenequeue_done)
2396
2397        covered = set(self.scenequeue_covered)
2398        for tid in self.scenequeue_covered:
2399            covered |= self.sqdata.sq_covered_tasks[tid]
2400        covered.difference_update(notcovered)
2401        covered.intersection_update(self.tasks_scenequeue_done)
2402
2403        for tid in notcovered | covered:
2404            if not self.rqdata.runtaskentries[tid].depends:
2405                self.setbuildable(tid)
2406            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
2407                self.setbuildable(tid)
2408
2409        self.tasks_covered = covered
2410        self.tasks_notcovered = notcovered
2411
2412        self.holdoff_tasks = set()
2413
2414        for tid in self.rqdata.runq_setscene_tids:
2415            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2416                self.holdoff_tasks.add(tid)
2417
2418        for tid in self.holdoff_tasks.copy():
2419            for dep in self.sqdata.sq_covered_tasks[tid]:
2420                if dep not in self.runq_complete:
2421                    self.holdoff_tasks.add(dep)
2422
2423        self.holdoff_need_update = False
2424
2425    def process_possible_migrations(self):
2426
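        # Unihashes can change mid-build when the hash equivalence server
        # reports a match. Rehash the affected tasks here and propagate the new
        # hashes through their reverse dependencies before those tasks execute.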
2427        changed = set()
2428        toprocess = set()
2429        for tid, unihash in self.updated_taskhash_queue.copy():
2430            if tid in self.runq_running and tid not in self.runq_complete:
2431                continue
2432
2433            self.updated_taskhash_queue.remove((tid, unihash))
2434
2435            if unihash != self.rqdata.runtaskentries[tid].unihash:
2436                # Make sure we rehash any other tasks with the same task hash that are deferred against this one.
2437                torehash = [tid]
2438                for deftid in self.sq_deferred:
2439                    if self.sq_deferred[deftid] == tid:
2440                        torehash.append(deftid)
2441                for hashtid in torehash:
2442                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2443                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2444                    bb.parse.siggen.set_unihash(hashtid, unihash)
2445                    toprocess.add(hashtid)
2446                if torehash:
2447                    # Need to save after set_unihash above
2448                    bb.parse.siggen.save_unitaskhashes()
2449
2450        # Work out all tasks which depend upon these
2451        total = set()
2452        next = set()
2453        for p in toprocess:
2454            next |= self.rqdata.runtaskentries[p].revdeps
2455        while next:
2456            current = next.copy()
2457            total = total | next
2458            next = set()
2459            for ntid in current:
2460                next |= self.rqdata.runtaskentries[ntid].revdeps
2461            next.difference_update(total)
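        # 'total' now holds the full reverse-dependency closure of the rehashed tasks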
2462
2463        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2464        next = set()
2465        for p in total:
2466            if not self.rqdata.runtaskentries[p].depends:
2467                next.add(p)
2468            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2469                next.add(p)
2470
2471        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2472        while next:
2473            current = next.copy()
2474            next = set()
2475            for tid in current:
2476                if self.rqdata.runtaskentries[tid].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2477                    continue
2478                orighash = self.rqdata.runtaskentries[tid].hash
2479                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
2480                origuni = self.rqdata.runtaskentries[tid].unihash
2481                newuni = bb.parse.siggen.get_unihash(tid)
2482                # FIXME, need to check it can come from sstate at all for determinism?
2483                remapped = False
2484                if newuni == origuni:
2485                    # Nothing to do, we match, skip code below
2486                    remapped = True
2487                elif tid in self.scenequeue_covered or tid in self.sq_live:
2488                    # Already ran this setscene task or it is running. Report the new taskhash
2489                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2490                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2491                    remapped = True
2492
2493                if not remapped:
2494                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2495                    self.rqdata.runtaskentries[tid].hash = newhash
2496                    self.rqdata.runtaskentries[tid].unihash = newuni
2497                    changed.add(tid)
2498
2499                next |= self.rqdata.runtaskentries[tid].revdeps
2500                total.remove(tid)
2501                next.intersection_update(total)
2502
2503        if changed:
2504            for mc in self.rq.worker:
2505                self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
2506            for mc in self.rq.fakeworker:
2507                self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>")
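            # This uses the same tag-framed pickle format the workers already
            # parse from their stdin; both real and fakeroot workers need the
            # refreshed hashes.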
2508
2509            hashequiv_logger.debug("Tasks changed:\n%s" % (pprint.pformat(changed)))
2510
2511        for tid in changed:
2512            if tid not in self.rqdata.runq_setscene_tids:
2513                continue
2514            if tid not in self.pending_migrations:
2515                self.pending_migrations.add(tid)
2516
2517        update_tasks = []
2518        for tid in self.pending_migrations.copy():
2519            if tid in self.runq_running or tid in self.sq_live:
2520                # Too late, task already running, not much we can do now
2521                self.pending_migrations.remove(tid)
2522                continue
2523
2524            valid = True
2525            # Check no tasks this covers are running
2526            for dep in self.sqdata.sq_covered_tasks[tid]:
2527                if dep in self.runq_running and dep not in self.runq_complete:
2528                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2529                    valid = False
2530                    break
2531            if not valid:
2532                continue
2533
2534            self.pending_migrations.remove(tid)
2535            changed = True
2536
2537            if tid in self.tasks_scenequeue_done:
2538                self.tasks_scenequeue_done.remove(tid)
2539            for dep in self.sqdata.sq_covered_tasks[tid]:
2540                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2541                    bb.error("Task %s was marked as completed but now needs to rerun? Halting build." % dep)
2542                    self.failed_tids.append(tid)
2543                    self.rq.state = runQueueCleanUp
2544                    return
2545
2546                if dep not in self.runq_complete:
2547                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2548                        self.tasks_scenequeue_done.remove(dep)
2549
2550            if tid in self.sq_buildable:
2551                self.sq_buildable.remove(tid)
2552            if tid in self.sq_running:
2553                self.sq_running.remove(tid)
2554            if tid in self.sqdata.outrightfail:
2555                self.sqdata.outrightfail.remove(tid)
2556            if tid in self.scenequeue_notcovered:
2557                self.scenequeue_notcovered.remove(tid)
2558            if tid in self.scenequeue_covered:
2559                self.scenequeue_covered.remove(tid)
2560            if tid in self.scenequeue_notneeded:
2561                self.scenequeue_notneeded.remove(tid)
2562
2563            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2564            self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2565
2566            if tid in self.stampcache:
2567                del self.stampcache[tid]
2568
2569            if tid in self.build_stamps:
2570                del self.build_stamps[tid]
2571
2572            update_tasks.append(tid)
2573
2574        update_tasks2 = []
2575        for tid in update_tasks:
2576            harddepfail = False
2577            for t in self.sqdata.sq_harddeps:
2578                if tid in self.sqdata.sq_harddeps[t] and t in self.scenequeue_notcovered:
2579                    harddepfail = True
2580                    break
2581            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2582                if tid not in self.sq_buildable:
2583                    self.sq_buildable.add(tid)
2584            if not self.sqdata.sq_revdeps[tid]:
2585                self.sq_buildable.add(tid)
2586
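            # Record whether the task was valid before update_scenequeue_data
            # re-checks it below, so validity transitions can be logged afterwards.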
2587            update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid))
2588
2589        if update_tasks2:
2590            self.sqdone = False
2591            for mc in sorted(self.sqdata.multiconfigs):
2592                for tid in sorted([t[0] for t in update_tasks2]):
2593                    if mc_from_tid(tid) != mc:
2594                        continue
2595                    h = pending_hash_index(tid, self.rqdata)
2596                    if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2597                        self.sq_deferred[tid] = self.sqdata.hashes[h]
2598                        bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2599            update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2600
2601        for (tid, harddepfail, origvalid) in update_tasks2:
2602            if tid in self.sqdata.valid and not origvalid:
2603                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2604            if harddepfail:
2605                logger.debug2("%s has an unavailable hard dependency so skipping" % (tid))
2606                self.sq_task_failoutright(tid)
2607
2608        if changed:
2609            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2610            self.holdoff_need_update = True
2611
2612    def scenequeue_updatecounters(self, task, fail=False):
2613
2614        for dep in sorted(self.sqdata.sq_deps[task]):
2615            if fail and task in self.sqdata.sq_harddeps and dep in self.sqdata.sq_harddeps[task]:
2616                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
2617                    # the dependency may already have been processed, e.g. a noexec setscene task
2618                    continue
2619                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
2620                if noexec or stamppresent:
2621                    continue
2622                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
2623                self.sq_task_failoutright(dep)
2624                continue
2625            if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2626                if dep not in self.sq_buildable:
2627                    self.sq_buildable.add(dep)
2628
2629        next = set([task])
2630        while next:
2631            new = set()
2632            for t in sorted(next):
2633                self.tasks_scenequeue_done.add(t)
2634                # Look down the dependency chain for non-setscene things which this task depends on
2635                # and mark as 'done'
2636                for dep in self.rqdata.runtaskentries[t].depends:
2637                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
2638                        continue
2639                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
2640                        new.add(dep)
2641            next = new
2642
2643        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2644        self.holdoff_need_update = True
2645
2646    def sq_task_completeoutright(self, task):
2647        """
2648        Mark a task as completed
2649        Look at the reverse dependencies and mark any task with
2650        completed dependencies as buildable
2651        """
2652
2653        logger.debug('Found task %s which could be accelerated', task)
2654        self.scenequeue_covered.add(task)
2655        self.scenequeue_updatecounters(task)
2656
2657    def sq_check_taskfail(self, task):
2658        if self.rqdata.setscene_ignore_tasks is not None:
2659            realtask = task.split('_setscene')[0]
2660            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2661            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2662            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2663                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2664                self.rq.state = runQueueCleanUp
2665
2666    def sq_task_complete(self, task):
2667        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2668        self.sq_task_completeoutright(task)
2669
2670    def sq_task_fail(self, task, result):
2671        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
2672        self.scenequeue_notcovered.add(task)
2673        self.scenequeue_updatecounters(task, True)
2674        self.sq_check_taskfail(task)
2675
2676    def sq_task_failoutright(self, task):
2677        self.sq_running.add(task)
2678        self.sq_buildable.add(task)
2679        self.scenequeue_notcovered.add(task)
2680        self.scenequeue_updatecounters(task, True)
2681
2682    def sq_task_skip(self, task):
2683        self.sq_running.add(task)
2684        self.sq_buildable.add(task)
2685        self.sq_task_completeoutright(task)
2686
2687    def sq_build_taskdepdata(self, task):
2688        def getsetscenedeps(tid):
2689            deps = set()
2690            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2691            realtid = tid + "_setscene"
2692            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2693            for (depname, idependtask) in idepends:
2694                if depname not in self.rqdata.taskData[mc].build_targets:
2695                    continue
2696
2697                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2698                if depfn is None:
2699                    continue
2700                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2701                deps.add(deptid)
2702            return deps
2703
2704        taskdepdata = {}
2705        next = getsetscenedeps(task)
2706        next.add(task)
2707        while next:
2708            additional = []
2709            for revdep in next:
2710                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2711                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2712                deps = getsetscenedeps(revdep)
2713                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2714                taskhash = self.rqdata.runtaskentries[revdep].hash
2715                unihash = self.rqdata.runtaskentries[revdep].unihash
2716                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
2717                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
2718                for revdep2 in deps:
2719                    if revdep2 not in taskdepdata:
2720                        additional.append(revdep2)
2721            next = additional
2722
2723        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2724        return taskdepdata
2725
2726    def check_setscene_ignore_tasks(self, tid):
2727        # Check task that is going to run against the ignore tasks list
2728        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2729        # Ignore covered tasks
2730        if tid in self.tasks_covered:
2731            return False
2732        # Ignore stamped tasks
2733        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2734            return False
2735        # Ignore noexec tasks
2736        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2737        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2738            return False
2739
2740        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2741        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2742            if tid in self.rqdata.runq_setscene_tids:
2743                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
2744            else:
2745                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
2746            for t in self.scenequeue_notcovered:
2747                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
2748            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2749            logger.error("".join(msg))
2750            return True
2751        return False
2752
2753class SQData(object):
2754    def __init__(self):
2755        # SceneQueue dependencies
2756        self.sq_deps = {}
2757        # SceneQueue reverse dependencies
2758        self.sq_revdeps = {}
2759        # Injected inter-setscene task dependencies
2760        self.sq_harddeps = {}
2761        # Cache of stamp files so duplicates can't run in parallel
2762        self.stamps = {}
2763        # Setscene tasks directly depended upon by the build
2764        self.unskippable = set()
2765        # List of setscene tasks which aren't present
2766        self.outrightfail = set()
2767        # A list of normal tasks a setscene task covers
2768        self.sq_covered_tasks = {}
2769
2770def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq):
2771
2772    sq_revdeps = {}
2773    sq_revdeps_squash = {}
2774    sq_collated_deps = {}
2775
2776    # We need to construct a dependency graph for the setscene functions. Intermediate
2777    # dependencies between the setscene tasks only complicate the code. This code
2778    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
2779    # only containing the setscene functions.
2780
2781    rqdata.init_progress_reporter.next_stage()
2782
2783    # First process the chains up to the first setscene task.
2784    endpoints = {}
2785    for tid in rqdata.runtaskentries:
2786        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
2787        sq_revdeps_squash[tid] = set()
2788        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
2789            #bb.warn("Added endpoint %s" % (tid))
2790            endpoints[tid] = set()
2791
2792    rqdata.init_progress_reporter.next_stage()
2793
2794    # Secondly process the chains between setscene tasks.
2795    for tid in rqdata.runq_setscene_tids:
2796        sq_collated_deps[tid] = set()
2797        #bb.warn("Added endpoint 2 %s" % (tid))
2798        for dep in rqdata.runtaskentries[tid].depends:
2799            if tid in sq_revdeps[dep]:
2800                sq_revdeps[dep].remove(tid)
2801            if dep not in endpoints:
2802                endpoints[dep] = set()
2803            #bb.warn("  Added endpoint 3 %s" % (dep))
2804            endpoints[dep].add(tid)
2805
2806    rqdata.init_progress_reporter.next_stage()
2807
2808    def process_endpoints(endpoints):
2809        newendpoints = {}
2810        for point, task in endpoints.items():
2811            tasks = set()
2812            if task:
2813                tasks |= task
2814            if sq_revdeps_squash[point]:
2815                tasks |= sq_revdeps_squash[point]
2816            if point not in rqdata.runq_setscene_tids:
2817                for t in tasks:
2818                    sq_collated_deps[t].add(point)
2819            sq_revdeps_squash[point] = set()
2820            if point in rqdata.runq_setscene_tids:
2821                sq_revdeps_squash[point] = tasks
2822                continue
2823            for dep in rqdata.runtaskentries[point].depends:
2824                if point in sq_revdeps[dep]:
2825                    sq_revdeps[dep].remove(point)
2826                if tasks:
2827                    sq_revdeps_squash[dep] |= tasks
2828                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
2829                    newendpoints[dep] = task
2830        if newendpoints:
2831            process_endpoints(newendpoints)
2832
2833    process_endpoints(endpoints)
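    # An illustrative (hypothetical) collapse: for a chain
    #   do_fetch -> do_unpack -> do_populate_sysroot(_setscene)
    # the intermediate tasks are squashed away so sq_revdeps_squash only links
    # setscene tasks to each other, while sq_collated_deps records do_fetch and
    # do_unpack as tasks which the setscene task covers.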
2834
2835    rqdata.init_progress_reporter.next_stage()
2836
2837    # Build a list of tasks which are "unskippable"
2838    # These are direct endpoints referenced by the build, up to and including setscene tasks
2839    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
2840    new = True
2841    for tid in rqdata.runtaskentries:
2842        if not rqdata.runtaskentries[tid].revdeps:
2843            sqdata.unskippable.add(tid)
2844    sqdata.unskippable |= sqrq.cantskip
2845    while new:
2846        new = False
2847        orig = sqdata.unskippable.copy()
2848        for tid in sorted(orig, reverse=True):
2849            if tid in rqdata.runq_setscene_tids:
2850                continue
2851            if not rqdata.runtaskentries[tid].depends:
2852                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
2853                sqrq.setbuildable(tid)
2854            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
2855            if sqdata.unskippable != orig:
2856                new = True
2857
2858    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)
2859
2860    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
2861
2862    # Sanity check all dependencies could be changed to setscene task references
2863    for taskcounter, tid in enumerate(rqdata.runtaskentries):
2864        if tid in rqdata.runq_setscene_tids:
2865            pass
2866        elif sq_revdeps_squash[tid]:
2867            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
2868        else:
2869            del sq_revdeps_squash[tid]
2870        rqdata.init_progress_reporter.update(taskcounter)
2871
2872    rqdata.init_progress_reporter.next_stage()
2873
2874    # Resolve setscene inter-task dependencies
2875    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
2876    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
2877    for tid in rqdata.runq_setscene_tids:
2878        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2879        realtid = tid + "_setscene"
2880        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
2881        sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2882
2883        for (depname, idependtask) in idepends:
2884
2885            if depname not in rqdata.taskData[mc].build_targets:
2886                continue
2887
2888            depfn = rqdata.taskData[mc].build_targets[depname][0]
2889            if depfn is None:
2890                continue
2891            deptid = depfn + ":" + idependtask.replace("_setscene", "")
2892            if deptid not in rqdata.runtaskentries:
2893                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))
2894
2895            if deptid not in sqdata.sq_harddeps:
2896                sqdata.sq_harddeps[deptid] = set()
2897            sqdata.sq_harddeps[deptid].add(tid)
2898
2899            sq_revdeps_squash[tid].add(deptid)
2900            # Have to zero this to avoid circular dependencies
2901            sq_revdeps_squash[deptid] = set()
2902
2903    rqdata.init_progress_reporter.next_stage()
2904
2905    for task in sqdata.sq_harddeps:
2906        for dep in sqdata.sq_harddeps[task]:
2907            sq_revdeps_squash[dep].add(task)
2908
2909    rqdata.init_progress_reporter.next_stage()
2910
2911    #for tid in sq_revdeps_squash:
2912    #    data = ""
2913    #    for dep in sq_revdeps_squash[tid]:
2914    #        data = data + "\n   %s" % dep
2915    #    bb.warn("Task %s_setscene: is %s " % (tid, data))
2916
2917    sqdata.sq_revdeps = sq_revdeps_squash
2918    sqdata.sq_covered_tasks = sq_collated_deps
2919
2920    # Build reverse version of revdeps to populate deps structure
2921    for tid in sqdata.sq_revdeps:
2922        sqdata.sq_deps[tid] = set()
2923    for tid in sqdata.sq_revdeps:
2924        for dep in sqdata.sq_revdeps[tid]:
2925            sqdata.sq_deps[dep].add(tid)
2926
2927    rqdata.init_progress_reporter.next_stage()
2928
2929    sqdata.multiconfigs = set()
2930    for tid in sqdata.sq_revdeps:
2931        sqdata.multiconfigs.add(mc_from_tid(tid))
2932        if not sqdata.sq_revdeps[tid]:
2933            sqrq.sq_buildable.add(tid)
2934
2935    rqdata.init_progress_reporter.finish()
2936
2937    sqdata.noexec = set()
2938    sqdata.stamppresent = set()
2939    sqdata.valid = set()
2940
2941    sqdata.hashes = {}
2942    sqrq.sq_deferred = {}
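    # Tasks whose pending_hash_index matches an earlier task are deferred behind
    # it, so that two tasks expected to produce the same sstate output are not
    # executed at the same time.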
2943    for mc in sorted(sqdata.multiconfigs):
2944        for tid in sorted(sqdata.sq_revdeps):
2945            if mc_from_tid(tid) != mc:
2946                continue
2947            h = pending_hash_index(tid, rqdata)
2948            if h not in sqdata.hashes:
2949                sqdata.hashes[h] = tid
2950            else:
2951                sqrq.sq_deferred[tid] = sqdata.hashes[h]
2952                bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
2953
2954    update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True)
2955
2956    # Compute a list of 'stale' sstate tasks where the current hash does not match the one
2957    # in any stamp files. Pass the list out to metadata as an event.
2958    found = {}
2959    for tid in rqdata.runq_setscene_tids:
2960        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2961        stamps = bb.build.find_stale_stamps(taskname, taskfn)
2962        if stamps:
2963            if mc not in found:
2964                found[mc] = {}
2965            found[mc][tid] = stamps
2966    for mc in found:
2967        event = bb.event.StaleSetSceneTasks(found[mc])
2968        bb.event.fire(event, cooker.databuilder.mcdata[mc])
2969
2970def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
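    # Returns a (noexec, stamppresent) pair: noexec tasks have a setscene stamp
    # written immediately, while a current setscene or normal stamp means the
    # task has nothing left to do.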
2971
2972    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2973
2974    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]
2975
2976    if 'noexec' in taskdep and taskname in taskdep['noexec']:
2977        bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn)
2978        return True, False
2979
2980    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
2981        logger.debug2('Setscene stamp current for task %s', tid)
2982        return False, True
2983
2984    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
2985        logger.debug2('Normal stamp current for task %s', tid)
2986        return False, True
2987
2988    return False, False
2989
2990def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
2991
2992    tocheck = set()
2993
2994    for tid in sorted(tids):
2995        if tid in sqdata.stamppresent:
2996            sqdata.stamppresent.remove(tid)
2997        if tid in sqdata.valid:
2998            sqdata.valid.remove(tid)
2999        if tid in sqdata.outrightfail:
3000            sqdata.outrightfail.remove(tid)
3001
3002        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)
3003
3004        if noexec:
3005            sqdata.noexec.add(tid)
3006            sqrq.sq_task_skip(tid)
3007            logger.debug2("%s is noexec so skipping setscene" % (tid))
3008            continue
3009
3010        if stamppresent:
3011            sqdata.stamppresent.add(tid)
3012            sqrq.sq_task_skip(tid)
3013            logger.debug2("%s has a valid stamp, skipping" % (tid))
3014            continue
3015
3016        tocheck.add(tid)
3017
3018    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)
3019
3020    for tid in tids:
3021        if tid in sqdata.stamppresent:
3022            continue
3023        if tid in sqdata.valid:
3024            continue
3025        if tid in sqdata.noexec:
3026            continue
3027        if tid in sqrq.scenequeue_covered:
3028            continue
3029        if tid in sqrq.scenequeue_notcovered:
3030            continue
3031        if tid in sqrq.sq_deferred:
3032            continue
3033        sqdata.outrightfail.add(tid)
3034        logger.debug2("%s cannot be fulfilled from sstate, marking as an outright failure" % (tid))
3035
3036class TaskFailure(Exception):
3037    """
3038    Exception raised when a task in a runqueue fails
3039    """
3040    def __init__(self, x):
3041        self.args = x
3042
3043
3044class runQueueExitWait(bb.event.Event):
3045    """
3046    Event when waiting for task processes to exit
3047    """
3048
3049    def __init__(self, remain):
3050        self.remain = remain
3051        self.message = "Waiting for %s active tasks to finish" % remain
3052        bb.event.Event.__init__(self)
3053
3054class runQueueEvent(bb.event.Event):
3055    """
3056    Base runQueue event class
3057    """
3058    def __init__(self, task, stats, rq):
3059        self.taskid = task
3060        self.taskstring = task
3061        self.taskname = taskname_from_tid(task)
3062        self.taskfile = fn_from_tid(task)
3063        self.taskhash = rq.rqdata.get_task_hash(task)
3064        self.stats = stats.copy()
3065        bb.event.Event.__init__(self)
3066
3067class sceneQueueEvent(runQueueEvent):
3068    """
3069    Base sceneQueue event class
3070    """
3071    def __init__(self, task, stats, rq, noexec=False):
3072        runQueueEvent.__init__(self, task, stats, rq)
3073        self.taskstring = task + "_setscene"
3074        self.taskname = taskname_from_tid(task) + "_setscene"
3075        self.taskfile = fn_from_tid(task)
3076        self.taskhash = rq.rqdata.get_task_hash(task)
3077
3078class runQueueTaskStarted(runQueueEvent):
3079    """
3080    Event notifying a task was started
3081    """
3082    def __init__(self, task, stats, rq, noexec=False):
3083        runQueueEvent.__init__(self, task, stats, rq)
3084        self.noexec = noexec
3085
3086class sceneQueueTaskStarted(sceneQueueEvent):
3087    """
3088    Event notifying a setscene task was started
3089    """
3090    def __init__(self, task, stats, rq, noexec=False):
3091        sceneQueueEvent.__init__(self, task, stats, rq)
3092        self.noexec = noexec
3093
3094class runQueueTaskFailed(runQueueEvent):
3095    """
3096    Event notifying a task failed
3097    """
3098    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
3099        runQueueEvent.__init__(self, task, stats, rq)
3100        self.exitcode = exitcode
3101        self.fakeroot_log = fakeroot_log
3102
3103    def __str__(self):
3104        if self.fakeroot_log:
3105            return "Task (%s) failed with exit code '%s'\nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log)
3106        else:
3107            return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
3108
3109class sceneQueueTaskFailed(sceneQueueEvent):
3110    """
3111    Event notifying a setscene task failed
3112    """
3113    def __init__(self, task, stats, exitcode, rq):
3114        sceneQueueEvent.__init__(self, task, stats, rq)
3115        self.exitcode = exitcode
3116
3117    def __str__(self):
3118        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)
3119
3120class sceneQueueComplete(sceneQueueEvent):
3121    """
3122    Event when all the sceneQueue tasks are complete
3123    """
3124    def __init__(self, stats, rq):
3125        self.stats = stats.copy()
3126        bb.event.Event.__init__(self)
3127
3128class runQueueTaskCompleted(runQueueEvent):
3129    """
3130    Event notifying a task completed
3131    """
3132
3133class sceneQueueTaskCompleted(sceneQueueEvent):
3134    """
3135    Event notifying a setscene task completed
3136    """
3137
3138class runQueueTaskSkipped(runQueueEvent):
3139    """
3140    Event notifying a task was skipped
3141    """
3142    def __init__(self, task, stats, rq, reason):
3143        runQueueEvent.__init__(self, task, stats, rq)
3144        self.reason = reason
3145
3146class taskUniHashUpdate(bb.event.Event):
3147    """
3148    Event notifying of a unihash update for a task
3149    """
3150    def __init__(self, task, unihash):
3151        self.taskid = task
3152        self.unihash = unihash
3153        bb.event.Event.__init__(self)
3154
3155class runQueuePipe():
3156    """
3157    Abstraction for a pipe between a worker thread and the server
3158    """
3159    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
3160        self.input = pipein
3161        if pipeout:
3162            pipeout.close()
3163        bb.utils.nonblockingfd(self.input)
3164        self.queue = bytearray()
3165        self.d = d
3166        self.rq = rq
3167        self.rqexec = rqexec
3168        self.fakerootlogs = fakerootlogs
3169
3170    def setrunqueueexec(self, rqexec):
3171        self.rqexec = rqexec
3172
3173    def read(self):
3174        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
3175            for worker in workers.values():
3176                worker.process.poll()
3177                if worker.process.returncode is not None and not self.rq.teardown:
3178                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
3179                    self.rq.finish_runqueue(True)
3180
3181        start = len(self.queue)
3182        try:
3183            self.queue.extend(self.input.read(102400) or b"")
3184        except (OSError, IOError) as e:
3185            if e.errno != errno.EAGAIN:
3186                raise
3187        end = len(self.queue)
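        # Messages arrive tag-framed, i.e. b"<event>" + pickle + b"</event>" or
        # b"<exitcode>" + pickle + b"</exitcode>"; drain every complete frame
        # currently in the buffer.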
3188        found = True
3189        while found and self.queue:
3190            found = False
3191            index = self.queue.find(b"</event>")
3192            while index != -1 and self.queue.startswith(b"<event>"):
3193                try:
3194                    event = pickle.loads(self.queue[7:index])
3195                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
3196                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
3197                        # The pickled data could itself contain "</event>", so search for the next
3198                        # occurrence and try unpickling again; this should be the only way an unpickling error can occur
3199                        index = self.queue.find(b"</event>", index + 1)
3200                        continue
3201                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[7:index]))
3202                bb.event.fire_from_worker(event, self.d)
3203                if isinstance(event, taskUniHashUpdate):
3204                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
3205                found = True
3206                self.queue = self.queue[index+8:]
3207                index = self.queue.find(b"</event>")
3208            index = self.queue.find(b"</exitcode>")
3209            while index != -1 and self.queue.startswith(b"<exitcode>"):
3210                try:
3211                    task, status = pickle.loads(self.queue[10:index])
3212                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
3213                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[10:index]))
3214                (_, _, _, taskfn) = split_tid_mcfn(task)
3215                fakerootlog = None
3216                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
3217                    fakerootlog = self.fakerootlogs[taskfn]
3218                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
3219                found = True
3220                self.queue = self.queue[index+11:]
3221                index = self.queue.find(b"</exitcode>")
3222        return (end > start)
3223
3224    def close(self):
3225        while self.read():
3226            continue
3227        if self.queue:
3228            print("Warning, worker left partial message: %s" % self.queue)
3229        self.input.close()
3230
3231def get_setscene_enforce_ignore_tasks(d, targets):
3232    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
3233        return None
3234    ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
3235    outlist = []
3236    for item in ignore_tasks[:]:
3237        if item.startswith('%:'):
3238            for (mc, target, task, fn) in targets:
3239                outlist.append(target + ':' + item.split(':')[1])
3240        else:
3241            outlist.append(item)
3242    return outlist
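# An illustrative (hypothetical) expansion: with BB_SETSCENE_ENFORCE_IGNORE_TASKS
# containing "%:do_package_qa" and core-image-minimal among the targets, the
# entry expands to "core-image-minimal:do_package_qa".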
3243
3244def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
3245    import fnmatch
3246    if ignore_tasks is not None:
3247        item = '%s:%s' % (pn, taskname)
3248        for ignore_task in ignore_tasks:
3249            if fnmatch.fnmatch(item, ignore_task):
3250                return True
3251        return False
3252    return True
3253