# xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 8460358c3d24c71d9d38fd126c745854a6301564)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import itertools
18import logging
19import re
20import bb
21from bb import msg, event
22from bb import monitordisk
23import subprocess
24import pickle
25from multiprocessing import Process
26import shlex
27import pprint
28import time
29
30bblogger = logging.getLogger("BitBake")
31logger = logging.getLogger("BitBake.RunQueue")
32hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")
33
34__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
35
36def fn_from_tid(tid):
37     return tid.rsplit(":", 1)[0]
38
39def taskname_from_tid(tid):
40    return tid.rsplit(":", 1)[1]
41
42def mc_from_tid(tid):
43    if tid.startswith('mc:') and tid.count(':') >= 2:
44        return tid.split(':')[1]
45    return ""
46
47def split_tid(tid):
48    (mc, fn, taskname, _) = split_tid_mcfn(tid)
49    return (mc, fn, taskname)
50
51def split_mc(n):
52    if n.startswith("mc:") and n.count(':') >= 2:
53        _, mc, n = n.split(":", 2)
54        return (mc, n)
55    return ('', n)
56
57def split_tid_mcfn(tid):
58    if tid.startswith('mc:') and tid.count(':') >= 2:
59        elems = tid.split(':')
60        mc = elems[1]
61        fn = ":".join(elems[2:-1])
62        taskname = elems[-1]
63        mcfn = "mc:" + mc + ":" + fn
64    else:
65        tid = tid.rsplit(":", 1)
66        mc = ""
67        fn = tid[0]
68        taskname = tid[1]
69        mcfn = fn
70
71    return (mc, fn, taskname, mcfn)
72
73def build_tid(mc, fn, taskname):
74    if mc:
75        return "mc:" + mc + ":" + fn + ":" + taskname
76    return fn + ":" + taskname
77
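# A worked example of the tid helpers above (illustrative values only): for a
# multiconfig task id
#     tid = "mc:qemuarm:/path/to/recipe.bb:do_compile"
# split_tid_mcfn(tid) returns
#     ("qemuarm", "/path/to/recipe.bb", "do_compile", "mc:qemuarm:/path/to/recipe.bb")
# and build_tid("qemuarm", "/path/to/recipe.bb", "do_compile") reconstructs the
# original tid. Without an "mc:" prefix the mc element is simply "".
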
# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    return pn + ":" + taskname + h

class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.setscene_active = 0
        self.setscene_covered = 0
        self.setscene_notcovered = 0
        self.setscene_total = setscene_total
        self.total = total

    def copy(self):
        obj = self.__class__(self.total, self.setscene_total)
        obj.__dict__.update(self.__dict__)
        return obj

    def taskFailed(self):
        self.active = self.active - 1
        self.failed = self.failed + 1

    def taskCompleted(self):
        self.active = self.active - 1
        self.completed = self.completed + 1

    def taskSkipped(self):
        self.active = self.active + 1
        self.skipped = self.skipped + 1

    def taskActive(self):
        self.active = self.active + 1

    def updateCovered(self, covered, notcovered):
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        self.setscene_active = active

# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueDumpSigs = 4
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9

class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            if tid in self.rq.runq_buildable:
                self.buildable.add(tid)

        self.rev_prio_map = None
        self.is_pressure_usable()

    def is_pressure_usable(self):
        """
        If monitoring pressure, check whether the pressure files can be opened
        and read, and set self.check_pressure accordingly. For example, the
        openSUSE /proc/pressure/* files have readable file permissions, but
        reading them returns the error EOPNOTSUPP (Operation not supported).
        """
        if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
            try:
                with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                    open("/proc/pressure/io") as io_pressure_fds, \
                    open("/proc/pressure/memory") as memory_pressure_fds:

                    self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_pressure_time = time.time()
                self.check_pressure = True
            except Exception:
                bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
                self.check_pressure = False
        else:
            self.check_pressure = False

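    # Background on the parsing above and in exceeds_max_pressure() below: the
    # first line of each /proc/pressure/{cpu,io,memory} file looks like
    # (illustrative values)
    #     some avg10=0.12 avg60=0.34 avg300=0.56 total=123456789
    # so readline().split()[4] selects the "total=..." field and split("=")[1]
    # extracts the cumulative stall time in microseconds, which is then
    # differenced over the sampling interval.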
    def exceeds_max_pressure(self):
        """
        Monitor the difference in total pressure at least once per second. If
        BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True when the pressure
        exceeds the threshold.
        """
        if self.check_pressure:
            with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                open("/proc/pressure/io") as io_pressure_fds, \
                open("/proc/pressure/memory") as memory_pressure_fds:
                # extract "total" from /proc/pressure/{cpu|io|memory}
                curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                now = time.time()
                tdiff = now - self.prev_pressure_time
                psi_accumulation_interval = 1.0
                cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
                io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
                memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
                exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
                exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
                exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure

                if tdiff > psi_accumulation_interval:
                    self.prev_cpu_pressure = curr_cpu_pressure
                    self.prev_io_pressure = curr_io_pressure
                    self.prev_memory_pressure = curr_memory_pressure
                    self.prev_pressure_time = now

            pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
            pressure_values = (round(cpu_pressure, 1), self.rq.max_cpu_pressure, round(io_pressure, 1), self.rq.max_io_pressure, round(memory_pressure, 1), self.rq.max_memory_pressure)
            if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
                bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
            self.pressure_state = pressure_state
            return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
        elif self.rq.max_loadfactor:
            limit = False
            loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
            # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
            if loadfactor > self.rq.max_loadfactor:
                limit = True
            if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
                bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
            self.loadfactor_limit = limit
            return limit
        return False

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Bitbake requires that at least one task be active. Only check for pressure if
        # this is the case, otherwise the pressure limitation could result in no tasks
        # being active and no new tasks started, at times breaking the scheduler.
        if self.rq.stats.active and self.exceeds_max_pressure():
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        best = None
        bestprio = None
        for tid in buildable:
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                taskname = taskname_from_tid(tid)
                if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                    continue
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

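    # Note on the stamp checks above (an assumption based on the surrounding
    # code): self.rq.build_stamps holds the stamp files of currently running
    # tasks, so skipping candidates whose stamp is already present avoids
    # starting two tasks that would write the same stamp file concurrently.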
    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        self.buildable.add(task)

    def removebuildable(self, task):
        self.buildable.remove(task)

    def describe_task(self, taskid):
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))

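# The schedulers below are selected by name: elsewhere in this file BitBake
# compares each class's "name" attribute against the BB_SCHEDULER configuration
# variable (e.g. BB_SCHEDULER = "completion" in local.conf) to pick which
# implementation to use.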
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        weights = {}
        for tid in self.rqdata.runtaskentries:
            weight = self.rqdata.runtaskentries[tid].weight
            if weight not in weights:
                weights[weight] = []
            weights[weight].append(tid)

        self.prio_map = []
        for weight in sorted(weights):
            for w in weights[weight]:
                self.prio_map.append(w)

        self.prio_map.reverse()

class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it is completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s' % all_tasks)
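        # A small worked example of the merge above (hypothetical task lists):
        # merging ["do_fetch", "do_compile", "do_build"] with
        # ["do_fetch", "do_install", "do_build"] yields
        # ["do_fetch", "do_install", "do_compile", "do_build"] - the common
        # tasks keep their relative order and the new task is inserted at the
        # point of divergence, before the next task already in the merged list.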

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runnable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example of why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's tasks that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')

class RunTaskEntry(object):
    def __init__(self):
        self.depends = set()
        self.revdeps = set()
        self.hash = None
        self.unihash = None
        self.task = None
        self.weight = 1

class RunQueueData:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
        self.setscene_ignore_tasks_checked = False
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()

    def reset(self):
        self.runtaskentries = {}

    def runq_depends_names(self, ids):
        import re
        ret = []
        for id in ids:
            nam = os.path.basename(id)
            nam = re.sub("_[^,]*,", ",", nam)
            ret.extend([nam])
        return ret

    def get_task_hash(self, tid):
        return self.runtaskentries[tid].hash

    def get_task_unihash(self, tid):
        return self.runtaskentries[tid].unihash

    def get_user_idstring(self, tid, task_name_suffix = ""):
        return tid + task_name_suffix

    def get_short_user_idstring(self, task, task_name_suffix = ""):
        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
        pn = self.dataCaches[mc].pkg_fn[taskfn]
        taskname = taskname_from_tid(task) + task_name_suffix
        return "%s:%s" % (pn, taskname)

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        class TooManyLoops(Exception):
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Halted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs

    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity-checks the task list, finding tasks that are
        impossible to execute due to circular dependencies.
        """

        numTasks = len(self.runtaskentries)
        weight = {}
        deps_left = {}
        task_done = {}

        for tid in self.runtaskentries:
            task_done[tid] = False
            weight[tid] = 1
            deps_left[tid] = len(self.runtaskentries[tid].revdeps)

        for tid in endpoints:
            weight[tid] = 10
            task_done[tid] = True

        while True:
            next_points = []
            for tid in endpoints:
                for revdep in self.runtaskentries[tid].depends:
                    weight[revdep] = weight[revdep] + weight[tid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if not next_points:
                break

        # Circular dependency sanity check
        problem_tasks = []
        for tid in self.runtaskentries:
            if task_done[tid] is False or deps_left[tid] != 0:
                problem_tasks.append(tid)
                logger.debug2("Task %s is not buildable", tid)
                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
            self.runtaskentries[tid].weight = weight[tid]

        if problem_tasks:
            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight

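    # A small worked example of the weighting above (hypothetical tasks): for a
    # chain where C depends on B and B depends on A, C is the only endpoint and
    # starts at weight 10; walking down the chain, B accumulates 1 + 10 = 11 and
    # A accumulates 1 + 11 = 12, so the task with the most (transitive)
    # dependents ends up heaviest and is preferred by the "speed" scheduler.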
    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """

        runq_build = {}
        recursivetasks = {}
        recursiveitasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        found = False
        for mc in self.taskData:
            if taskData[mc].taskentries:
                found = True
                break
        if not found:
            # Nothing to do
            return 0

        bb.parse.siggen.setup_datacache(self.dataCaches)

        self.init_progress_reporter.start()
        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
                    continue
                depdata = taskData[mc].build_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

        def add_runtime_dependencies(depids, tasknames, depends, mc):
            for depname in depids:
                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
                    continue
                depdata = taskData[mc].run_targets[depname][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    t = depdata + ":" + taskname
                    if t in taskData[mc].taskentries:
                        depends.add(t)

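        # For example (hypothetical metadata): with
        #     do_configure[deptask] = "do_populate_sysroot"
        # and DEPENDS = "zlib", add_build_dependencies() looks up the chosen
        # provider recipe for "zlib" and adds "<provider-fn>:do_populate_sysroot"
        # to the depends set, but only if that task actually exists.
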
        def add_mc_dependencies(mc, tid):
            mcdeps = taskData[mc].get_mcdepends()
            for dep in mcdeps:
                mcdependency = dep.split(':')
                pn = mcdependency[3]
                frommc = mcdependency[1]
                mcdep = mcdependency[2]
                deptask = mcdependency[4]
                if mcdep not in taskData:
                    bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep))
                if mc == frommc:
                    fn = taskData[mcdep].build_targets[pn][0]
                    newdep = '%s:%s' % (fn, deptask)
                    taskData[mc].taskentries[tid].tdepends.append(newdep)

        for mc in taskData:
            for tid in taskData[mc].taskentries:

                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
                #runtid = build_tid(mc, fn, taskname)

                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)

                depends = set()
                task_deps = self.dataCaches[mc].task_deps[taskfn]

                self.runtaskentries[tid] = RunTaskEntry()

                if fn in taskData[mc].failed_fns:
                    continue

                # We add multiconfig dependencies before processing internal task deps (tdepends)
                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
                    add_mc_dependencies(mc, tid)

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                for t in taskData[mc].taskentries[tid].tdepends:
                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
                    depends.add(build_tid(depmc, depfn, deptaskname))

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskname in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskname].split()
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskname].split()
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData[mc].taskentries[tid].idepends
                for (depname, idependtask) in idepends:
                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and depname not in taskData[mc].failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData[mc].build_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
                irdepends = taskData[mc].taskentries[tid].irdepends
                for (depname, idependtask) in irdepends:
                    if depname in taskData[mc].run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        if not taskData[mc].run_targets[depname]:
                            continue
                        depdata = taskData[mc].run_targets[depname][0]
                        if depdata is not None:
                            t = depdata + ":" + idependtask
                            depends.add(t)
                            if t not in taskData[mc].taskentries:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskname].split()
                    recursivetasks[tid] = tasknames
                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
                    if taskname in tasknames:
                        recursivetasksselfref.add(tid)

                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
                        recursiveitasks[tid] = []
                        for t in task_deps['recideptask'][taskname].split():
                            newdep = build_tid(mc, fn, t)
                            recursiveitasks[tid].append(newdep)

                self.runtaskentries[tid].depends = depends
                # Remove all self references
                self.runtaskentries[tid].depends.discard(tid)

        #self.dump_data()

        self.init_progress_reporter.next_stage()
        bb.event.check_for_interrupts(self.cooker.data)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed

        # Generating/iterating recursive lists of dependencies is painful and potentially slow
        # Precompute recursive task dependencies here by:
        #     a) create a temp list of reverse dependencies (revdeps)
        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
        #     c) combine the total list of dependencies in cumulativedeps
        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)

        revdeps = {}
        deps = {}
        cumulativedeps = {}
        for tid in self.runtaskentries:
            deps[tid] = set(self.runtaskentries[tid].depends)
            revdeps[tid] = set()
            cumulativedeps[tid] = set()
        # Generate a temp list of reverse dependencies
        for tid in self.runtaskentries:
            for dep in self.runtaskentries[tid].depends:
                revdeps[dep].add(tid)
        # Find the dependency chain endpoints
        endpoints = set()
        for tid in self.runtaskentries:
            if not deps[tid]:
                endpoints.add(tid)
        # Iterate the chains collating dependencies
        while endpoints:
            next = set()
            for tid in endpoints:
                for dep in revdeps[tid]:
                    cumulativedeps[dep].add(fn_from_tid(tid))
                    cumulativedeps[dep].update(cumulativedeps[tid])
                    if tid in deps[dep]:
                        deps[dep].remove(tid)
                    if not deps[dep]:
                        next.add(dep)
            endpoints = next
        #for tid in deps:
        #    if deps[tid]:
        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
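        # e.g. (hypothetical recipes) if b.bb:do_compile depends on
        # a.bb:do_populate_sysroot, then after this loop
        # cumulativedeps["b.bb:do_compile"] contains "a.bb" plus everything
        # a.bb's tasks transitively depend on - filenames only, since the task
        # suffix was pre-truncated as per (d) above.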
878
879        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
880        # resolve these recursively until we aren't adding any further extra dependencies
881        extradeps = True
882        while extradeps:
883            extradeps = 0
884            for tid in recursivetasks:
885                tasknames = recursivetasks[tid]
886
887                totaldeps = set(self.runtaskentries[tid].depends)
888                if tid in recursiveitasks:
889                    totaldeps.update(recursiveitasks[tid])
890                    for dep in recursiveitasks[tid]:
891                        if dep not in self.runtaskentries:
892                            continue
893                        totaldeps.update(self.runtaskentries[dep].depends)
894
895                deps = set()
896                for dep in totaldeps:
897                    if dep in cumulativedeps:
898                        deps.update(cumulativedeps[dep])
899
900                for t in deps:
901                    for taskname in tasknames:
902                        newtid = t + ":" + taskname
903                        if newtid == tid:
904                            continue
905                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
906                            extradeps += 1
907                            self.runtaskentries[tid].depends.add(newtid)
908
909                # Handle recursive tasks which depend upon other recursive tasks
910                deps = set()
911                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
912                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
913                for newtid in deps:
914                    for taskname in tasknames:
915                        if not newtid.endswith(":" + taskname):
916                            continue
917                        if newtid in self.runtaskentries:
918                            extradeps += 1
919                            self.runtaskentries[tid].depends.add(newtid)
920
921            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
922
923        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
924        for tid in recursivetasksselfref:
925            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
926
927        self.init_progress_reporter.next_stage()
928        bb.event.check_for_interrupts(self.cooker.data)
929
930        #self.dump_data()
931
932        # Step B - Mark all active tasks
933        #
934        # Start with the tasks we were asked to run and mark all dependencies
935        # as active too. If the task is to be 'forced', clear its stamp. Once
936        # all active tasks are marked, prune the ones we don't need.
937
938        logger.verbose("Marking Active Tasks")
939
940        def mark_active(tid, depth):
941            """
942            Mark an item as active along with its depends
943            (calls itself recursively)
944            """
945
946            if tid in runq_build:
947                return
948
949            runq_build[tid] = 1
950
951            depends = self.runtaskentries[tid].depends
952            for depend in depends:
953                mark_active(depend, depth+1)
954
955        def invalidate_task(tid, error_nostamp):
956            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
957            taskdep = self.dataCaches[mc].task_deps[taskfn]
958            if fn + ":" + taskname not in taskData[mc].taskentries:
959                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
960            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
961                if error_nostamp:
962                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
963                else:
964                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
965            else:
966                logger.verbose("Invalidate task %s, %s", taskname, fn)
967                bb.parse.siggen.invalidate_task(taskname, taskfn)
968
969        self.target_tids = []
970        for (mc, target, task, fn) in self.targets:
971
972            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
973                continue
974
975            if target in taskData[mc].failed_deps:
976                continue
977
978            parents = False
979            if task.endswith('-'):
980                parents = True
981                task = task[:-1]
982
983            if fn in taskData[mc].failed_fns:
984                continue
985
986            # fn already has mc prefix
987            tid = fn + ":" + task
988            self.target_tids.append(tid)
989            if tid not in taskData[mc].taskentries:
990                import difflib
991                tasks = []
992                for x in taskData[mc].taskentries:
993                    if x.startswith(fn + ":"):
994                        tasks.append(taskname_from_tid(x))
995                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
996                if close_matches:
997                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
998                else:
999                    extra = ""
1000                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
1001
1002            # For tasks called "XXXX-", ony run their dependencies
1003            if parents:
1004                for i in self.runtaskentries[tid].depends:
1005                    mark_active(i, 1)
1006            else:
1007                mark_active(tid, 1)
1008
1009        self.init_progress_reporter.next_stage()
1010        bb.event.check_for_interrupts(self.cooker.data)
1011
1012        # Step C - Prune all inactive tasks
1013        #
1014        # Once all active tasks are marked, prune the ones we don't need.
1015
1016        # Handle --runall
1017        if self.cooker.configuration.runall:
1018            # re-run the mark_active and then drop unused tasks from new list
1019
1020            runall_tids = set()
1021            added = True
1022            while added:
1023                reduced_tasklist = set(self.runtaskentries.keys())
1024                for tid in list(self.runtaskentries.keys()):
1025                    if tid not in runq_build:
1026                       reduced_tasklist.remove(tid)
1027                runq_build = {}
1028
1029                orig = runall_tids
1030                runall_tids = set()
1031                for task in self.cooker.configuration.runall:
1032                    if not task.startswith("do_"):
1033                        task = "do_{0}".format(task)
1034                    for tid in reduced_tasklist:
1035                        wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
1036                        if wanttid in self.runtaskentries:
1037                            runall_tids.add(wanttid)
1038
1039                    for tid in list(runall_tids):
1040                        mark_active(tid, 1)
1041                        self.target_tids.append(tid)
1042                        if self.cooker.configuration.force:
1043                            invalidate_task(tid, False)
1044                added = runall_tids - orig
1045
1046        delcount = set()
1047        for tid in list(self.runtaskentries.keys()):
1048            if tid not in runq_build:
1049                delcount.add(tid)
1050                del self.runtaskentries[tid]
1051
1052        if self.cooker.configuration.runall:
1053            if not self.runtaskentries:
1054                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
1055
1056        self.init_progress_reporter.next_stage()
1057        bb.event.check_for_interrupts(self.cooker.data)
1058
1059        # Handle runonly
1060        if self.cooker.configuration.runonly:
1061            # re-run the mark_active and then drop unused tasks from new list
1062            runq_build = {}
1063
1064            for task in self.cooker.configuration.runonly:
1065                if not task.startswith("do_"):
1066                    task = "do_{0}".format(task)
1067                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
1068
1069                for tid in runonly_tids:
1070                    mark_active(tid, 1)
1071                    if self.cooker.configuration.force:
1072                        invalidate_task(tid, False)
1073
1074            for tid in list(self.runtaskentries.keys()):
1075                if tid not in runq_build:
1076                    delcount.add(tid)
1077                    del self.runtaskentries[tid]
1078
1079            if not self.runtaskentries:
1080                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
1081
1082        #
1083        # Step D - Sanity checks and computation
1084        #
1085
1086        # Check to make sure we still have tasks to run
1087        if not self.runtaskentries:
1088            if not taskData[''].halt:
1089                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
1090            else:
1091                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
1092
1093        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
1094
1095        logger.verbose("Assign Weightings")
1096
1097        self.init_progress_reporter.next_stage()
1098        bb.event.check_for_interrupts(self.cooker.data)
1099
1100        # Generate a list of reverse dependencies to ease future calculations
1101        for tid in self.runtaskentries:
1102            for dep in self.runtaskentries[tid].depends:
1103                self.runtaskentries[dep].revdeps.add(tid)
1104
1105        self.init_progress_reporter.next_stage()
1106        bb.event.check_for_interrupts(self.cooker.data)
1107
1108        # Identify tasks at the end of dependency chains
1109        # Error on circular dependency loops (length two)
1110        endpoints = []
1111        for tid in self.runtaskentries:
1112            revdeps = self.runtaskentries[tid].revdeps
1113            if not revdeps:
1114                endpoints.append(tid)
1115            for dep in revdeps:
1116                if dep in self.runtaskentries[tid].depends:
1117                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1118
1119
1120        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1121
1122        self.init_progress_reporter.next_stage()
1123        bb.event.check_for_interrupts(self.cooker.data)
1124
1125        # Calculate task weights
1126        # Check of higher length circular dependencies
1127        self.runq_weight = self.calculate_task_weights(endpoints)
1128
1129        self.init_progress_reporter.next_stage()
1130        bb.event.check_for_interrupts(self.cooker.data)
1131
1132        # Sanity Check - Check for multiple tasks building the same provider
1133        for mc in self.dataCaches:
1134            prov_list = {}
1135            seen_fn = []
1136            for tid in self.runtaskentries:
1137                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1138                if taskfn in seen_fn:
1139                    continue
1140                if mc != tidmc:
1141                    continue
1142                seen_fn.append(taskfn)
1143                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1144                    if prov not in prov_list:
1145                        prov_list[prov] = [taskfn]
1146                    elif taskfn not in prov_list[prov]:
1147                        prov_list[prov].append(taskfn)
1148            for prov in prov_list:
1149                if len(prov_list[prov]) < 2:
1150                    continue
1151                if prov in self.multi_provider_allowed:
1152                    continue
1153                seen_pn = []
1154                # If two versions of the same PN are being built its fatal, we don't support it.
1155                for fn in prov_list[prov]:
1156                    pn = self.dataCaches[mc].pkg_fn[fn]
1157                    if pn not in seen_pn:
1158                        seen_pn.append(pn)
1159                    else:
1160                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1161                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
1162                #
1163                # Construct a list of things which uniquely depend on each provider
1164                # since this may help the user figure out which dependency is triggering this warning
1165                #
1166                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
1167                deplist = {}
1168                commondeps = None
1169                for provfn in prov_list[prov]:
1170                    deps = set()
1171                    for tid in self.runtaskentries:
1172                        fn = fn_from_tid(tid)
1173                        if fn != provfn:
1174                            continue
1175                        for dep in self.runtaskentries[tid].revdeps:
1176                            fn = fn_from_tid(dep)
1177                            if fn == provfn:
1178                                continue
1179                            deps.add(dep)
1180                    if not commondeps:
1181                        commondeps = set(deps)
1182                    else:
1183                        commondeps &= deps
1184                    deplist[provfn] = deps
1185                for provfn in deplist:
1186                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
1187                #
1188                # Construct a list of provides and runtime providers for each recipe
1189                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1190                #
1191                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
1192                provide_results = {}
1193                rprovide_results = {}
1194                commonprovs = None
1195                commonrprovs = None
1196                for provfn in prov_list[prov]:
1197                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1198                    rprovides = set()
1199                    for rprovide in self.dataCaches[mc].rproviders:
1200                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1201                            rprovides.add(rprovide)
1202                    for package in self.dataCaches[mc].packages:
1203                        if provfn in self.dataCaches[mc].packages[package]:
1204                            rprovides.add(package)
1205                    for package in self.dataCaches[mc].packages_dynamic:
1206                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1207                            rprovides.add(package)
1208                    if not commonprovs:
1209                        commonprovs = set(provides)
1210                    else:
1211                        commonprovs &= provides
1212                    provide_results[provfn] = provides
1213                    if not commonrprovs:
1214                        commonrprovs = set(rprovides)
1215                    else:
1216                        commonrprovs &= rprovides
1217                    rprovide_results[provfn] = rprovides
1218                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
1219                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
1220                for provfn in prov_list[prov]:
1221                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
1222                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))
1223
1224                if self.warn_multi_bb:
1225                    logger.verbnote("".join(msgs))
1226                else:
1227                    logger.error("".join(msgs))
1228
1229        self.init_progress_reporter.next_stage()
1230        self.init_progress_reporter.next_stage()
1231        bb.event.check_for_interrupts(self.cooker.data)
1232
1233        # Iterate over the task list looking for tasks with a 'setscene' function
1234        self.runq_setscene_tids = set()
1235        if not self.cooker.configuration.nosetscene:
1236            for tid in self.runtaskentries:
1237                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1238                setscenetid = tid + "_setscene"
1239                if setscenetid not in taskData[mc].taskentries:
1240                    continue
1241                self.runq_setscene_tids.add(tid)
1242
1243        self.init_progress_reporter.next_stage()
1244        bb.event.check_for_interrupts(self.cooker.data)
1245
1246        # Invalidate task if force mode active
1247        if self.cooker.configuration.force:
1248            for tid in self.target_tids:
1249                invalidate_task(tid, False)
1250
1251        # Invalidate task if invalidate mode active
1252        if self.cooker.configuration.invalidate_stamp:
1253            for tid in self.target_tids:
1254                fn = fn_from_tid(tid)
1255                for st in self.cooker.configuration.invalidate_stamp.split(','):
1256                    if not st.startswith("do_"):
1257                        st = "do_%s" % st
1258                    invalidate_task(fn + ":" + st, True)
1259
1260        self.init_progress_reporter.next_stage()
1261        bb.event.check_for_interrupts(self.cooker.data)
1262
1263        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1264        for mc in taskData:
1265            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1266            virtpnmap = {}
1267            for v in virtmap:
1268                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1269                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1270            if hasattr(bb.parse.siggen, "tasks_resolved"):
1271                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1272
1273        self.init_progress_reporter.next_stage()
1274        bb.event.check_for_interrupts(self.cooker.data)
1275
1276        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1277
1278        starttime = time.time()
1279        lasttime = starttime
1280
1281        # Iterate over the task list and call into the siggen code
1282        dealtwith = set()
1283        todeal = set(self.runtaskentries)
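        # Process tasks in dependency order: a tid becomes 'ready' once all of
        # its dependencies have been dealt with, so each taskhash is computed
        # after the hashes it depends on. Unihashes are then looked up in
        # batches, one batch per pass over the remaining tasks.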
1284        while todeal:
1285            ready = set()
1286            for tid in todeal.copy():
1287                if not (self.runtaskentries[tid].depends - dealtwith):
1288                    self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1289                    # get_taskhash for a given tid *must* be called before get_unihash* below
1290                    self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1291                    ready.add(tid)
1292            unihashes = bb.parse.siggen.get_unihashes(ready)
1293            for tid in ready:
1294                dealtwith.add(tid)
1295                todeal.remove(tid)
1296                self.runtaskentries[tid].unihash = unihashes[tid]
1297
1298            bb.event.check_for_interrupts(self.cooker.data)
1299
1300            if time.time() > (lasttime + 30):
1301                lasttime = time.time()
                hashequiv_logger.verbose("Initial setup loop progress: %s remaining of %s in %ss" % (len(todeal), len(self.runtaskentries), lasttime - starttime))
1303
1304        endtime = time.time()
1305        if (endtime-starttime > 60):
1306            hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime))
1307
1308        bb.parse.siggen.writeout_file_checksum_cache()
1309
1310        #self.dump_data()
1311        return len(self.runtaskentries)
1312
1313    def dump_data(self):
1314        """
1315        Dump some debug information on the internal data structures
1316        """
1317        logger.debug3("run_tasks:")
1318        for tid in self.runtaskentries:
1319            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1320                         self.runtaskentries[tid].weight,
1321                         self.runtaskentries[tid].depends,
1322                         self.runtaskentries[tid].revdeps)
1323
1324class RunQueueWorker():
1325    def __init__(self, process, pipe):
1326        self.process = process
1327        self.pipe = pipe
1328
1329class RunQueue:
1330    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
1331
1332        self.cooker = cooker
1333        self.cfgData = cfgData
1334        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)
1335
1336        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
1337        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None
1338
1339        self.state = runQueuePrepare
1340
1341        # For disk space monitor
1342        # Invoked at regular time intervals via the bitbake heartbeat event
1343        # while the build is running. We generate a unique name for the handler
        # here, just in case there is ever more than one RunQueue instance,
1345        # start the handler when reaching runQueueSceneInit, and stop it when
1346        # done with the build.
1347        self.dm = monitordisk.diskMonitor(cfgData)
1348        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
1349        self.dm_event_handler_registered = False
1350        self.rqexe = None
1351        self.worker = {}
1352        self.fakeworker = {}
1353
1354    @staticmethod
1355    def send_pickled_data(worker, data, name):
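        """
        Frame 'data' for the bitbake-worker protocol and write it to the
        worker's stdin: <name>, the 4-byte big-endian length of the pickled
        payload, the payload itself, then </name>.
        """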
1356        msg = bytearray()
1357        msg.extend(b"<" + name.encode() + b">")
1358        pickled_data = pickle.dumps(data)
1359        msg.extend(len(pickled_data).to_bytes(4, 'big'))
1360        msg.extend(pickled_data)
1361        msg.extend(b"</" + name.encode() + b">")
1362        worker.stdin.write(msg)
1363
1364    def _start_worker(self, mc, fakeroot = False, rqexec = None):
1365        logger.debug("Starting bitbake-worker")
1366        magic = "decafbad"
1367        if self.cooker.configuration.profile:
1368            magic = "decafbadbad"
1369        fakerootlogs = None
1370
1371        workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
1372        if fakeroot:
1373            magic = magic + "beef"
1374            mcdata = self.cooker.databuilder.mcdata[mc]
1375            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
1376            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
1377            env = os.environ.copy()
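            # FAKEROOTBASEENV holds space-separated KEY=VALUE pairs which are
            # merged into the environment of the fakeroot worker process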
1378            for key, value in (var.split('=',1) for var in fakerootenv):
1379                env[key] = value
1380            worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
1381            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
1382        else:
1383            worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1384        bb.utils.nonblockingfd(worker.stdout)
1385        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)
1386
1387        workerdata = {
1388            "sigdata" : bb.parse.siggen.get_taskdata(),
1389            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
1390            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
1391            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
1392            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
1393            "prhost" : self.cooker.prhost,
1394            "buildname" : self.cfgData.getVar("BUILDNAME"),
1395            "date" : self.cfgData.getVar("DATE"),
1396            "time" : self.cfgData.getVar("TIME"),
1397            "hashservaddr" : self.cooker.hashservaddr,
1398            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
1399        }
1400
1401        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
1402        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
1403        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
1404        worker.stdin.flush()
1405
1406        return RunQueueWorker(worker, workerpipe)
1407
1408    def _teardown_worker(self, worker):
1409        if not worker:
1410            return
1411        logger.debug("Teardown for bitbake-worker")
        try:
            RunQueue.send_pickled_data(worker.process, b"", "quit")
            worker.process.stdin.flush()
            worker.process.stdin.close()
        except IOError:
            pass
1418        while worker.process.returncode is None:
1419            worker.pipe.read()
1420            worker.process.poll()
1421        while worker.pipe.read():
1422            continue
1423        worker.pipe.close()
1424
1425    def start_worker(self, rqexec):
1426        if self.worker:
1427            self.teardown_workers()
1428        self.teardown = False
1429        for mc in self.rqdata.dataCaches:
1430            self.worker[mc] = self._start_worker(mc, False, rqexec)
1431
1432    def start_fakeworker(self, rqexec, mc):
        if mc not in self.fakeworker:
1434            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1435
1436    def teardown_workers(self):
1437        self.teardown = True
1438        for mc in self.worker:
1439            self._teardown_worker(self.worker[mc])
1440        self.worker = {}
1441        for mc in self.fakeworker:
1442            self._teardown_worker(self.fakeworker[mc])
1443        self.fakeworker = {}
1444
1445    def read_workers(self):
1446        for mc in self.worker:
1447            self.worker[mc].pipe.read()
1448        for mc in self.fakeworker:
1449            self.fakeworker[mc].pipe.read()
1450
1451    def active_fds(self):
1452        fds = []
1453        for mc in self.worker:
1454            fds.append(self.worker[mc].pipe.input)
1455        for mc in self.fakeworker:
1456            fds.append(self.fakeworker[mc].pipe.input)
1457        return fds
1458
1459    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
1460        def get_timestamp(f):
1461            try:
1462                if not os.access(f, os.F_OK):
1463                    return None
1464                return os.stat(f)[stat.ST_MTIME]
            except OSError:
                return None
1467
1468        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
1469        if taskname is None:
1470            taskname = tn
1471
1472        stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn)
1473
1474        # If the stamp is missing, it's not current
1475        if not os.access(stampfile, os.F_OK):
1476            logger.debug2("Stampfile %s not available", stampfile)
1477            return False
1478        # If it's a 'nostamp' task, it's not current
1479        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1480        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug2("%s.%s is nostamp", fn, taskname)
1482            return False
1483
1484        if taskname.endswith("_setscene"):
1485            return True
1486
1487        if cache is None:
1488            cache = {}
1489
1490        iscurrent = True
1491        t1 = get_timestamp(stampfile)
1492        for dep in self.rqdata.runtaskentries[tid].depends:
1493            if iscurrent:
1494                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
1495                stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2)
1496                stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2)
1497                t2 = get_timestamp(stampfile2)
1498                t3 = get_timestamp(stampfile3)
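                # If the dependency has a _setscene stamp and it is newer than
                # the real stamp (or replaces a missing one), it was provided
                # from sstate and the mtime comparison below does not apply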
1499                if t3 and not t2:
1500                    continue
1501                if t3 and t3 > t2:
1502                    continue
1503                if fn == fn2:
1504                    if not t2:
1505                        logger.debug2('Stampfile %s does not exist', stampfile2)
1506                        iscurrent = False
1507                        break
1508                    if t1 < t2:
1509                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
1510                        iscurrent = False
1511                        break
1512                    if recurse and iscurrent:
1513                        if dep in cache:
1514                            iscurrent = cache[dep]
1515                            if not iscurrent:
1516                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1517                        else:
1518                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1519                            cache[dep] = iscurrent
1520        if recurse:
1521            cache[tid] = iscurrent
1522        return iscurrent
1523
1524    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1525        valid = set()
1526        if self.hashvalidate:
1527            sq_data = {}
1528            sq_data['hash'] = {}
1529            sq_data['hashfn'] = {}
1530            sq_data['unihash'] = {}
1531            for tid in tocheck:
1532                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1533                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1534                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1535                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1536
1537            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1538
1539        return valid
1540
1541    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
1542        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1543
1544        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
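        # e.g. with BB_HASHCHECK_FUNCTION = "sstate_checkhashes" (as OE-Core
        # sets it) this evaluates sstate_checkhashes(sq_data, d, ...) and
        # expects the set of tids with valid sstate objects in return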
1545        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1546
1547        return bb.utils.better_eval(call, locs)
1548
1549    def _execute_runqueue(self):
1550        """
1551        Run the tasks in a queue prepared by rqdata.prepare()
1552        Upon failure, optionally try to recover the build using any alternate providers
1553        (if the halt on failure configuration option isn't set)
1554        """
1555
1556        retval = True
1557        bb.event.check_for_interrupts(self.cooker.data)
1558
1559        if self.state is runQueuePrepare:
1560            # NOTE: if you add, remove or significantly refactor the stages of this
1561            # process then you should recalculate the weightings here. This is quite
1562            # easy to do - just change the next line temporarily to pass debug=True as
1563            # the last parameter and you'll get a printout of the weightings as well
1564            # as a map to the lines where next_stage() was called. Of course this isn't
1565            # critical, but it helps to keep the progress reporting accurate.
1566            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
1567                                                            "Initialising tasks",
1568                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
1569            if self.rqdata.prepare() == 0:
1570                self.state = runQueueComplete
1571            else:
1572                self.state = runQueueSceneInit
1573                bb.parse.siggen.save_unitaskhashes()
1574
1575        if self.state is runQueueSceneInit:
1576            self.rqdata.init_progress_reporter.next_stage()
1577
            # We are ready to run; emit dependency info to any UI or class
            # which needs it
1580            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
1581            self.rqdata.init_progress_reporter.next_stage()
1582            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)
1583
1584            if not self.dm_event_handler_registered:
                bb.event.register(self.dm_event_handler_name,
                                  lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                  ('bb.event.HeartbeatEvent',), data=self.cfgData)
                self.dm_event_handler_registered = True
1589
1590            self.rqdata.init_progress_reporter.next_stage()
1591            self.rqexe = RunQueueExecute(self)
1592
1593            dumpsigs = self.cooker.configuration.dump_signatures
1594            if dumpsigs:
1595                self.rqdata.init_progress_reporter.finish()
1596                if 'printdiff' in dumpsigs:
1597                    self.invalidtasks_dump = self.print_diffscenetasks()
1598                self.state = runQueueDumpSigs
1599
1600        if self.state is runQueueDumpSigs:
1601            dumpsigs = self.cooker.configuration.dump_signatures
1602            retval = self.dump_signatures(dumpsigs)
1603            if retval is False:
1604                if 'printdiff' in dumpsigs:
1605                    self.write_diffscenetasks(self.invalidtasks_dump)
1606                self.state = runQueueComplete
1607
1608        if self.state is runQueueSceneInit:
1609            self.start_worker(self.rqexe)
1610            self.rqdata.init_progress_reporter.finish()
1611
            # If we don't have any setscene functions, skip setscene execution
1613            if not self.rqdata.runq_setscene_tids:
1614                logger.info('No setscene tasks')
1615                for tid in self.rqdata.runtaskentries:
1616                    if not self.rqdata.runtaskentries[tid].depends:
1617                        self.rqexe.setbuildable(tid)
1618                    self.rqexe.tasks_notcovered.add(tid)
1619                self.rqexe.sqdone = True
1620            logger.info('Executing Tasks')
1621            self.state = runQueueRunning
1622
1623        if self.state is runQueueRunning:
1624            retval = self.rqexe.execute()
1625
1626        if self.state is runQueueCleanUp:
1627            retval = self.rqexe.finish()
1628
1629        build_done = self.state is runQueueComplete or self.state is runQueueFailed
1630
1631        if build_done and self.dm_event_handler_registered:
1632            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
1633            self.dm_event_handler_registered = False
1634
1635        if build_done and self.rqexe:
1636            bb.parse.siggen.save_unitaskhashes()
1637            self.teardown_workers()
1638            if self.rqexe:
1639                if self.rqexe.stats.failed:
1640                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
1641                else:
1642                    # Let's avoid the word "failed" if nothing actually did
1643                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)
1644
1645        if self.state is runQueueFailed:
1646            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)
1647
1648        if self.state is runQueueComplete:
1649            # All done
1650            return False
1651
1652        # Loop
1653        return retval
1654
1655    def execute_runqueue(self):
        # Catch unexpected exceptions and ensure we exit when an error occurs, rather than looping.
1657        try:
1658            return self._execute_runqueue()
1659        except bb.runqueue.TaskFailure:
1660            raise
1661        except SystemExit:
1662            raise
1663        except bb.BBHandledException:
1664            try:
1665                self.teardown_workers()
1666            except:
1667                pass
1668            self.state = runQueueComplete
1669            raise
1670        except Exception as err:
1671            logger.exception("An uncaught exception occurred in runqueue")
1672            try:
1673                self.teardown_workers()
1674            except:
1675                pass
1676            self.state = runQueueComplete
1677            raise
1678
1679    def finish_runqueue(self, now = False):
1680        if not self.rqexe:
1681            self.state = runQueueComplete
1682            return
1683
1684        if now:
1685            self.rqexe.finish_now()
1686        else:
1687            self.rqexe.finish()
1688
1689    def _rq_dump_sigtid(self, tids):
1690        for tid in tids:
1691            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1692            dataCaches = self.rqdata.dataCaches
1693            bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True)
1694
1695    def dump_signatures(self, options):
1696        if not hasattr(self, "dumpsigs_launched"):
1697            if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset:
1698                bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled")
1699
1700            bb.note("Writing task signature files")
1701
1702            max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1703            def chunkify(l, n):
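                # Round-robin split, e.g. chunkify([1, 2, 3, 4, 5], 2) -> [[1, 3, 5], [2, 4]]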
1704                return [l[i::n] for i in range(n)]
1705            dumpsigs_tids = chunkify(list(self.rqdata.runtaskentries), max_process)
1706
1707            # We cannot use the real multiprocessing.Pool easily due to some local data
1708            # that can't be pickled. This is a cheap multi-process solution.
1709            self.dumpsigs_launched = []
1710
1711            for tids in dumpsigs_tids:
1712                p = Process(target=self._rq_dump_sigtid, args=(tids, ))
1713                p.start()
1714                self.dumpsigs_launched.append(p)
1715
1716            return 1.0
1717
        # Iterate over a copy as entries are removed during iteration
        for q in self.dumpsigs_launched.copy():
            # The finished processes are joined when calling is_alive()
            if not q.is_alive():
                q.join()
                self.dumpsigs_launched.remove(q)

        if self.dumpsigs_launched:
            return 1.0
1728
1729        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1730
1731        return False
1732
1733    def print_diffscenetasks(self):
1734        def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
1735            invalidtasks = []
1736            for t in taskdepends[task].depends:
1737                if t not in valid and t not in visited_invalid:
1738                    invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
1739                    visited_invalid.add(t)
1740
1741            direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
1742            if not direct_invalid and task not in noexec:
1743                invalidtasks = [task]
1744            return invalidtasks
1745
1746        noexec = []
1747        tocheck = set()
1748
1749        for tid in self.rqdata.runtaskentries:
1750            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1751            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1752
1753            if 'noexec' in taskdep and taskname in taskdep['noexec']:
1754                noexec.append(tid)
1755                continue
1756
1757            tocheck.add(tid)
1758
1759        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)
1760
1761        # Tasks which are both setscene and noexec never care about dependencies
1762        # We therefore find tasks which are setscene and noexec and mark their
1763        # unique dependencies as valid.
1764        for tid in noexec:
1765            if tid not in self.rqdata.runq_setscene_tids:
1766                continue
1767            for dep in self.rqdata.runtaskentries[tid].depends:
1768                hasnoexecparents = True
1769                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
1770                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
1771                        continue
1772                    hasnoexecparents = False
1773                    break
1774                if hasnoexecparents:
1775                    valid_new.add(dep)
1776
1777        invalidtasks = set()
1778
1779        toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
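        # Walk down the dependency tree from each target, stopping at the
        # first invalid task on each path; for setscene tasks, follow the
        # setscene dependency graph (sq_deps) instead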
1780        for tid in toptasks:
1781            toprocess = set([tid])
1782            while toprocess:
1783                next = set()
1784                visited_invalid = set()
1785                for t in toprocess:
1786                    if t not in valid_new and t not in noexec:
1787                        invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
1788                        continue
1789                    if t in self.rqdata.runq_setscene_tids:
1790                        for dep in self.rqexe.sqdata.sq_deps[t]:
1791                            next.add(dep)
1792                        continue
1793
1794                    for dep in self.rqdata.runtaskentries[t].depends:
1795                        next.add(dep)
1796
1797                toprocess = next
1798
1799        tasklist = []
1800        for tid in invalidtasks:
1801            tasklist.append(tid)
1802
1803        if tasklist:
1804            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))
1805
1806        return invalidtasks
1807
1808    def write_diffscenetasks(self, invalidtasks):
1809        bb.siggen.check_siggen_version(bb.siggen)
1810
1811        # Define recursion callback
1812        def recursecb(key, hash1, hash2):
1813            hashes = [hash1, hash2]
1814            bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
1815            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
1816            bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))
1817
1818            recout = []
1819            if len(hashfiles) == 2:
1820                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
1821                recout.extend(list('    ' + l for l in out2))
1822            else:
1823                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))
1824
1825            return recout
1826
1827
1828        for tid in invalidtasks:
1829            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1830            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
1831            h = self.rqdata.runtaskentries[tid].unihash
1832            bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
1833            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
1834            bb.debug(1, "Found hashfiles:\n{}".format(matches))
1835            match = None
1836            for m in matches.values():
1837                if h in m['path']:
1838                    match = m['path']
1839            if match is None:
1840                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
            matches = {k: v for k, v in matches.items() if h not in k}
            matches_local = {k: v for k, v in matches.items() if h not in k and not v['sstate']}
1843            if matches_local:
1844                matches = matches_local
1845            if matches:
1846                latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
1847                prevh = __find_sha256__.search(latestmatch).group(0)
1848                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
1849                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, most recent matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1850
1851
1852class RunQueueExecute:
1853
1854    def __init__(self, rq):
1855        self.rq = rq
1856        self.cooker = rq.cooker
1857        self.cfgData = rq.cfgData
1858        self.rqdata = rq.rqdata
1859
1860        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
1861        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
1862        self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
1863        self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
1864        self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
1865        self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")
1866
1867        self.sq_buildable = set()
1868        self.sq_running = set()
1869        self.sq_live = set()
1870
1871        self.updated_taskhash_queue = []
1872        self.pending_migrations = set()
1873
1874        self.runq_buildable = set()
1875        self.runq_running = set()
1876        self.runq_complete = set()
1877        self.runq_tasksrun = set()
1878
1879        self.build_stamps = {}
1880        self.build_stamps2 = []
1881        self.failed_tids = []
1882        self.sq_deferred = {}
1883        self.sq_needed_harddeps = set()
1884        self.sq_harddep_deferred = set()
1885
1886        self.stampcache = {}
1887
1888        self.holdoff_tasks = set()
1889        self.holdoff_need_update = True
1890        self.sqdone = False
1891
1892        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
1893
1894        if self.number_tasks <= 0:
            bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1896
1897        lower_limit = 1.0
1898        upper_limit = 1000000.0
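        # Pressure limits are compared against PSI data from /proc/pressure;
        # values anywhere near the upper limit are effectively never reached,
        # hence the warnings below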
1899        if self.max_cpu_pressure:
1900            self.max_cpu_pressure = float(self.max_cpu_pressure)
1901            if self.max_cpu_pressure < lower_limit:
1902                bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
1903            if self.max_cpu_pressure > upper_limit:
1904                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
1905
1906        if self.max_io_pressure:
1907            self.max_io_pressure = float(self.max_io_pressure)
1908            if self.max_io_pressure < lower_limit:
1909                bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
1910            if self.max_io_pressure > upper_limit:
1911                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1912
1913        if self.max_memory_pressure:
1914            self.max_memory_pressure = float(self.max_memory_pressure)
1915            if self.max_memory_pressure < lower_limit:
1916                bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
1917            if self.max_memory_pressure > upper_limit:
                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_memory_pressure))
1919
1920        if self.max_loadfactor:
1921            self.max_loadfactor = float(self.max_loadfactor)
1922            if self.max_loadfactor <= 0:
1923                bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))
1924
1925        # List of setscene tasks which we've covered
1926        self.scenequeue_covered = set()
1927        # List of tasks which are covered (including setscene ones)
1928        self.tasks_covered = set()
1929        self.tasks_scenequeue_done = set()
1930        self.scenequeue_notcovered = set()
1931        self.tasks_notcovered = set()
1932        self.scenequeue_notneeded = set()
1933
1934        schedulers = self.get_schedulers()
1935        for scheduler in schedulers:
1936            if self.scheduler == scheduler.name:
1937                self.sched = scheduler(self, self.rqdata)
1938                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
1939                break
1940        else:
1941            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1942                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1943
1944        #if self.rqdata.runq_setscene_tids:
1945        self.sqdata = SQData()
1946        build_scenequeue_data(self.sqdata, self.rqdata, self)
1947
1948        update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
1949
1950        # Compute a list of 'stale' sstate tasks where the current hash does not match the one
1951        # in any stamp files. Pass the list out to metadata as an event.
1952        found = {}
1953        for tid in self.rqdata.runq_setscene_tids:
1954            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1955            stamps = bb.build.find_stale_stamps(taskname, taskfn)
1956            if stamps:
1957                if mc not in found:
1958                    found[mc] = {}
1959                found[mc][tid] = stamps
1960        for mc in found:
1961            event = bb.event.StaleSetSceneTasks(found[mc])
1962            bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
1963
1964        self.build_taskdepdata_cache()
1965
1966    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1967
        # self.build_stamps[task] may not exist when using a shared work directory.
1969        if task in self.build_stamps:
1970            self.build_stamps2.remove(self.build_stamps[task])
1971            del self.build_stamps[task]
1972
1973        if task in self.sq_live:
1974            if status != 0:
1975                self.sq_task_fail(task, status)
1976            else:
1977                self.sq_task_complete(task)
1978            self.sq_live.remove(task)
1979            self.stats.updateActiveSetscene(len(self.sq_live))
1980        else:
1981            if status != 0:
1982                self.task_fail(task, status, fakerootlog=fakerootlog)
1983            else:
1984                self.task_complete(task)
1985        return True
1986
1987    def finish_now(self):
1988        for mc in self.rq.worker:
1989            try:
1990                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
1991                self.rq.worker[mc].process.stdin.flush()
1992            except IOError:
1993                # worker must have died?
1994                pass
1995        for mc in self.rq.fakeworker:
1996            try:
1997                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
1998                self.rq.fakeworker[mc].process.stdin.flush()
1999            except IOError:
2000                # worker must have died?
2001                pass
2002
2003        if self.failed_tids:
2004            self.rq.state = runQueueFailed
2005            return
2006
2007        self.rq.state = runQueueComplete
2008        return
2009
2010    def finish(self):
2011        self.rq.state = runQueueCleanUp
2012
2013        active = self.stats.active + len(self.sq_live)
2014        if active > 0:
2015            bb.event.fire(runQueueExitWait(active), self.cfgData)
2016            self.rq.read_workers()
2017            return self.rq.active_fds()
2018
2019        if self.failed_tids:
2020            self.rq.state = runQueueFailed
2021            return True
2022
2023        self.rq.state = runQueueComplete
2024        return True
2025
2026    # Used by setscene only
2027    def check_dependencies(self, task, taskdeps):
2028        if not self.rq.depvalidate:
2029            return False
2030
2031        # Must not edit parent data
2032        taskdeps = set(taskdeps)
2033
2034        taskdata = {}
2035        taskdeps.add(task)
2036        for dep in taskdeps:
2037            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
2038            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2039            taskdata[dep] = [pn, taskname, fn]
2040        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
2041        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
2042        valid = bb.utils.better_eval(call, locs)
2043        return valid
2044
2045    def can_start_task(self):
2046        active = self.stats.active + len(self.sq_live)
2047        can_start = active < self.number_tasks
2048        return can_start
2049
2050    def get_schedulers(self):
2051        schedulers = set(obj for obj in globals().values()
2052                             if type(obj) is type and
2053                                issubclass(obj, RunQueueScheduler))
2054
2055        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
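        # BB_SCHEDULERS lists extra scheduler classes by fully qualified name,
        # e.g. "custom_sched.MyScheduler" (hypothetical), imported below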
2056        if user_schedulers:
2057            for sched in user_schedulers.split():
                if "." not in sched:
2059                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
2060                    continue
2061
2062                modname, name = sched.rsplit(".", 1)
2063                try:
2064                    module = __import__(modname, fromlist=(name,))
2065                except ImportError as exc:
2066                    bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
2067                else:
2068                    schedulers.add(getattr(module, name))
2069        return schedulers
2070
2071    def setbuildable(self, task):
2072        self.runq_buildable.add(task)
2073        self.sched.newbuildable(task)
2074
2075    def task_completeoutright(self, task):
2076        """
2077        Mark a task as completed
2078        Look at the reverse dependencies and mark any task with
2079        completed dependencies as buildable
2080        """
2081        self.runq_complete.add(task)
2082        for revdep in self.rqdata.runtaskentries[task].revdeps:
2083            if revdep in self.runq_running:
2084                continue
2085            if revdep in self.runq_buildable:
2086                continue
2087            alldeps = True
2088            for dep in self.rqdata.runtaskentries[revdep].depends:
2089                if dep not in self.runq_complete:
2090                    alldeps = False
2091                    break
2092            if alldeps:
2093                self.setbuildable(revdep)
2094                logger.debug("Marking task %s as buildable", revdep)
2095
2096        found = None
2097        for t in sorted(self.sq_deferred.copy()):
2098            if self.sq_deferred[t] == task:
2099                # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
2100                # We shouldn't allow all to run at once as it is prone to races.
2101                if not found:
2102                    bb.debug(1, "Deferred task %s now buildable" % t)
2103                    del self.sq_deferred[t]
2104                    update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2105                    found = t
2106                else:
2107                    bb.debug(1, "Deferring %s after %s" % (t, found))
2108                    self.sq_deferred[t] = found
2109
2110    def task_complete(self, task):
2111        self.stats.taskCompleted()
2112        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2113        self.task_completeoutright(task)
2114        self.runq_tasksrun.add(task)
2115
2116    def task_fail(self, task, exitcode, fakerootlog=None):
2117        """
2118        Called when a task has failed
2119        Updates the state engine with the failure
2120        """
2121        self.stats.taskFailed()
2122        self.failed_tids.append(task)
2123
2124        fakeroot_log = []
2125        if fakerootlog and os.path.exists(fakerootlog):
2126            with open(fakerootlog) as fakeroot_log_file:
2127                fakeroot_failed = False
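                # Scan the log backwards so only the most recent pseudo
                # session is examined; stop at its startup marker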
2128                for line in reversed(fakeroot_log_file.readlines()):
2129                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
2130                        if fakeroot_error in line.lower():
2131                            fakeroot_failed = True
2132                    if 'doing new pid setup and server start' in line:
2133                        break
2134                    fakeroot_log.append(line)
2135
2136            if not fakeroot_failed:
2137                fakeroot_log = []
2138
2139        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
2140
2141        if self.rqdata.taskData[''].halt:
2142            self.rq.state = runQueueCleanUp
2143
2144    def task_skip(self, task, reason):
2145        self.runq_running.add(task)
2146        self.setbuildable(task)
2147        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
2148        self.task_completeoutright(task)
2149        self.stats.taskSkipped()
2150        self.stats.taskCompleted()
2151
2152    def summarise_scenequeue_errors(self):
2153        err = False
2154        if not self.sqdone:
2155            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
2156            completeevent = sceneQueueComplete(self.stats, self.rq)
2157            bb.event.fire(completeevent, self.cfgData)
2158        if self.sq_deferred:
2159            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
2160            err = True
2161        if self.updated_taskhash_queue:
2162            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
2163            err = True
2164        if self.holdoff_tasks:
2165            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
2166            err = True
2167
2168        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
2169            # No task should end up in both covered and uncovered, that is a bug.
            logger.error("Setscene task %s in both covered and notcovered." % tid)
            err = True
2171
2172        for tid in self.rqdata.runq_setscene_tids:
2173            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2174                err = True
2175                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
2176            if tid not in self.sq_buildable:
2177                err = True
2178                logger.error("Setscene Task %s was never marked as buildable" % tid)
2179            if tid not in self.sq_running:
2180                err = True
2181                logger.error("Setscene Task %s was never marked as running" % tid)
2182
2183        for x in self.rqdata.runtaskentries:
2184            if x not in self.tasks_covered and x not in self.tasks_notcovered:
2185                logger.error("Task %s was never moved from the setscene queue" % x)
2186                err = True
2187            if x not in self.tasks_scenequeue_done:
2188                logger.error("Task %s was never processed by the setscene code" % x)
2189                err = True
2190            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
2191                logger.error("Task %s was never marked as buildable by the setscene code" % x)
2192                err = True
2193        return err
2194
2195
2196    def execute(self):
2197        """
2198        Run the tasks in a queue prepared by prepare_runqueue
2199        """
2200
2201        self.rq.read_workers()
2202        if self.updated_taskhash_queue or self.pending_migrations:
2203            self.process_possible_migrations()
2204
2205        if not hasattr(self, "sorted_setscene_tids"):
2206            # Don't want to sort this set every execution
2207            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)
2208            # Resume looping where we left off when we returned to feed the mainloop
2209            self.setscene_tids_generator = itertools.cycle(self.rqdata.runq_setscene_tids)
2210
2211        task = None
2212        if not self.sqdone and self.can_start_task():
2213            loopcount = 0
2214            # Find the next setscene to run, exit the loop when we've processed all tids or found something to execute
2215            while loopcount < len(self.rqdata.runq_setscene_tids):
2216                loopcount += 1
2217                nexttask = next(self.setscene_tids_generator)
2218                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
2219                    if nexttask in self.sq_deferred and self.sq_deferred[nexttask] not in self.runq_complete:
2220                        # Skip deferred tasks quickly before the 'expensive' tests below - this is key to performant multiconfig builds
2221                        continue
2222                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
2223                            nexttask not in self.sq_needed_harddeps and \
2224                            self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
2225                            self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
2226                        if nexttask not in self.rqdata.target_tids:
2227                            logger.debug2("Skipping setscene for task %s" % nexttask)
2228                            self.sq_task_skip(nexttask)
2229                            self.scenequeue_notneeded.add(nexttask)
2230                            if nexttask in self.sq_deferred:
2231                                del self.sq_deferred[nexttask]
2232                            return True
2233                    if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2234                        logger.debug2("Deferring %s due to hard dependencies" % nexttask)
2235                        updated = False
2236                        for dep in self.sqdata.sq_harddeps_rev[nexttask]:
2237                            if dep not in self.sq_needed_harddeps:
2238                                logger.debug2("Enabling task %s as it is a hard dependency" % dep)
2239                                self.sq_buildable.add(dep)
2240                                self.sq_needed_harddeps.add(dep)
2241                                updated = True
2242                        self.sq_harddep_deferred.add(nexttask)
2243                        if updated:
2244                            return True
2245                        continue
                    # If covered tasks are running, need to wait for them to complete
                    if any(t in self.runq_running and t not in self.runq_complete
                           for t in self.sqdata.sq_covered_tasks[nexttask]):
                        continue
2250                    if nexttask in self.sq_deferred:
2251                        # Deferred tasks that were still deferred were skipped above so we now need to process
2252                        logger.debug("Task %s no longer deferred" % nexttask)
2253                        del self.sq_deferred[nexttask]
2254                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
2255                        if not valid:
2256                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
2257                            self.sq_task_failoutright(nexttask)
2258                            return True
2259                    if nexttask in self.sqdata.outrightfail:
2260                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
2261                        self.sq_task_failoutright(nexttask)
2262                        return True
2263                    if nexttask in self.sqdata.unskippable:
2264                        logger.debug2("Setscene task %s is unskippable" % nexttask)
2265                    task = nexttask
2266                    break
2267        if task is not None:
2268            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2269            taskname = taskname + "_setscene"
2270            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
2271                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
2272                self.sq_task_failoutright(task)
2273                return True
2274
2275            if self.cooker.configuration.force:
2276                if task in self.rqdata.target_tids:
2277                    self.sq_task_failoutright(task)
2278                    return True
2279
2280            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2281                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
2282                self.sq_task_skip(task)
2283                return True
2284
2285            if self.cooker.configuration.skipsetscene:
2286                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
2287                self.sq_task_failoutright(task)
2288                return True
2289
2290            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
2291            bb.event.fire(startevent, self.cfgData)
2292
2293            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2294            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
2295            runtask = {
2296                'fn' : taskfn,
2297                'task' : task,
2298                'taskname' : taskname,
2299                'taskhash' : self.rqdata.get_task_hash(task),
2300                'unihash' : self.rqdata.get_task_unihash(task),
2301                'quieterrors' : True,
2302                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
2303                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
2304                'taskdepdata' : self.sq_build_taskdepdata(task),
2305                'dry_run' : False,
2306                'taskdep': taskdep,
2307                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
2308                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
2309                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
2310            }
2311
2312            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if mc not in self.rq.fakeworker:
2314                    self.rq.start_fakeworker(self, mc)
2315                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
2316                self.rq.fakeworker[mc].process.stdin.flush()
2317            else:
2318                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
2319                self.rq.worker[mc].process.stdin.flush()
2320
2321            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2322            self.build_stamps2.append(self.build_stamps[task])
2323            self.sq_running.add(task)
2324            self.sq_live.add(task)
2325            self.stats.updateActiveSetscene(len(self.sq_live))
2326            if self.can_start_task():
2327                return True
2328
2329        self.update_holdofftasks()
2330
2331        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
2332            hashequiv_logger.verbose("Setscene tasks completed")
2333
2334            err = self.summarise_scenequeue_errors()
2335            if err:
2336                self.rq.state = runQueueFailed
2337                return True
2338
2339            if self.cooker.configuration.setsceneonly:
2340                self.rq.state = runQueueComplete
2341                return True
2342            self.sqdone = True
2343
2344            if self.stats.total == 0:
2345                # nothing to do
2346                self.rq.state = runQueueComplete
2347                return True
2348
2349        if self.cooker.configuration.setsceneonly:
2350            task = None
2351        else:
2352            task = self.sched.next()
2353        if task is not None:
2354            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2355
2356            if self.rqdata.setscene_ignore_tasks is not None:
2357                if self.check_setscene_ignore_tasks(task):
2358                    self.task_fail(task, "setscene ignore_tasks")
2359                    return True
2360
2361            if task in self.tasks_covered:
2362                logger.debug2("Setscene covered task %s", task)
2363                self.task_skip(task, "covered")
2364                return True
2365
2366            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
2367                logger.debug2("Stamp current task %s", task)
2368
2369                self.task_skip(task, "existing")
2370                self.runq_tasksrun.add(task)
2371                return True
2372
2373            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2374            if 'noexec' in taskdep and taskname in taskdep['noexec']:
2375                startevent = runQueueTaskStarted(task, self.stats, self.rq,
2376                                                 noexec=True)
2377                bb.event.fire(startevent, self.cfgData)
2378                self.runq_running.add(task)
2379                self.stats.taskActive()
2380                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
2381                    bb.build.make_stamp_mcfn(taskname, taskfn)
2382                self.task_complete(task)
2383                return True
2384            else:
2385                startevent = runQueueTaskStarted(task, self.stats, self.rq)
2386                bb.event.fire(startevent, self.cfgData)
2387
2388            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2389            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
2390            runtask = {
2391                'fn' : taskfn,
2392                'task' : task,
2393                'taskname' : taskname,
2394                'taskhash' : self.rqdata.get_task_hash(task),
2395                'unihash' : self.rqdata.get_task_unihash(task),
2396                'quieterrors' : False,
2397                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
2398                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
2399                'taskdepdata' : self.build_taskdepdata(task),
2400                'dry_run' : self.rqdata.setscene_enforce,
2401                'taskdep': taskdep,
2402                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
2403                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
2404                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
2405            }
2406
2407            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                if mc not in self.rq.fakeworker:
2409                    try:
2410                        self.rq.start_fakeworker(self, mc)
2411                    except OSError as exc:
2412                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
2413                        self.rq.state = runQueueFailed
2414                        self.stats.taskFailed()
2415                        return True
2416                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
2417                self.rq.fakeworker[mc].process.stdin.flush()
2418            else:
2419                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
2420                self.rq.worker[mc].process.stdin.flush()
2421
2422            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2423            self.build_stamps2.append(self.build_stamps[task])
2424            self.runq_running.add(task)
2425            self.stats.taskActive()
2426            if self.can_start_task():
2427                return True
2428
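            # Tasks are still executing (or setscene tasks are live): service the worker pipes and report whether any fds remain active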
2429        if self.stats.active > 0 or self.sq_live:
2430            self.rq.read_workers()
2431            return self.rq.active_fds()
2432
2433        # No more tasks can run. If deferred setscene tasks remain the queue has deadlocked, so force the first one to run.
2434        if self.sq_deferred:
2435            deferred_tid = list(self.sq_deferred.keys())[0]
2436            blocking_tid = self.sq_deferred.pop(deferred_tid)
2437            logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
2438            return True
2439
2440        if self.failed_tids:
2441            self.rq.state = runQueueFailed
2442            return True
2443
2444        # Sanity Checks
2445        err = self.summarise_scenequeue_errors()
2446        for task in self.rqdata.runtaskentries:
2447            if task not in self.runq_buildable:
2448                logger.error("Task %s never buildable!", task)
2449                err = True
2450            elif task not in self.runq_running:
2451                logger.error("Task %s never ran!", task)
2452                err = True
2453            elif task not in self.runq_complete:
2454                logger.error("Task %s never completed!", task)
2455                err = True
2456
2457        if err:
2458            self.rq.state = runQueueFailed
2459        else:
2460            self.rq.state = runQueueComplete
2461
2462        return True
2463
2464    def filtermcdeps(self, task, mc, deps):
2465        ret = set()
2466        for dep in deps:
2467            thismc = mc_from_tid(dep)
2468            if thismc != mc:
2469                continue
2470            ret.add(dep)
2471        return ret
2472
2473    # Build the individual cache entries in advance once to save time
2474    def build_taskdepdata_cache(self):
2475        taskdepdata_cache = {}
2476        for task in self.rqdata.runtaskentries:
2477            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2478            taskdepdata_cache[task] = bb.TaskData(
2479                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
2480                taskname = taskname,
2481                fn = fn,
2482                deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends),
2483                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
2484                taskhash = self.rqdata.runtaskentries[task].hash,
2485                unihash = self.rqdata.runtaskentries[task].unihash,
2486                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
2487                taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps,
2488            )
2489
2490        self.taskdepdata_cache = taskdepdata_cache
2491
2492    # We filter out multiconfig dependencies from the taskdepdata we pass to the tasks
2493    # as most code can't handle them
2494    def build_taskdepdata(self, task):
2495        taskdepdata = {}
2496        mc = mc_from_tid(task)
2497        next = self.rqdata.runtaskentries[task].depends.copy()
2498        next.add(task)
2499        next = self.filtermcdeps(task, mc, next)
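            # Breadth-first walk of the dependency chain, refreshing each cached entry's unihash since it may have changed via hash equivalence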
2500        while next:
2501            additional = []
2502            for revdep in next:
2503                self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace(
2504                    unihash=self.rqdata.runtaskentries[revdep].unihash
2505                )
2506                taskdepdata[revdep] = self.taskdepdata_cache[revdep]
2507                for revdep2 in self.taskdepdata_cache[revdep].deps:
2508                    if revdep2 not in taskdepdata:
2509                        additional.append(revdep2)
2510            next = additional
2511
2512        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2513        return taskdepdata
2514
2515    def update_holdofftasks(self):
2516
2517        if not self.holdoff_need_update:
2518            return
2519
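            # Expand the setscene covered/notcovered results into the full sets of normal tasks they imply, restricted to tasks the scenequeue has finished processing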
2520        notcovered = set(self.scenequeue_notcovered)
2521        notcovered |= self.sqdata.cantskip
2522        for tid in self.scenequeue_notcovered:
2523            notcovered |= self.sqdata.sq_covered_tasks[tid]
2524        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
2525        notcovered.intersection_update(self.tasks_scenequeue_done)
2526
2527        covered = set(self.scenequeue_covered)
2528        for tid in self.scenequeue_covered:
2529            covered |= self.sqdata.sq_covered_tasks[tid]
2530        covered.difference_update(notcovered)
2531        covered.intersection_update(self.tasks_scenequeue_done)
2532
2533        for tid in notcovered | covered:
2534            if not self.rqdata.runtaskentries[tid].depends:
2535                self.setbuildable(tid)
2536            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
2537                self.setbuildable(tid)
2538
2539        self.tasks_covered = covered
2540        self.tasks_notcovered = notcovered
2541
2542        self.holdoff_tasks = set()
2543
2544        for tid in self.rqdata.runq_setscene_tids:
2545            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2546                self.holdoff_tasks.add(tid)
2547
2548        for tid in self.holdoff_tasks.copy():
2549            for dep in self.sqdata.sq_covered_tasks[tid]:
2550                if dep not in self.runq_complete:
2551                    self.holdoff_tasks.add(dep)
2552
2553        self.holdoff_need_update = False
2554
2555    def process_possible_migrations(self):
2556
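            # Drain the updated-taskhash queue: any task whose unihash changed (and which isn't mid-execution) is rehashed, along with anything deferred behind it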
2557        changed = set()
2558        toprocess = set()
2559        for tid, unihash in self.updated_taskhash_queue.copy():
2560            if tid in self.runq_running and tid not in self.runq_complete:
2561                continue
2562
2563            self.updated_taskhash_queue.remove((tid, unihash))
2564
2565            if unihash != self.rqdata.runtaskentries[tid].unihash:
2566                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2567                torehash = [tid]
2568                for deftid in self.sq_deferred:
2569                    if self.sq_deferred[deftid] == tid:
2570                        torehash.append(deftid)
2571                for hashtid in torehash:
2572                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2573                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2574                    bb.parse.siggen.set_unihash(hashtid, unihash)
2575                    toprocess.add(hashtid)
2576
2577        # Work out all tasks which depend upon these
2578        total = set()
2579        next = set()
2580        for p in toprocess:
2581            next |= self.rqdata.runtaskentries[p].revdeps
2582        while next:
2583            current = next.copy()
2584            total = total | next
2585            next = set()
2586            for ntid in current:
2587                next |= self.rqdata.runtaskentries[ntid].revdeps
2588            next.difference_update(total)
2589
2590        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2591        next = set()
2592        for p in total:
2593            if not self.rqdata.runtaskentries[p].depends:
2594                next.add(p)
2595            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2596                next.add(p)
2597
2598        starttime = time.time()
2599        lasttime = starttime
2600
2601        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2602        while next:
2603            current = next.copy()
2604            next = set()
2605            ready = {}
2606            for tid in current:
2607                if self.rqdata.runtaskentries[tid].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2608                    continue
2609                # get_taskhash for a given tid *must* be called before get_unihash* below
2610                ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
2611
2612            unihashes = bb.parse.siggen.get_unihashes(ready.keys())
2613
2614            for tid in ready:
2615                orighash = self.rqdata.runtaskentries[tid].hash
2616                newhash = ready[tid]
2617                origuni = self.rqdata.runtaskentries[tid].unihash
2618                newuni = unihashes[tid]
2619
2620                # FIXME, need to check it can come from sstate at all for determinism?
2621                remapped = False
2622                if newuni == origuni:
2623                    # Nothing to do, we match, skip code below
2624                    remapped = True
2625                elif tid in self.scenequeue_covered or tid in self.sq_live:
2626                    # This setscene task already ran or is running; report the new taskhash
2627                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2628                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2629                    remapped = True
2630
2631                if not remapped:
2632                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2633                    self.rqdata.runtaskentries[tid].hash = newhash
2634                    self.rqdata.runtaskentries[tid].unihash = newuni
2635                    changed.add(tid)
2636
2637                next |= self.rqdata.runtaskentries[tid].revdeps
2638                total.remove(tid)
2639                next.intersection_update(total)
2640                bb.event.check_for_interrupts(self.cooker.data)
2641
2642                if time.time() > (lasttime + 30):
2643                    lasttime = time.time()
2644                    hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime))
2645
2646        endtime = time.time()
2647        if (endtime-starttime > 60):
2648            hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime))
2649
2650        if changed:
2651            for mc in self.rq.worker:
2652                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2653            for mc in self.rq.fakeworker:
2654                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2655
2656            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2657
2658        for tid in changed:
2659            if tid not in self.rqdata.runq_setscene_tids:
2660                continue
2661            if tid not in self.pending_migrations:
2662                self.pending_migrations.add(tid)
2663
2664        update_tasks = []
2665        for tid in self.pending_migrations.copy():
2666            if tid in self.runq_running or tid in self.sq_live:
2667                # Too late, task already running, not much we can do now
2668                self.pending_migrations.remove(tid)
2669                continue
2670
2671            valid = True
2672            # Check no tasks this covers are running
2673            for dep in self.sqdata.sq_covered_tasks[tid]:
2674                if dep in self.runq_running and dep not in self.runq_complete:
2675                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2676                    valid = False
2677                    break
2678            if not valid:
2679                continue
2680
2681            self.pending_migrations.remove(tid)
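                # 'changed' was a set of rehashed tids above; from here on it is reused as a flag meaning a migration was processed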
2682            changed = True
2683
2684            if tid in self.tasks_scenequeue_done:
2685                self.tasks_scenequeue_done.remove(tid)
2686            for dep in self.sqdata.sq_covered_tasks[tid]:
2687                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2688                    bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
2689                    self.failed_tids.append(tid)
2690                    self.rq.state = runQueueCleanUp
2691                    return
2692
2693                if dep not in self.runq_complete:
2694                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2695                        self.tasks_scenequeue_done.remove(dep)
2696
2697            if tid in self.sq_buildable:
2698                self.sq_buildable.remove(tid)
2699            if tid in self.sq_running:
2700                self.sq_running.remove(tid)
2701            if tid in self.sqdata.outrightfail:
2702                self.sqdata.outrightfail.remove(tid)
2703            if tid in self.scenequeue_notcovered:
2704                self.scenequeue_notcovered.remove(tid)
2705            if tid in self.scenequeue_covered:
2706                self.scenequeue_covered.remove(tid)
2707            if tid in self.scenequeue_notneeded:
2708                self.scenequeue_notneeded.remove(tid)
2709
2710            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2711            self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2712
2713            if tid in self.stampcache:
2714                del self.stampcache[tid]
2715
2716            if tid in self.build_stamps:
2717                del self.build_stamps[tid]
2718
2719            update_tasks.append(tid)
2720
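            # Re-evaluate buildability for each migrated setscene task and note any whose hard dependency has already failed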
2721        update_tasks2 = []
2722        for tid in update_tasks:
2723            harddepfail = False
2724            for t in self.sqdata.sq_harddeps_rev[tid]:
2725                if t in self.scenequeue_notcovered:
2726                    harddepfail = True
2727                    break
2728            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2729                if tid not in self.sq_buildable:
2730                    self.sq_buildable.add(tid)
2731            if not self.sqdata.sq_revdeps[tid]:
2732                self.sq_buildable.add(tid)
2733
2734            update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid))
2735
2736        if update_tasks2:
2737            self.sqdone = False
2738            for mc in sorted(self.sqdata.multiconfigs):
2739                for tid in sorted([t[0] for t in update_tasks2]):
2740                    if mc_from_tid(tid) != mc:
2741                        continue
2742                    h = pending_hash_index(tid, self.rqdata)
2743                    if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2744                        self.sq_deferred[tid] = self.sqdata.hashes[h]
2745                        bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2746            update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2747
2748        for (tid, harddepfail, origvalid) in update_tasks2:
2749            if tid in self.sqdata.valid and not origvalid:
2750                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2751            if harddepfail:
2752                logger.debug2("%s has an unavailable hard dependency so skipping" % (tid))
2753                self.sq_task_failoutright(tid)
2754
2755        if changed:
2756            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2757            self.sq_needed_harddeps = set()
2758            self.sq_harddep_deferred = set()
2759            self.holdoff_need_update = True
2760
2761    def scenequeue_updatecounters(self, task, fail=False):
2762
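            # A failed setscene task drags down any setscene task that hard-depends on it, unless that task was already processed, is noexec or has a valid stamp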
2763        if fail and task in self.sqdata.sq_harddeps:
2764            for dep in sorted(self.sqdata.sq_harddeps[task]):
2765                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
2766                    # the dependency may already have been processed, e.g. a noexec setscene task
2767                    continue
2768                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
2769                if noexec or stamppresent:
2770                    continue
2771                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
2772                self.sq_task_failoutright(dep)
2773                continue
2774
2775        # For performance, only compute allcovered once if needed
2776        if self.sqdata.sq_deps[task]:
2777            allcovered = self.scenequeue_covered | self.scenequeue_notcovered
2778            for dep in sorted(self.sqdata.sq_deps[task]):
2779                if self.sqdata.sq_revdeps[dep].issubset(allcovered):
2780                    if dep not in self.sq_buildable:
2781                        self.sq_buildable.add(dep)
2782
2783        next = set([task])
2784        while next:
2785            new = set()
2786            for t in sorted(next):
2787                self.tasks_scenequeue_done.add(t)
2788                # Look down the dependency chain for non-setscene things which this task depends on
2789                # and mark as 'done'
2790                for dep in self.rqdata.runtaskentries[t].depends:
2791                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
2792                        continue
2793                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
2794                        new.add(dep)
2795            next = new
2796
2797        # If this task was one which other setscene tasks have a hard dependency upon, we need
2798        # to walk through the hard dependencies and allow execution of those which have completed dependencies.
2799        if task in self.sqdata.sq_harddeps:
2800            for dep in self.sq_harddep_deferred.copy():
2801                if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2802                    self.sq_harddep_deferred.remove(dep)
2803
2804        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2805        self.holdoff_need_update = True
2806
2807    def sq_task_completeoutright(self, task):
2808        """
2809        Mark a task as completed
2810        Look at the reverse dependencies and mark any task with
2811        completed dependencies as buildable
2812        """
2813
2814        logger.debug('Found task %s which could be accelerated', task)
2815        self.scenequeue_covered.add(task)
2816        self.scenequeue_updatecounters(task)
2817
2818    def sq_check_taskfail(self, task):
2819        if self.rqdata.setscene_ignore_tasks is not None:
2820            realtask = task.split('_setscene')[0]
2821            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2822            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2823            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2824                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2825                self.rq.state = runQueueCleanUp
2826
2827    def sq_task_complete(self, task):
2828        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2829        self.sq_task_completeoutright(task)
2830
2831    def sq_task_fail(self, task, result):
2832        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
2833        self.scenequeue_notcovered.add(task)
2834        self.scenequeue_updatecounters(task, True)
2835        self.sq_check_taskfail(task)
2836
2837    def sq_task_failoutright(self, task):
2838        self.sq_running.add(task)
2839        self.sq_buildable.add(task)
2840        self.scenequeue_notcovered.add(task)
2841        self.scenequeue_updatecounters(task, True)
2842
2843    def sq_task_skip(self, task):
2844        self.sq_running.add(task)
2845        self.sq_buildable.add(task)
2846        self.sq_task_completeoutright(task)
2847
2848    def sq_build_taskdepdata(self, task):
2849        def getsetscenedeps(tid):
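                # Resolve the _setscene variant's inter-task [depends] entries back to real (non-setscene) task ids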
2850            deps = set()
2851            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2852            realtid = tid + "_setscene"
2853            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2854            for (depname, idependtask) in idepends:
2855                if depname not in self.rqdata.taskData[mc].build_targets:
2856                    continue
2857
2858                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2859                if depfn is None:
2860                    continue
2861                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2862                deps.add(deptid)
2863            return deps
2864
2865        taskdepdata = {}
2866        next = getsetscenedeps(task)
2867        next.add(task)
2868        while next:
2869            additional = []
2870            for revdep in next:
2871                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2872                deps = getsetscenedeps(revdep)
2873
2874                taskdepdata[revdep] = bb.TaskData(
2875                    pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
2876                    taskname = taskname,
2877                    fn = fn,
2878                    deps = deps,
2879                    provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
2880                    taskhash = self.rqdata.runtaskentries[revdep].hash,
2881                    unihash = self.rqdata.runtaskentries[revdep].unihash,
2882                    hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
2883                    taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps,
2884                )
2885                for revdep2 in deps:
2886                    if revdep2 not in taskdepdata:
2887                        additional.append(revdep2)
2888            next = additional
2889
2890        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2891        return taskdepdata
2892
2893    def check_setscene_ignore_tasks(self, tid):
2894        # Check task that is going to run against the ignore tasks list
2895        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2896        # Ignore covered tasks
2897        if tid in self.tasks_covered:
2898            return False
2899        # Ignore stamped tasks
2900        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2901            return False
2902        # Ignore noexec tasks
2903        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2904        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2905            return False
2906
2907        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2908        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2909            if tid in self.rqdata.runq_setscene_tids:
2910                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
2911            else:
2912                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
2913            for t in self.scenequeue_notcovered:
2914                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
2915            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2916            logger.error("".join(msg))
2917            return True
2918        return False
2919
2920class SQData(object):
2921    def __init__(self):
2922        # SceneQueue dependencies
2923        self.sq_deps = {}
2924        # SceneQueue reverse dependencies
2925        self.sq_revdeps = {}
2926        # Injected inter-setscene task dependencies
2927        self.sq_harddeps = {}
2928        self.sq_harddeps_rev = {}
2929        # Cache of stamp files so duplicates can't run in parallel
2930        self.stamps = {}
2931        # Setscene tasks directly depended upon by the build
2932        self.unskippable = set()
2933        # Set of setscene tasks which aren't available (outright failures)
2934        self.outrightfail = set()
2935        # Maps each setscene task to the normal tasks it covers
2936        self.sq_covered_tasks = {}
2937
2938def build_scenequeue_data(sqdata, rqdata, sqrq):
2939
2940    sq_revdeps = {}
2941    sq_revdeps_squash = {}
2942    sq_collated_deps = {}
2943
2944    # We can't skip specified target tasks which aren't setscene tasks
2945    sqdata.cantskip = set(rqdata.target_tids)
2946    sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
2947    sqdata.cantskip.intersection_update(rqdata.runtaskentries)
2948
2949    # We need to construct a dependency graph for the setscene functions. Intermediate
2950    # dependencies between the setscene tasks only complicate the code. This code
2951    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
2952    # only containing the setscene functions.
2953
2954    rqdata.init_progress_reporter.next_stage()
2955
2956    # First process the chains up to the first setscene task.
2957    endpoints = {}
2958    for tid in rqdata.runtaskentries:
2959        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
2960        sq_revdeps_squash[tid] = set()
2961        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
2962            #bb.warn("Added endpoint %s" % (tid))
2963            endpoints[tid] = set()
2964
2965    rqdata.init_progress_reporter.next_stage()
2966
2967    # Secondly process the chains between setscene tasks.
2968    for tid in rqdata.runq_setscene_tids:
2969        sq_collated_deps[tid] = set()
2970        #bb.warn("Added endpoint 2 %s" % (tid))
2971        for dep in rqdata.runtaskentries[tid].depends:
2972            if tid in sq_revdeps[dep]:
2973                sq_revdeps[dep].remove(tid)
2974            if dep not in endpoints:
2975                endpoints[dep] = set()
2976            #bb.warn("  Added endpoint 3 %s" % (dep))
2977            endpoints[dep].add(tid)
2978
2979    rqdata.init_progress_reporter.next_stage()
2980
2981    def process_endpoints(endpoints):
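            # Walk from the endpoints towards the roots, propagating the set of setscene tasks covering each point;
            # propagation stops at setscene tasks, collapsing the graph to setscene-only dependencies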
2982        newendpoints = {}
2983        for point, task in endpoints.items():
2984            tasks = set()
2985            if task:
2986                tasks |= task
2987            if sq_revdeps_squash[point]:
2988                tasks |= sq_revdeps_squash[point]
2989            if point not in rqdata.runq_setscene_tids:
2990                for t in tasks:
2991                    sq_collated_deps[t].add(point)
2992            sq_revdeps_squash[point] = set()
2993            if point in rqdata.runq_setscene_tids:
2994                sq_revdeps_squash[point] = tasks
2995                continue
2996            for dep in rqdata.runtaskentries[point].depends:
2997                if point in sq_revdeps[dep]:
2998                    sq_revdeps[dep].remove(point)
2999                if tasks:
3000                    sq_revdeps_squash[dep] |= tasks
3001                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
3002                    newendpoints[dep] = task
3003        if newendpoints:
3004            process_endpoints(newendpoints)
3005
3006    process_endpoints(endpoints)
3007
3008    rqdata.init_progress_reporter.next_stage()
3009
3010    # Build a list of tasks which are "unskippable"
3011    # These are direct endpoints referenced by the build, up to and including setscene tasks
3012    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
3013    new = True
3014    for tid in rqdata.runtaskentries:
3015        if not rqdata.runtaskentries[tid].revdeps:
3016            sqdata.unskippable.add(tid)
3017    sqdata.unskippable |= sqdata.cantskip
3018    while new:
3019        new = False
3020        orig = sqdata.unskippable.copy()
3021        for tid in sorted(orig, reverse=True):
3022            if tid in rqdata.runq_setscene_tids:
3023                continue
3024            if not rqdata.runtaskentries[tid].depends:
3025                # These tasks have no setscene tasks in their chain, so mark them as directly buildable
3026                sqrq.setbuildable(tid)
3027            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
3028            if sqdata.unskippable != orig:
3029                new = True
3030
3031    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)
3032
3033    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))
3034
3035    # Sanity check all dependencies could be changed to setscene task references
3036    for taskcounter, tid in enumerate(rqdata.runtaskentries):
3037        if tid in rqdata.runq_setscene_tids:
3038            pass
3039        elif sq_revdeps_squash[tid]:
3040            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
3041        else:
3042            del sq_revdeps_squash[tid]
3043        rqdata.init_progress_reporter.update(taskcounter)
3044
3045    rqdata.init_progress_reporter.next_stage()
3046
3047    # Resolve setscene inter-task dependencies
3048    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
3049    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
3050    for tid in rqdata.runq_setscene_tids:
3051        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
3052        realtid = tid + "_setscene"
3053        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
3054        sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
3055
3056        sqdata.sq_harddeps_rev[tid] = set()
3057        for (depname, idependtask) in idepends:
3058
3059            if depname not in rqdata.taskData[mc].build_targets:
3060                continue
3061
3062            depfn = rqdata.taskData[mc].build_targets[depname][0]
3063            if depfn is None:
3064                continue
3065            deptid = depfn + ":" + idependtask.replace("_setscene", "")
3066            if deptid not in rqdata.runtaskentries:
3067                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))
3068
3069            logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))
3070
3071            if deptid not in sqdata.sq_harddeps:
3072                sqdata.sq_harddeps[deptid] = set()
3073            sqdata.sq_harddeps[deptid].add(tid)
3074            sqdata.sq_harddeps_rev[tid].add(deptid)
3075
3076    rqdata.init_progress_reporter.next_stage()
3077
3078    rqdata.init_progress_reporter.next_stage()
3079
3080    #for tid in sq_revdeps_squash:
3081    #    data = ""
3082    #    for dep in sq_revdeps_squash[tid]:
3083    #        data = data + "\n   %s" % dep
3084    #    bb.warn("Task %s_setscene: is %s " % (tid, data))
3085
3086    sqdata.sq_revdeps = sq_revdeps_squash
3087    sqdata.sq_covered_tasks = sq_collated_deps
3088
3089    # Build reverse version of revdeps to populate deps structure
3090    for tid in sqdata.sq_revdeps:
3091        sqdata.sq_deps[tid] = set()
3092    for tid in sqdata.sq_revdeps:
3093        for dep in sqdata.sq_revdeps[tid]:
3094            sqdata.sq_deps[dep].add(tid)
3095
3096    rqdata.init_progress_reporter.next_stage()
3097
3098    sqdata.multiconfigs = set()
3099    for tid in sqdata.sq_revdeps:
3100        sqdata.multiconfigs.add(mc_from_tid(tid))
3101        if not sqdata.sq_revdeps[tid]:
3102            sqrq.sq_buildable.add(tid)
3103
3104    rqdata.init_progress_reporter.next_stage()
3105
3106    sqdata.noexec = set()
3107    sqdata.stamppresent = set()
3108    sqdata.valid = set()
3109
3110    sqdata.hashes = {}
3111    sqrq.sq_deferred = {}
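        # Setscene tasks whose pending hash index matches are duplicates: the first seen runs and the rest are deferred behind it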
3112    for mc in sorted(sqdata.multiconfigs):
3113        for tid in sorted(sqdata.sq_revdeps):
3114            if mc_from_tid(tid) != mc:
3115                continue
3116            h = pending_hash_index(tid, rqdata)
3117            if h not in sqdata.hashes:
3118                sqdata.hashes[h] = tid
3119            else:
3120                sqrq.sq_deferred[tid] = sqdata.hashes[h]
3121                bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
3122
3123def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
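        # Returns a (noexec, stamppresent) tuple for the given tid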
3124
3125    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
3126
3127    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]
3128
3129    if 'noexec' in taskdep and taskname in taskdep['noexec']:
3130        bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn)
3131        return True, False
3132
3133    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
3134        logger.debug2('Setscene stamp current for task %s', tid)
3135        return False, True
3136
3137    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
3138        logger.debug2('Normal stamp current for task %s', tid)
3139        return False, True
3140
3141    return False, False
3142
3143def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
3144
3145    tocheck = set()
3146
3147    for tid in sorted(tids):
3148        if tid in sqdata.stamppresent:
3149            sqdata.stamppresent.remove(tid)
3150        if tid in sqdata.valid:
3151            sqdata.valid.remove(tid)
3152        if tid in sqdata.outrightfail:
3153            sqdata.outrightfail.remove(tid)
3154
3155        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)
3156
3157        if noexec:
3158            sqdata.noexec.add(tid)
3159            sqrq.sq_task_skip(tid)
3160            logger.debug2("%s is noexec so skipping setscene" % (tid))
3161            continue
3162
3163        if stamppresent:
3164            sqdata.stamppresent.add(tid)
3165            sqrq.sq_task_skip(tid)
3166            logger.debug2("%s has a valid stamp, skipping" % (tid))
3167            continue
3168
3169        tocheck.add(tid)
3170
3171    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)
3172
3173    for tid in tids:
3174        if tid in sqdata.stamppresent:
3175            continue
3176        if tid in sqdata.valid:
3177            continue
3178        if tid in sqdata.noexec:
3179            continue
3180        if tid in sqrq.scenequeue_covered:
3181            continue
3182        if tid in sqrq.scenequeue_notcovered:
3183            continue
3184        if tid in sqrq.sq_deferred:
3185            continue
3186        sqdata.outrightfail.add(tid)
3187        logger.debug2("%s already handled (fallthrough), skipping" % (tid))
3188
3189class TaskFailure(Exception):
3190    """
3191    Exception raised when a task in a runqueue fails
3192    """
3193    def __init__(self, x):
3194        self.args = x
3195
3196
3197class runQueueExitWait(bb.event.Event):
3198    """
3199    Event when waiting for task processes to exit
3200    """
3201
3202    def __init__(self, remain):
3203        self.remain = remain
3204        self.message = "Waiting for %s active tasks to finish" % remain
3205        bb.event.Event.__init__(self)
3206
3207class runQueueEvent(bb.event.Event):
3208    """
3209    Base runQueue event class
3210    """
3211    def __init__(self, task, stats, rq):
3212        self.taskid = task
3213        self.taskstring = task
3214        self.taskname = taskname_from_tid(task)
3215        self.taskfile = fn_from_tid(task)
3216        self.taskhash = rq.rqdata.get_task_hash(task)
3217        self.stats = stats.copy()
3218        bb.event.Event.__init__(self)
3219
3220class sceneQueueEvent(runQueueEvent):
3221    """
3222    Base sceneQueue event class
3223    """
3224    def __init__(self, task, stats, rq, noexec=False):
3225        runQueueEvent.__init__(self, task, stats, rq)
3226        self.taskstring = task + "_setscene"
3227        self.taskname = taskname_from_tid(task) + "_setscene"
3228        self.taskfile = fn_from_tid(task)
3229        self.taskhash = rq.rqdata.get_task_hash(task)
3230
3231class runQueueTaskStarted(runQueueEvent):
3232    """
3233    Event notifying a task was started
3234    """
3235    def __init__(self, task, stats, rq, noexec=False):
3236        runQueueEvent.__init__(self, task, stats, rq)
3237        self.noexec = noexec
3238
3239class sceneQueueTaskStarted(sceneQueueEvent):
3240    """
3241    Event notifying a setscene task was started
3242    """
3243    def __init__(self, task, stats, rq, noexec=False):
3244        sceneQueueEvent.__init__(self, task, stats, rq)
3245        self.noexec = noexec
3246
3247class runQueueTaskFailed(runQueueEvent):
3248    """
3249    Event notifying a task failed
3250    """
3251    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
3252        runQueueEvent.__init__(self, task, stats, rq)
3253        self.exitcode = exitcode
3254        self.fakeroot_log = fakeroot_log
3255
3256    def __str__(self):
3257        if self.fakeroot_log:
3258            return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log)
3259        else:
3260            return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
3261
3262class sceneQueueTaskFailed(sceneQueueEvent):
3263    """
3264    Event notifying a setscene task failed
3265    """
3266    def __init__(self, task, stats, exitcode, rq):
3267        sceneQueueEvent.__init__(self, task, stats, rq)
3268        self.exitcode = exitcode
3269
3270    def __str__(self):
3271        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)
3272
3273class sceneQueueComplete(sceneQueueEvent):
3274    """
3275    Event when all the sceneQueue tasks are complete
3276    """
3277    def __init__(self, stats, rq):
3278        self.stats = stats.copy()
3279        bb.event.Event.__init__(self)
3280
3281class runQueueTaskCompleted(runQueueEvent):
3282    """
3283    Event notifying a task completed
3284    """
3285
3286class sceneQueueTaskCompleted(sceneQueueEvent):
3287    """
3288    Event notifying a setscene task completed
3289    """
3290
3291class runQueueTaskSkipped(runQueueEvent):
3292    """
3293    Event notifying a task was skipped
3294    """
3295    def __init__(self, task, stats, rq, reason):
3296        runQueueEvent.__init__(self, task, stats, rq)
3297        self.reason = reason
3298
3299class taskUniHashUpdate(bb.event.Event):
3300    """
3301    Event notifying that a task's unihash was updated
3302    """
3303    def __init__(self, task, unihash):
3304        self.taskid = task
3305        self.unihash = unihash
3306        bb.event.Event.__init__(self)
3307
3308class runQueuePipe():
3309    """
3310    Abstraction for a pipe between a worker thread and the server
3311    """
3312    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
3313        self.input = pipein
3314        if pipeout:
3315            pipeout.close()
3316        bb.utils.nonblockingfd(self.input)
3317        self.queue = bytearray()
3318        self.d = d
3319        self.rq = rq
3320        self.rqexec = rqexec
3321        self.fakerootlogs = fakerootlogs
3322
3323    def read(self):
3324        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
3325            for worker in workers.values():
3326                worker.process.poll()
3327                if worker.process.returncode is not None and not self.rq.teardown:
3328                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
3329                    self.rq.finish_runqueue(True)
3330
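            # Append whatever data is available, then process complete <event>...</event> and <exitcode>...</exitcode> frames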
3331        start = len(self.queue)
3332        try:
3333            self.queue.extend(self.input.read(512 * 1024) or b"")
3334        except (OSError, IOError) as e:
3335            if e.errno != errno.EAGAIN:
3336                raise
3337        end = len(self.queue)
3338        found = True
3339        while found and self.queue:
3340            found = False
3341            index = self.queue.find(b"</event>")
3342            while index != -1 and self.queue.startswith(b"<event>"):
3343                try:
3344                    event = pickle.loads(self.queue[7:index])
3345                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
3346                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
3347                        # The pickled data could itself contain "</event>", so search for the next occurrence
3348                        # and try unpickling again; this should be the only way an unpickle error can occur
3349                        index = self.queue.find(b"</event>", index + 1)
3350                        continue
3351                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[7:index]))
3352                bb.event.fire_from_worker(event, self.d)
3353                if isinstance(event, taskUniHashUpdate):
3354                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
3355                found = True
3356                self.queue = self.queue[index+8:]
3357                index = self.queue.find(b"</event>")
3358            index = self.queue.find(b"</exitcode>")
3359            while index != -1 and self.queue.startswith(b"<exitcode>"):
3360                try:
3361                    task, status = pickle.loads(self.queue[10:index])
3362                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
3363                    bb.msg.fatal("RunQueue", "failed to load pickle '%s': '%s'" % (e, self.queue[10:index]))
3364                (_, _, _, taskfn) = split_tid_mcfn(task)
3365                fakerootlog = None
3366                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
3367                    fakerootlog = self.fakerootlogs[taskfn]
3368                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
3369                found = True
3370                self.queue = self.queue[index+11:]
3371                index = self.queue.find(b"</exitcode>")
3372        return (end > start)
3373
3374    def close(self):
3375        while self.read():
3376            continue
3377        if self.queue:
3378            print("Warning, worker left partial message: %s" % self.queue)
3379        self.input.close()
3380
3381def get_setscene_enforce_ignore_tasks(d, targets):
3382    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
3383        return None
3384    ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
3385    outlist = []
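        # A '%' recipe wildcard expands to one entry per requested build target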
3386    for item in ignore_tasks[:]:
3387        if item.startswith('%:'):
3388            for (mc, target, task, fn) in targets:
3389                outlist.append(target + ':' + item.split(':')[1])
3390        else:
3391            outlist.append(item)
3392    return outlist
3393
3394def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
3395    import fnmatch
3396    if ignore_tasks is not None:
3397        item = '%s:%s' % (pn, taskname)
3398        for ignore_task in ignore_tasks:
3399            if fnmatch.fnmatch(item, ignore_task):
3400                return True
3401        return False
3402    return True
3403