xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1"""
2BitBake 'RunQueue' implementation
3
4Handles preparation and execution of a queue of tasks
5"""
6
7# Copyright (C) 2006-2007  Richard Purdie
8#
9# SPDX-License-Identifier: GPL-2.0-only
10#
11
12import copy
13import os
14import sys
15import stat
16import errno
17import itertools
18import logging
19import re
20import bb
21from bb import msg, event
22from bb import monitordisk
23import subprocess
24import pickle
25from multiprocessing import Process
26import shlex
27import pprint
28import time
29
# Module loggers: the top-level BitBake logger plus RunQueue-specific child
# loggers; hash equivalence messages get their own sub-logger so they can be
# filtered independently.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")

# Matches a standalone 64-character hex string (i.e. a SHA-256 digest),
# case-insensitively, provided it is not embedded in a longer alphanumeric run.
__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
35
def fn_from_tid(tid):
    """Return the filename part of a task id (everything before the final colon)."""
    parts = tid.rsplit(":", 1)
    return parts[0]
38
def taskname_from_tid(tid):
    """Return the task name part of a task id (text after the final colon)."""
    parts = tid.rsplit(":", 1)
    return parts[1]
41
def mc_from_tid(tid):
    """Return the multiconfig name from a tid, or "" for the default config."""
    # Only "mc:<mc>:<fn>:<task>" style ids carry a multiconfig name.
    if not (tid.startswith('mc:') and tid.count(':') >= 2):
        return ""
    return tid.split(':')[1]
46
def split_tid(tid):
    """Return (mc, fn, taskname) for a tid, i.e. split_tid_mcfn() minus mcfn."""
    return split_tid_mcfn(tid)[:3]
50
def split_mc(n):
    """Split a possibly "mc:<mc>:"-prefixed name into (mc, name)."""
    if n.startswith("mc:") and n.count(':') >= 2:
        _, mcname, remainder = n.split(":", 2)
        return (mcname, remainder)
    return ('', n)
56
def split_tid_mcfn(tid):
    """
    Decompose a task id into (mc, fn, taskname, mcfn).

    Multiconfig ids look like "mc:<mc>:<fn>:<task>"; plain ids are
    "<fn>:<task>", for which mc is "" and mcfn equals fn.
    """
    if tid.startswith('mc:') and tid.count(':') >= 2:
        parts = tid.split(':')
        mc = parts[1]
        taskname = parts[-1]
        # fn may itself contain colons, so rejoin the middle pieces.
        fn = ":".join(parts[2:-1])
        return (mc, fn, taskname, "mc:" + mc + ":" + fn)

    fn, taskname = tid.rsplit(":", 1)
    return ("", fn, taskname, fn)
72
def build_tid(mc, fn, taskname):
    """Build a task id from its parts, adding the "mc:<mc>:" prefix when needed."""
    if not mc:
        return fn + ":" + taskname
    return ":".join(("mc", mc, fn, taskname))
77
# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    """
    Return the index key used to pair up potentially matching multiconfig
    tasks: tasks match when PN, task name and unihash are all equal.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    # Fix: the original concatenated the literal string "taskname" rather than
    # the task's actual name, so (contrary to the comment above) tasks with
    # different names but the same PN and hash produced identical keys.
    return pn + ":" + taskname + ":" + h
85
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        # All running counters start at zero; totals are fixed up front.
        for counter in ("completed", "skipped", "failed", "active",
                        "setscene_active", "setscene_covered",
                        "setscene_notcovered"):
            setattr(self, counter, 0)
        self.total = total
        self.setscene_total = setscene_total

    def copy(self):
        """Return a duplicate carrying all of the current counter values."""
        duplicate = self.__class__(self.total, self.setscene_total)
        duplicate.__dict__.update(self.__dict__)
        return duplicate

    def taskFailed(self):
        """Move one task from active to failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self):
        """Move one task from active to completed."""
        self.active -= 1
        self.completed += 1

    def taskSkipped(self):
        """Record a skipped task.

        NOTE(review): this also increments 'active'; presumably the caller
        pairs it with a later taskCompleted() - confirm against RunQueue use.
        """
        self.active += 1
        self.skipped += 1

    def taskActive(self):
        """Record one more task as active."""
        self.active += 1

    def updateCovered(self, covered, notcovered):
        """Record the setscene covered / not-covered counts."""
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        """Record the number of currently active setscene tasks."""
        self.setscene_active = active
127
# These values indicate the next step due to be run in the
# runQueue state machine
# NOTE(review): the numbering is sparse (no 0, 1 or 5); the gaps presumably
# belong to states removed over time - keep the values stable rather than
# renumbering.
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueDumpSigs = 4
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
137
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        # Flat list of tids in task order. Fix: the original built
        # "[self.rqdata.runtaskentries.keys()]" - a single-element list
        # containing a dict_keys view - which breaks prio_map.index(tid)
        # in next_buildable_task() and the enumeration in dump_prio().
        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        # Per-tid stamp file, used to avoid running two tasks that would
        # write the same stamp at the same time.
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            if tid in self.rq.runq_buildable:
                self.buildable.add(tid)

        # Built lazily on first use in next_buildable_task().
        self.rev_prio_map = None
        self.is_pressure_usable()

    def is_pressure_usable(self):
        """
        If monitoring pressure, return True if pressure files can be open and read. For example
        openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported)
        is returned.

        NOTE(review): despite the wording above, this records the result in
        self.check_pressure rather than returning it.
        """
        if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
            try:
                with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                    open("/proc/pressure/io") as io_pressure_fds, \
                    open("/proc/pressure/memory") as memory_pressure_fds:

                    # The first line's 5th field is "total=<n>"; keep the
                    # value as the baseline for rate calculations.
                    self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_pressure_time = time.time()
                self.check_pressure = True
            except Exception:
                # Narrowed from a bare "except:" so KeyboardInterrupt and
                # SystemExit are not swallowed; any read/parse failure still
                # degrades gracefully to not monitoring pressure.
                bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
                self.check_pressure = False
        else:
            self.check_pressure = False

    def exceeds_max_pressure(self):
        """
        Monitor the difference in total pressure at least once per second, if
        BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold.
        """
        if self.check_pressure:
            with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                open("/proc/pressure/io") as io_pressure_fds, \
                open("/proc/pressure/memory") as memory_pressure_fds:
                # extract "total" from /proc/pressure/{cpu|io}
                curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                now = time.time()
                tdiff = now - self.prev_pressure_time
                psi_accumulation_interval = 1.0
                # Rate of change of the cumulative "total" counters since the
                # last baseline, i.e. pressure per second.
                cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
                io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
                memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
                exceeds_cpu_pressure =  self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
                exceeds_io_pressure =  self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
                exceeds_memory_pressure =  self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure

                # Only roll the baseline forward once a full accumulation
                # interval has elapsed so rates are averaged over >= 1 second.
                if tdiff > psi_accumulation_interval:
                    self.prev_cpu_pressure = curr_cpu_pressure
                    self.prev_io_pressure = curr_io_pressure
                    self.prev_memory_pressure = curr_memory_pressure
                    self.prev_pressure_time = now

            pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
            pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure)
            # Only log when the over/under-limit state changes, to avoid spam.
            if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
                bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
            self.pressure_state = pressure_state
            return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
        elif self.rq.max_loadfactor:
            # Fallback throttle: compare 1-minute load average per CPU
            # against the configured maximum.
            limit = False
            loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
            # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
            if loadfactor > self.rq.max_loadfactor:
                limit = True
            if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
                bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
            self.loadfactor_limit = limit
            return limit
        return False

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Bitbake requires that at least one task be active. Only check for pressure if
        # this is the case, otherwise the pressure limitation could result in no tasks
        # being active and no new tasks started thereby, at times, breaking the scheduler.
        if self.rq.stats.active and self.exceeds_max_pressure():
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            # Cache the per-taskname "number_threads" flag lookup.
            if rtaskname not in self.skip_maxthread:
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        # Fast path: a single candidate needs no priority comparison.
        if len(buildable) == 1:
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            # Don't run two tasks that would write the same stamp file.
            if stamp not in self.rq.build_stamps.values():
                return tid

        # Lazily build the tid -> priority-index reverse map.
        if not self.rev_prio_map:
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        # Pick the eligible candidate with the lowest priority number
        # (earlier in prio_map == more important).
        best = None
        bestprio = None
        for tid in buildable:
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                taskname = taskname_from_tid(tid)
                if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                    continue
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        """Mark *task* as buildable."""
        self.buildable.add(task)

    def removebuildable(self, task):
        """Remove *task* from the buildable set."""
        self.buildable.remove(task)

    def describe_task(self, taskid):
        """Return a human-readable description of *taskid*, with priority if known."""
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        """Log the full priority map at debug level 3, most important first."""
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))
322
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        # A stable sort by ascending weight keeps insertion order within each
        # weight group, so reversing the whole list afterwards reproduces the
        # original group-by-weight / concatenate / reverse ordering exactly.
        entries = self.rqdata.runtaskentries
        ordered = sorted(entries, key=lambda tid: entries[tid].weight)
        ordered.reverse()
        self.prio_map = ordered
349
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the after.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        """
        Build on the speed scheduler's weight-sorted map, then regroup it so
        that all tasks of one recipe are scheduled consecutively.
        """
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceeding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            # index/old_task track our position in the merged list while we
            # walk this recipe's task list.
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s'  % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more imporant (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's task that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        for task in all_tasks:
            # Stable-partition: pull every tid of this task kind forward to
            # task_index, preserving the existing per-recipe order.
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')
446
class RunTaskEntry(object):
    """
    Per-task bookkeeping used by RunQueueData: dependency edges plus
    hash and scheduling metadata.
    """
    def __init__(self):
        # Forward/reverse dependency edges, as sets of tids.
        self.depends, self.revdeps = set(), set()
        # Filled in later: task hash, unified hash and task reference.
        self.hash = self.unihash = self.task = None
        # Scheduling weight; every task starts with weight 1.
        self.weight = 1
455
456class RunQueueData:
457    """
458    BitBake Run Queue implementation
459    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        # References to the owning runqueue/cooker and the build inputs.
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        # Set elsewhere if multiple .bb files provide the same target.
        self.warn_multi_bb = False

        # Providers for which multiple .bb files are explicitly permitted.
        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
        self.setscene_ignore_tasks_checked = False
        # BB_SETSCENE_ENFORCE=1 - presumably requires tasks to be satisfied
        # from setscene; confirm against the setscene enforcement code.
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()
475
476    def reset(self):
477        self.runtaskentries = {}
478
479    def runq_depends_names(self, ids):
480        ret = []
481        for id in ids:
482            nam = os.path.basename(id)
483            nam = re.sub("_[^,]*,", ",", nam)
484            ret.extend([nam])
485        return ret
486
487    def get_task_hash(self, tid):
488        return self.runtaskentries[tid].hash
489
490    def get_task_unihash(self, tid):
491        return self.runtaskentries[tid].unihash
492
493    def get_user_idstring(self, tid, task_name_suffix = ""):
494        return tid + task_name_suffix
495
496    def get_short_user_idstring(self, task, task_name_suffix = ""):
497        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
498        pn = self.dataCaches[mc].pkg_fn[taskfn]
499        taskname = taskname_from_tid(task) + task_name_suffix
500        return "%s:%s" % (pn, taskname)
501
502    def circular_depchains_handler(self, tasks):
503        """
504        Some tasks aren't buildable, likely due to circular dependency issues.
505        Identify the circular dependencies and print them in a user readable format.
506        """
507        from copy import deepcopy
508
509        valid_chains = []
510        explored_deps = {}
511        msgs = []
512
513        class TooManyLoops(Exception):
514            pass
515
516        def chain_reorder(chain):
517            """
518            Reorder a dependency chain so the lowest task id is first
519            """
520            lowest = 0
521            new_chain = []
522            for entry in range(len(chain)):
523                if chain[entry] < chain[lowest]:
524                    lowest = entry
525            new_chain.extend(chain[lowest:])
526            new_chain.extend(chain[:lowest])
527            return new_chain
528
529        def chain_compare_equal(chain1, chain2):
530            """
531            Compare two dependency chains and see if they're the same
532            """
533            if len(chain1) != len(chain2):
534                return False
535            for index in range(len(chain1)):
536                if chain1[index] != chain2[index]:
537                    return False
538            return True
539
540        def chain_array_contains(chain, chain_array):
541            """
542            Return True if chain_array contains chain
543            """
544            for ch in chain_array:
545                if chain_compare_equal(ch, chain):
546                    return True
547            return False
548
549        def find_chains(tid, prev_chain):
550            prev_chain.append(tid)
551            total_deps = []
552            total_deps.extend(self.runtaskentries[tid].revdeps)
553            for revdep in self.runtaskentries[tid].revdeps:
554                if revdep in prev_chain:
555                    idx = prev_chain.index(revdep)
556                    # To prevent duplicates, reorder the chain to start with the lowest taskid
557                    # and search through an array of those we've already printed
558                    chain = prev_chain[idx:]
559                    new_chain = chain_reorder(chain)
560                    if not chain_array_contains(new_chain, valid_chains):
561                        valid_chains.append(new_chain)
562                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
563                        for dep in new_chain:
564                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
565                        msgs.append("\n")
566                    if len(valid_chains) > 10:
567                        msgs.append("Halted dependency loops search after 10 matches.\n")
568                        raise TooManyLoops
569                    continue
570                scan = False
571                if revdep not in explored_deps:
572                    scan = True
573                elif revdep in explored_deps[revdep]:
574                    scan = True
575                else:
576                    for dep in prev_chain:
577                        if dep in explored_deps[revdep]:
578                            scan = True
579                if scan:
580                    find_chains(revdep, copy.deepcopy(prev_chain))
581                for dep in explored_deps[revdep]:
582                    if dep not in total_deps:
583                        total_deps.append(dep)
584
585            explored_deps[tid] = total_deps
586
587        try:
588            for task in tasks:
589                find_chains(task, [])
590        except TooManyLoops:
591            pass
592
593        return msgs
594
595    def calculate_task_weights(self, endpoints):
596        """
597        Calculate a number representing the "weight" of each task. Heavier weighted tasks
598        have more dependencies and hence should be executed sooner for maximum speed.
599
600        This function also sanity checks the task list finding tasks that are not
601        possible to execute due to circular dependencies.
602        """
603
604        numTasks = len(self.runtaskentries)
605        weight = {}
606        deps_left = {}
607        task_done = {}
608
609        for tid in self.runtaskentries:
610            task_done[tid] = False
611            weight[tid] = 1
612            deps_left[tid] = len(self.runtaskentries[tid].revdeps)
613
614        for tid in endpoints:
615            weight[tid] = 10
616            task_done[tid] = True
617
618        while True:
619            next_points = []
620            for tid in endpoints:
621                for revdep in self.runtaskentries[tid].depends:
622                    weight[revdep] = weight[revdep] + weight[tid]
623                    deps_left[revdep] = deps_left[revdep] - 1
624                    if deps_left[revdep] == 0:
625                        next_points.append(revdep)
626                        task_done[revdep] = True
627            endpoints = next_points
628            if not next_points:
629                break
630
631        # Circular dependency sanity check
632        problem_tasks = []
633        for tid in self.runtaskentries:
634            if task_done[tid] is False or deps_left[tid] != 0:
635                problem_tasks.append(tid)
636                logger.debug2("Task %s is not buildable", tid)
637                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
638            self.runtaskentries[tid].weight = weight[tid]
639
640        if problem_tasks:
641            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
642            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
643            message = message + "Identifying dependency loops (this may take a short while)...\n"
644            logger.error(message)
645
646            msgs = self.circular_depchains_handler(problem_tasks)
647
648            message = "\n"
649            for msg in msgs:
650                message = message + msg
651            bb.msg.fatal("RunQueue", message)
652
653        return weight
654
655    def prepare(self):
656        """
657        Turn a set of taskData into a RunQueue and compute data needed
658        to optimise the execution order.
659        """
660
661        runq_build = {}
662        recursivetasks = {}
663        recursiveitasks = {}
664        recursivetasksselfref = set()
665
666        taskData = self.taskData
667
668        found = False
669        for mc in self.taskData:
670            if taskData[mc].taskentries:
671                found = True
672                break
673        if not found:
674            # Nothing to do
675            return 0
676
677        bb.parse.siggen.setup_datacache(self.dataCaches)
678
679        self.init_progress_reporter.start()
680        self.init_progress_reporter.next_stage()
681        bb.event.check_for_interrupts(self.cooker.data)
682
683        # Step A - Work out a list of tasks to run
684        #
685        # Taskdata gives us a list of possible providers for every build and run
686        # target ordered by priority. It also gives information on each of those
687        # providers.
688        #
689        # To create the actual list of tasks to execute we fix the list of
690        # providers and then resolve the dependencies into task IDs. This
691        # process is repeated for each type of dependency (tdepends, deptask,
692        # rdeptast, recrdeptask, idepends).
693
694        def add_build_dependencies(depids, tasknames, depends, mc):
695            for depname in depids:
696                # Won't be in build_targets if ASSUME_PROVIDED
697                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
698                    continue
699                depdata = taskData[mc].build_targets[depname][0]
700                if depdata is None:
701                    continue
702                for taskname in tasknames:
703                    t = depdata + ":" + taskname
704                    if t in taskData[mc].taskentries:
705                        depends.add(t)
706
707        def add_runtime_dependencies(depids, tasknames, depends, mc):
708            for depname in depids:
709                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
710                    continue
711                depdata = taskData[mc].run_targets[depname][0]
712                if depdata is None:
713                    continue
714                for taskname in tasknames:
715                    t = depdata + ":" + taskname
716                    if t in taskData[mc].taskentries:
717                        depends.add(t)
718
719        def add_mc_dependencies(mc, tid):
720            mcdeps = taskData[mc].get_mcdepends()
721            for dep in mcdeps:
722                mcdependency = dep.split(':')
723                pn = mcdependency[3]
724                frommc = mcdependency[1]
725                mcdep = mcdependency[2]
726                deptask = mcdependency[4]
727                if mcdep not in taskData:
728                    bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep))
729                if mc == frommc:
730                    fn = taskData[mcdep].build_targets[pn][0]
731                    newdep = '%s:%s' % (fn,deptask)
732                    if newdep not in taskData[mcdep].taskentries:
733                        bb.fatal("Task mcdepends on non-existent task %s" % (newdep))
734                    taskData[mc].taskentries[tid].tdepends.append(newdep)
735
736        for mc in taskData:
737            for tid in taskData[mc].taskentries:
738
739                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
740                #runtid = build_tid(mc, fn, taskname)
741
742                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)
743
744                depends = set()
745                task_deps = self.dataCaches[mc].task_deps[taskfn]
746
747                self.runtaskentries[tid] = RunTaskEntry()
748
749                if fn in taskData[mc].failed_fns:
750                    continue
751
752                # We add multiconfig dependencies before processing internal task deps (tdepends)
753                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
754                    add_mc_dependencies(mc, tid)
755
756                # Resolve task internal dependencies
757                #
758                # e.g. addtask before X after Y
759                for t in taskData[mc].taskentries[tid].tdepends:
760                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
761                    depends.add(build_tid(depmc, depfn, deptaskname))
762
763                # Resolve 'deptask' dependencies
764                #
765                # e.g. do_sometask[deptask] = "do_someothertask"
766                # (makes sure sometask runs after someothertask of all DEPENDS)
767                if 'deptask' in task_deps and taskname in task_deps['deptask']:
768                    tasknames = task_deps['deptask'][taskname].split()
769                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
770
771                # Resolve 'rdeptask' dependencies
772                #
773                # e.g. do_sometask[rdeptask] = "do_someothertask"
774                # (makes sure sometask runs after someothertask of all RDEPENDS)
775                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
776                    tasknames = task_deps['rdeptask'][taskname].split()
777                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
778
779                # Resolve inter-task dependencies
780                #
781                # e.g. do_sometask[depends] = "targetname:do_someothertask"
782                # (makes sure sometask runs after targetname's someothertask)
783                idepends = taskData[mc].taskentries[tid].idepends
784                for (depname, idependtask) in idepends:
785                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps:
786                        # Won't be in build_targets if ASSUME_PROVIDED
787                        depdata = taskData[mc].build_targets[depname][0]
788                        if depdata is not None:
789                            t = depdata + ":" + idependtask
790                            depends.add(t)
791                            if t not in taskData[mc].taskentries:
792                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
793                irdepends = taskData[mc].taskentries[tid].irdepends
794                for (depname, idependtask) in irdepends:
795                    if depname in taskData[mc].run_targets:
796                        # Won't be in run_targets if ASSUME_PROVIDED
797                        if not taskData[mc].run_targets[depname]:
798                            continue
799                        depdata = taskData[mc].run_targets[depname][0]
800                        if depdata is not None:
801                            t = depdata + ":" + idependtask
802                            depends.add(t)
803                            if t not in taskData[mc].taskentries:
804                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
805
806                # Resolve recursive 'recrdeptask' dependencies (Part A)
807                #
808                # e.g. do_sometask[recrdeptask] = "do_someothertask"
809                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
810                # We cover the recursive part of the dependencies below
811                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
812                    tasknames = task_deps['recrdeptask'][taskname].split()
813                    recursivetasks[tid] = tasknames
814                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
815                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
816                    if taskname in tasknames:
817                        recursivetasksselfref.add(tid)
818
819                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
820                        recursiveitasks[tid] = []
821                        for t in task_deps['recideptask'][taskname].split():
822                            newdep = build_tid(mc, fn, t)
823                            recursiveitasks[tid].append(newdep)
824
825                self.runtaskentries[tid].depends = depends
826                # Remove all self references
827                self.runtaskentries[tid].depends.discard(tid)
828
829        #self.dump_data()
830
831        self.init_progress_reporter.next_stage()
832        bb.event.check_for_interrupts(self.cooker.data)
833
834        # Resolve recursive 'recrdeptask' dependencies (Part B)
835        #
836        # e.g. do_sometask[recrdeptask] = "do_someothertask"
837        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
838        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed
839
        # Generating/iterating recursive lists of dependencies is painful and potentially slow
841        # Precompute recursive task dependencies here by:
842        #     a) create a temp list of reverse dependencies (revdeps)
843        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
844        #     c) combine the total list of dependencies in cumulativedeps
845        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)
846
847
848        revdeps = {}
849        deps = {}
850        cumulativedeps = {}
851        for tid in self.runtaskentries:
852            deps[tid] = set(self.runtaskentries[tid].depends)
853            revdeps[tid] = set()
854            cumulativedeps[tid] = set()
855        # Generate a temp list of reverse dependencies
856        for tid in self.runtaskentries:
857            for dep in self.runtaskentries[tid].depends:
858                revdeps[dep].add(tid)
859        # Find the dependency chain endpoints
860        endpoints = set()
861        for tid in self.runtaskentries:
862            if not deps[tid]:
863                endpoints.add(tid)
864        # Iterate the chains collating dependencies
865        while endpoints:
866            next = set()
867            for tid in endpoints:
868                for dep in revdeps[tid]:
869                    cumulativedeps[dep].add(fn_from_tid(tid))
870                    cumulativedeps[dep].update(cumulativedeps[tid])
871                    if tid in deps[dep]:
872                        deps[dep].remove(tid)
873                    if not deps[dep]:
874                        next.add(dep)
875            endpoints = next
876        #for tid in deps:
877        #    if deps[tid]:
878        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
879
880        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
881        # resolve these recursively until we aren't adding any further extra dependencies
882        extradeps = True
883        while extradeps:
884            extradeps = 0
885            for tid in recursivetasks:
886                tasknames = recursivetasks[tid]
887
888                totaldeps = set(self.runtaskentries[tid].depends)
889                if tid in recursiveitasks:
890                    totaldeps.update(recursiveitasks[tid])
891                    for dep in recursiveitasks[tid]:
892                        if dep not in self.runtaskentries:
893                            continue
894                        totaldeps.update(self.runtaskentries[dep].depends)
895
896                deps = set()
897                for dep in totaldeps:
898                    if dep in cumulativedeps:
899                        deps.update(cumulativedeps[dep])
900
901                for t in deps:
902                    for taskname in tasknames:
903                        newtid = t + ":" + taskname
904                        if newtid == tid:
905                            continue
906                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
907                            extradeps += 1
908                            self.runtaskentries[tid].depends.add(newtid)
909
910                # Handle recursive tasks which depend upon other recursive tasks
911                deps = set()
912                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
913                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
914                for newtid in deps:
915                    for taskname in tasknames:
916                        if not newtid.endswith(":" + taskname):
917                            continue
918                        if newtid in self.runtaskentries:
919                            extradeps += 1
920                            self.runtaskentries[tid].depends.add(newtid)
921
922            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
923
924        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
925        for tid in recursivetasksselfref:
926            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
927
928        self.init_progress_reporter.next_stage()
929        bb.event.check_for_interrupts(self.cooker.data)
930
931        #self.dump_data()
932
933        # Step B - Mark all active tasks
934        #
935        # Start with the tasks we were asked to run and mark all dependencies
936        # as active too. If the task is to be 'forced', clear its stamp. Once
937        # all active tasks are marked, prune the ones we don't need.
938
939        logger.verbose("Marking Active Tasks")
940
941        def mark_active(tid, depth):
942            """
943            Mark an item as active along with its depends
944            (calls itself recursively)
945            """
946
947            if tid in runq_build:
948                return
949
950            runq_build[tid] = 1
951
952            depends = self.runtaskentries[tid].depends
953            for depend in depends:
954                mark_active(depend, depth+1)
955
956        def invalidate_task(tid, error_nostamp):
957            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
958            taskdep = self.dataCaches[mc].task_deps[taskfn]
959            if fn + ":" + taskname not in taskData[mc].taskentries:
960                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
961            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
962                if error_nostamp:
963                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
964                else:
965                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
966            else:
967                logger.verbose("Invalidate task %s, %s", taskname, fn)
968                bb.parse.siggen.invalidate_task(taskname, taskfn)
969
970        self.target_tids = []
971        for (mc, target, task, fn) in self.targets:
972
973            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
974                continue
975
976            if target in taskData[mc].failed_deps:
977                continue
978
979            parents = False
980            if task.endswith('-'):
981                parents = True
982                task = task[:-1]
983
984            if fn in taskData[mc].failed_fns:
985                continue
986
987            # fn already has mc prefix
988            tid = fn + ":" + task
989            self.target_tids.append(tid)
990            if tid not in taskData[mc].taskentries:
991                import difflib
992                tasks = []
993                for x in taskData[mc].taskentries:
994                    if x.startswith(fn + ":"):
995                        tasks.append(taskname_from_tid(x))
996                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
997                if close_matches:
998                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
999                else:
1000                    extra = ""
1001                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
1002
            # For tasks called "XXXX-", only run their dependencies
1004            if parents:
1005                for i in self.runtaskentries[tid].depends:
1006                    mark_active(i, 1)
1007            else:
1008                mark_active(tid, 1)
1009
1010        self.init_progress_reporter.next_stage()
1011        bb.event.check_for_interrupts(self.cooker.data)
1012
1013        # Step C - Prune all inactive tasks
1014        #
1015        # Once all active tasks are marked, prune the ones we don't need.
1016
1017        # Handle --runall
1018        if self.cooker.configuration.runall:
1019            # re-run the mark_active and then drop unused tasks from new list
1020
1021            runall_tids = set()
1022            added = True
1023            while added:
1024                reduced_tasklist = set(self.runtaskentries.keys())
1025                for tid in list(self.runtaskentries.keys()):
1026                    if tid not in runq_build:
1027                       reduced_tasklist.remove(tid)
1028                runq_build = {}
1029
1030                orig = runall_tids
1031                runall_tids = set()
1032                for task in self.cooker.configuration.runall:
1033                    if not task.startswith("do_"):
1034                        task = "do_{0}".format(task)
1035                    for tid in reduced_tasklist:
1036                        wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
1037                        if wanttid in self.runtaskentries:
1038                            runall_tids.add(wanttid)
1039
1040                    for tid in list(runall_tids):
1041                        mark_active(tid, 1)
1042                        self.target_tids.append(tid)
1043                        if self.cooker.configuration.force:
1044                            invalidate_task(tid, False)
1045                added = runall_tids - orig
1046
1047        delcount = set()
1048        for tid in list(self.runtaskentries.keys()):
1049            if tid not in runq_build:
1050                delcount.add(tid)
1051                del self.runtaskentries[tid]
1052
1053        if self.cooker.configuration.runall:
1054            if not self.runtaskentries:
1055                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
1056
1057        self.init_progress_reporter.next_stage()
1058        bb.event.check_for_interrupts(self.cooker.data)
1059
1060        # Handle runonly
1061        if self.cooker.configuration.runonly:
1062            # re-run the mark_active and then drop unused tasks from new list
1063            runq_build = {}
1064
1065            for task in self.cooker.configuration.runonly:
1066                if not task.startswith("do_"):
1067                    task = "do_{0}".format(task)
1068                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
1069
1070                for tid in runonly_tids:
1071                    mark_active(tid, 1)
1072                    if self.cooker.configuration.force:
1073                        invalidate_task(tid, False)
1074
1075            for tid in list(self.runtaskentries.keys()):
1076                if tid not in runq_build:
1077                    delcount.add(tid)
1078                    del self.runtaskentries[tid]
1079
1080            if not self.runtaskentries:
1081                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
1082
1083        #
1084        # Step D - Sanity checks and computation
1085        #
1086
1087        # Check to make sure we still have tasks to run
1088        if not self.runtaskentries:
1089            if not taskData[''].halt:
1090                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
1091            else:
1092                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
1093
1094        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
1095
1096        logger.verbose("Assign Weightings")
1097
1098        self.init_progress_reporter.next_stage()
1099        bb.event.check_for_interrupts(self.cooker.data)
1100
1101        # Generate a list of reverse dependencies to ease future calculations
1102        for tid in self.runtaskentries:
1103            for dep in self.runtaskentries[tid].depends:
1104                self.runtaskentries[dep].revdeps.add(tid)
1105
1106        self.init_progress_reporter.next_stage()
1107        bb.event.check_for_interrupts(self.cooker.data)
1108
1109        # Identify tasks at the end of dependency chains
1110        # Error on circular dependency loops (length two)
1111        endpoints = []
1112        for tid in self.runtaskentries:
1113            revdeps = self.runtaskentries[tid].revdeps
1114            if not revdeps:
1115                endpoints.append(tid)
1116            for dep in revdeps:
1117                if dep in self.runtaskentries[tid].depends:
1118                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1119
1120
1121        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1122
1123        self.init_progress_reporter.next_stage()
1124        bb.event.check_for_interrupts(self.cooker.data)
1125
1126        # Calculate task weights
        # Check for longer circular dependencies
1128        self.runq_weight = self.calculate_task_weights(endpoints)
1129
1130        self.init_progress_reporter.next_stage()
1131        bb.event.check_for_interrupts(self.cooker.data)
1132
1133        # Sanity Check - Check for multiple tasks building the same provider
1134        for mc in self.dataCaches:
1135            prov_list = {}
1136            seen_fn = []
1137            for tid in self.runtaskentries:
1138                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1139                if taskfn in seen_fn:
1140                    continue
1141                if mc != tidmc:
1142                    continue
1143                seen_fn.append(taskfn)
1144                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1145                    if prov not in prov_list:
1146                        prov_list[prov] = [taskfn]
1147                    elif taskfn not in prov_list[prov]:
1148                        prov_list[prov].append(taskfn)
1149            for prov in prov_list:
1150                if len(prov_list[prov]) < 2:
1151                    continue
1152                if prov in self.multi_provider_allowed:
1153                    continue
1154                seen_pn = []
1155                # If two versions of the same PN are being built its fatal, we don't support it.
1156                for fn in prov_list[prov]:
1157                    pn = self.dataCaches[mc].pkg_fn[fn]
1158                    if pn not in seen_pn:
1159                        seen_pn.append(pn)
1160                    else:
1161                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1162                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
1163                #
1164                # Construct a list of things which uniquely depend on each provider
1165                # since this may help the user figure out which dependency is triggering this warning
1166                #
1167                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
1168                deplist = {}
1169                commondeps = None
1170                for provfn in prov_list[prov]:
1171                    deps = set()
1172                    for tid in self.runtaskentries:
1173                        fn = fn_from_tid(tid)
1174                        if fn != provfn:
1175                            continue
1176                        for dep in self.runtaskentries[tid].revdeps:
1177                            fn = fn_from_tid(dep)
1178                            if fn == provfn:
1179                                continue
1180                            deps.add(dep)
1181                    if not commondeps:
1182                        commondeps = set(deps)
1183                    else:
1184                        commondeps &= deps
1185                    deplist[provfn] = deps
1186                for provfn in deplist:
1187                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
1188                #
1189                # Construct a list of provides and runtime providers for each recipe
1190                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1191                #
1192                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
1193                provide_results = {}
1194                rprovide_results = {}
1195                commonprovs = None
1196                commonrprovs = None
1197                for provfn in prov_list[prov]:
1198                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1199                    rprovides = set()
1200                    for rprovide in self.dataCaches[mc].rproviders:
1201                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1202                            rprovides.add(rprovide)
1203                    for package in self.dataCaches[mc].packages:
1204                        if provfn in self.dataCaches[mc].packages[package]:
1205                            rprovides.add(package)
1206                    for package in self.dataCaches[mc].packages_dynamic:
1207                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1208                            rprovides.add(package)
1209                    if not commonprovs:
1210                        commonprovs = set(provides)
1211                    else:
1212                        commonprovs &= provides
1213                    provide_results[provfn] = provides
1214                    if not commonrprovs:
1215                        commonrprovs = set(rprovides)
1216                    else:
1217                        commonrprovs &= rprovides
1218                    rprovide_results[provfn] = rprovides
1219                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
1220                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
1221                for provfn in prov_list[prov]:
1222                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
1223                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))
1224
1225                if self.warn_multi_bb:
1226                    logger.verbnote("".join(msgs))
1227                else:
1228                    logger.error("".join(msgs))
1229
1230        self.init_progress_reporter.next_stage()
1231        self.init_progress_reporter.next_stage()
1232        bb.event.check_for_interrupts(self.cooker.data)
1233
1234        # Iterate over the task list looking for tasks with a 'setscene' function
1235        self.runq_setscene_tids = set()
1236        if not self.cooker.configuration.nosetscene:
1237            for tid in self.runtaskentries:
1238                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1239                setscenetid = tid + "_setscene"
1240                if setscenetid not in taskData[mc].taskentries:
1241                    continue
1242                self.runq_setscene_tids.add(tid)
1243
1244        self.init_progress_reporter.next_stage()
1245        bb.event.check_for_interrupts(self.cooker.data)
1246
1247        # Invalidate task if force mode active
1248        if self.cooker.configuration.force:
1249            for tid in self.target_tids:
1250                invalidate_task(tid, False)
1251
1252        # Invalidate task if invalidate mode active
1253        if self.cooker.configuration.invalidate_stamp:
1254            for tid in self.target_tids:
1255                fn = fn_from_tid(tid)
1256                for st in self.cooker.configuration.invalidate_stamp.split(','):
1257                    if not st.startswith("do_"):
1258                        st = "do_%s" % st
1259                    invalidate_task(fn + ":" + st, True)
1260
1261        self.init_progress_reporter.next_stage()
1262        bb.event.check_for_interrupts(self.cooker.data)
1263
1264        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1265        for mc in taskData:
1266            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1267            virtpnmap = {}
1268            for v in virtmap:
1269                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1270                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1271            if hasattr(bb.parse.siggen, "tasks_resolved"):
1272                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1273
1274        self.init_progress_reporter.next_stage()
1275        bb.event.check_for_interrupts(self.cooker.data)
1276
1277        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1278
1279        starttime = time.time()
1280        lasttime = starttime
1281
1282        # Iterate over the task list and call into the siggen code
1283        dealtwith = set()
1284        todeal = set(self.runtaskentries)
1285        while todeal:
1286            ready = set()
1287            for tid in todeal.copy():
1288                if not (self.runtaskentries[tid].depends - dealtwith):
1289                    self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1290                    # get_taskhash for a given tid *must* be called before get_unihash* below
1291                    self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1292                    ready.add(tid)
1293            unihashes = bb.parse.siggen.get_unihashes(ready)
1294            for tid in ready:
1295                dealtwith.add(tid)
1296                todeal.remove(tid)
1297                self.runtaskentries[tid].unihash = unihashes[tid]
1298
1299            bb.event.check_for_interrupts(self.cooker.data)
1300
1301            if time.time() > (lasttime + 30):
1302                lasttime = time.time()
1303                hashequiv_logger.verbose("Initial setup loop progress: %s of %s in %s" % (len(todeal), len(self.runtaskentries), lasttime - starttime))
1304
1305        endtime = time.time()
1306        if (endtime-starttime > 60):
1307            hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime))
1308
1309        bb.parse.siggen.writeout_file_checksum_cache()
1310
1311        #self.dump_data()
1312        return len(self.runtaskentries)
1313
1314    def dump_data(self):
1315        """
1316        Dump some debug information on the internal data structures
1317        """
1318        logger.debug3("run_tasks:")
1319        for tid in self.runtaskentries:
1320            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1321                         self.runtaskentries[tid].weight,
1322                         self.runtaskentries[tid].depends,
1323                         self.runtaskentries[tid].revdeps)
1324
class RunQueueWorker():
    """Pairs a bitbake-worker subprocess with the pipe used to read its output."""
    def __init__(self, process, pipe):
        # process: the worker's subprocess.Popen handle
        # pipe: the runQueuePipe wrapping the worker's stdout
        self.process = process
        self.pipe = pipe
1329
1330class RunQueue:
    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
        """
        Build the runqueue state machine around a RunQueueData instance.

        cooker/cfgData supply configuration; dataCaches, taskData and
        targets describe what is to be built (per multiconfig).
        """

        self.cooker = cooker
        self.cfgData = cfgData
        # All task/dependency computation lives in RunQueueData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)

        # Optional user-supplied hooks controlling setscene handling
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None

        # Initial state of the runqueue state machine
        self.state = runQueuePrepare

        # For disk space monitor
        # Invoked at regular time intervals via the bitbake heartbeat event
        # while the build is running. We generate a unique name for the handler
        # here, just in case that there ever is more than one RunQueue instance,
        # start the handler when reaching runQueueSceneInit, and stop it when
        # done with the build.
        self.dm = monitordisk.diskMonitor(cfgData)
        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
        self.dm_event_handler_registered = False
        # Populated later: the executor and the per-mc (fakeroot) workers
        self.rqexe = None
        self.worker = {}
        self.fakeworker = {}
1354
1355    @staticmethod
1356    def send_pickled_data(worker, data, name):
1357        msg = bytearray()
1358        msg.extend(b"<" + name.encode() + b">")
1359        pickled_data = pickle.dumps(data)
1360        msg.extend(len(pickled_data).to_bytes(4, 'big'))
1361        msg.extend(pickled_data)
1362        msg.extend(b"</" + name.encode() + b">")
1363        worker.stdin.write(msg)
1364
    def _start_worker(self, mc, fakeroot = False, rqexec = None):
        """
        Spawn a bitbake-worker subprocess for multiconfig *mc* and perform
        the initial configuration handshake over its stdin.

        When *fakeroot* is True the worker is launched via FAKEROOTCMD with
        FAKEROOTBASEENV applied to its environment. Returns a RunQueueWorker
        pairing the process with the pipe reading its stdout.
        """
        logger.debug("Starting bitbake-worker")
        # Handshake magic passed on the worker command line; the "bad"
        # suffix is added when profiling is enabled, "beef" when running
        # under fakeroot, so the worker knows which mode to set up.
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        fakerootlogs = None

        # bitbake-worker lives in bin/, two levels up from this module
        workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
        if fakeroot:
            magic = magic + "beef"
            mcdata = self.cooker.databuilder.mcdata[mc]
            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
            # FAKEROOTBASEENV holds KEY=VALUE pairs overlaid on our environment
            env = os.environ.copy()
            for key, value in (var.split('=',1) for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
        else:
            worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)

        # Initial state shared with the worker before any task is dispatched
        workerdata = {
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME"),
            "date" : self.cfgData.getVar("DATE"),
            "time" : self.cfgData.getVar("TIME"),
            "hashservaddr" : self.cooker.hashservaddr,
            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
        }

        # Send the configuration frames before any tasks are queued
        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
        worker.stdin.flush()

        return RunQueueWorker(worker, workerpipe)
1408
    def _teardown_worker(self, worker):
        """
        Ask a worker to quit, then drain its pipe until the process exits.

        *worker* is a RunQueueWorker; None is tolerated and ignored.
        """
        if not worker:
            return
        logger.debug("Teardown for bitbake-worker")
        try:
           RunQueue.send_pickled_data(worker.process, b"", "quit")
           worker.process.stdin.flush()
           worker.process.stdin.close()
        except IOError:
           # The worker may already have died; that's fine during teardown.
           pass
        # Keep draining output until the process has actually exited so no
        # events are lost and the child cannot block on a full pipe.
        while worker.process.returncode is None:
            worker.pipe.read()
            worker.process.poll()
        # Consume anything left in the pipe after exit, then close it.
        while worker.pipe.read():
            continue
        worker.pipe.close()
1425
1426    def start_worker(self, rqexec):
1427        if self.worker:
1428            self.teardown_workers()
1429        self.teardown = False
1430        for mc in self.rqdata.dataCaches:
1431            self.worker[mc] = self._start_worker(mc, False, rqexec)
1432
1433    def start_fakeworker(self, rqexec, mc):
1434        if not mc in self.fakeworker:
1435            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1436
1437    def teardown_workers(self):
1438        self.teardown = True
1439        for mc in self.worker:
1440            self._teardown_worker(self.worker[mc])
1441        self.worker = {}
1442        for mc in self.fakeworker:
1443            self._teardown_worker(self.fakeworker[mc])
1444        self.fakeworker = {}
1445
1446    def read_workers(self):
1447        for mc in self.worker:
1448            self.worker[mc].pipe.read()
1449        for mc in self.fakeworker:
1450            self.fakeworker[mc].pipe.read()
1451
1452    def active_fds(self):
1453        fds = []
1454        for mc in self.worker:
1455            fds.append(self.worker[mc].pipe.input)
1456        for mc in self.fakeworker:
1457            fds.append(self.fakeworker[mc].pipe.input)
1458        return fds
1459
    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
        """
        Return True if the stamp for *tid* is current, i.e. the task does not
        need to be rerun based on stamp file presence and timestamps.

        With *recurse* set, dependencies are checked too and results are
        memoised in *cache* (a dict shared across the recursion).
        """
        def get_timestamp(f):
            # mtime of f, or None when the file is absent or cannot be
            # stat'd — any error is treated as "no stamp".
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
        if taskname is None:
            taskname = tn

        stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug2("Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug2("%s.%s is nostamp\n", fn, taskname)
            return False

        # For setscene tasks, an existing stamp is sufficient on its own.
        if taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        # Compare this task's stamp against those of its dependencies.
        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runtaskentries[tid].depends:
            if iscurrent:
                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
                stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2)
                stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                # A sole or newer setscene stamp satisfies this dependency.
                if t3 and not t2:
                    continue
                if t3 and t3 > t2:
                    continue
                if fn == fn2:
                    # Same recipe file: compare timestamps directly.
                    if not t2:
                        logger.debug2('Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                        break
                    if t1 < t2:
                        # Dependency stamp is newer than ours: stale.
                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                        break
                    if recurse and iscurrent:
                        if dep in cache:
                            iscurrent = cache[dep]
                            if not iscurrent:
                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                        else:
                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                            cache[dep] = iscurrent
        if recurse:
            cache[tid] = iscurrent
        return iscurrent
1524
1525    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1526        valid = set()
1527        if self.hashvalidate:
1528            sq_data = {}
1529            sq_data['hash'] = {}
1530            sq_data['hashfn'] = {}
1531            sq_data['unihash'] = {}
1532            for tid in tocheck:
1533                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1534                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1535                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1536                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1537
1538            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1539
1540        return valid
1541
1542    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
1543        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}
1544
1545        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
1546        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"
1547
1548        return bb.utils.better_eval(call, locs)
1549
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the halt on failure configuration option isn't set)

        Acts as a state machine over self.state: returns False once the build
        is complete, otherwise a truthy value meaning "call again". Raises
        TaskFailure when the queue finished in the failed state.
        """

        retval = True
        bb.event.check_for_interrupts(self.cooker.data)

        if self.state is runQueuePrepare:
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                bb.parse.siggen.save_unitaskhashes()

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()

            # we are ready to run,  emit dependency info to any UI or class which
            # needs it
            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
            self.rqdata.init_progress_reporter.next_stage()
            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

            # Monitor disk space via heartbeat events while the build runs.
            if not self.dm_event_handler_registered:
                 res = bb.event.register(self.dm_event_handler_name,
                                         lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                         ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

            self.rqdata.init_progress_reporter.next_stage()
            self.rqexe = RunQueueExecute(self)

            # Signature dumping short-circuits normal execution.
            dumpsigs = self.cooker.configuration.dump_signatures
            if dumpsigs:
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dumpsigs:
                    self.invalidtasks_dump = self.print_diffscenetasks()
                self.state = runQueueDumpSigs

        if self.state is runQueueDumpSigs:
            dumpsigs = self.cooker.configuration.dump_signatures
            # dump_signatures() returns False once all helpers have finished.
            retval = self.dump_signatures(dumpsigs)
            if retval is False:
                if 'printdiff' in dumpsigs:
                    self.write_diffscenetasks(self.invalidtasks_dump)
                self.state = runQueueComplete

        if self.state is runQueueSceneInit:
            self.start_worker(self.rqexe)
            self.rqdata.init_progress_reporter.finish()

            # If we don't have any setscene functions, skip execution
            if not self.rqdata.runq_setscene_tids:
                logger.info('No setscene tasks')
                for tid in self.rqdata.runtaskentries:
                    if not self.rqdata.runtaskentries[tid].depends:
                        self.rqexe.setbuildable(tid)
                    self.rqexe.tasks_notcovered.add(tid)
                self.rqexe.sqdone = True
            logger.info('Executing Tasks')
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        # Deregister the disk monitor handler once the build has finished.
        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            bb.parse.siggen.save_unitaskhashes()
            self.teardown_workers()
            if self.rqexe:
                if self.rqexe.stats.failed:
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
                else:
                    # Let's avoid the word "failed" if nothing actually did
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1655
1656    def execute_runqueue(self):
1657        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
1658        try:
1659            return self._execute_runqueue()
1660        except bb.runqueue.TaskFailure:
1661            raise
1662        except SystemExit:
1663            raise
1664        except bb.BBHandledException:
1665            try:
1666                self.teardown_workers()
1667            except:
1668                pass
1669            self.state = runQueueComplete
1670            raise
1671        except Exception as err:
1672            logger.exception("An uncaught exception occurred in runqueue")
1673            try:
1674                self.teardown_workers()
1675            except:
1676                pass
1677            self.state = runQueueComplete
1678            raise
1679
1680    def finish_runqueue(self, now = False):
1681        if not self.rqexe:
1682            self.state = runQueueComplete
1683            return
1684
1685        if now:
1686            self.rqexe.finish_now()
1687        else:
1688            self.rqexe.finish()
1689
1690    def _rq_dump_sigtid(self, tids):
1691        for tid in tids:
1692            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1693            dataCaches = self.rqdata.dataCaches
1694            bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True)
1695
1696    def dump_signatures(self, options):
1697        if not hasattr(self, "dumpsigs_launched"):
1698            if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset:
1699                bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled")
1700
1701            bb.note("Writing task signature files")
1702
1703            max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1704            def chunkify(l, n):
1705                return [l[i::n] for i in range(n)]
1706            dumpsigs_tids = chunkify(list(self.rqdata.runtaskentries), max_process)
1707
1708            # We cannot use the real multiprocessing.Pool easily due to some local data
1709            # that can't be pickled. This is a cheap multi-process solution.
1710            self.dumpsigs_launched = []
1711
1712            for tids in dumpsigs_tids:
1713                p = Process(target=self._rq_dump_sigtid, args=(tids, ))
1714                p.start()
1715                self.dumpsigs_launched.append(p)
1716
1717            return 1.0
1718
1719        for q in self.dumpsigs_launched:
1720            # The finished processes are joined when calling is_alive()
1721            if not q.is_alive():
1722                self.dumpsigs_launched.remove(q)
1723
1724        if self.dumpsigs_launched:
1725            return 1.0
1726
1727        for p in self.dumpsigs_launched:
1728                p.join()
1729
1730        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1731
1732        return False
1733
    def print_diffscenetasks(self):
        """
        Report the tasks whose signatures differ from any cached version,
        i.e. where the current build first diverges from the cache.

        Prints the list via bb.plain() and returns the set of invalid tids.
        """
        def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
            # Recurse towards dependencies, collecting the deepest invalid
            # tasks — those whose own direct dependencies are all valid.
            invalidtasks = []
            for t in taskdepends[task].depends:
                if t not in valid and t not in visited_invalid:
                    invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
                    visited_invalid.add(t)

            direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
            if not direct_invalid and task not in noexec:
                invalidtasks = [task]
            return invalidtasks

        noexec = []
        tocheck = set()

        # Partition the tasks: noexec tasks are excluded from hash checking.
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(tid)
                continue

            tocheck.add(tid)

        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for tid in noexec:
            if tid not in self.rqdata.runq_setscene_tids:
                continue
            for dep in self.rqdata.runtaskentries[tid].depends:
                hasnoexecparents = True
                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        invalidtasks = set()

        # Breadth-first walk from the target tasks towards dependencies,
        # stopping each path at the first invalid task found.
        toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
        for tid in toptasks:
            toprocess = set([tid])
            while toprocess:
                next = set()
                visited_invalid = set()
                for t in toprocess:
                    if t not in valid_new and t not in noexec:
                        invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
                        continue
                    if t in self.rqdata.runq_setscene_tids:
                        # Valid setscene task: continue via its setscene deps.
                        for dep in self.rqexe.sqdata.sq_deps[t]:
                            next.add(dep)
                        continue

                    for dep in self.rqdata.runtaskentries[t].depends:
                        next.add(dep)

                toprocess = next

        tasklist = []
        for tid in invalidtasks:
            tasklist.append(tid)

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks
1808
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, locate the most recent matching siginfo file
        and print a signature diff explaining why the cached result could
        not be reused.
        """
        bb.siggen.check_siggen_version(bb.siggen)

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            # Compare the siginfo for two hashes of a dependency, indenting
            # the output one level per recursion step.
            hashes = [hash1, hash2]
            bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
            bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
                recout.extend(list('    ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for tid in invalidtasks:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            h = self.rqdata.runtaskentries[tid].unihash
            bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
            bb.debug(1, "Found hashfiles:\n{}".format(matches))
            # The siginfo file for the current hash must exist; anything else
            # indicates an internal error.
            match = None
            for m in matches.values():
                if h in m['path']:
                    match = m['path']
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
            # Exclude the current hash; prefer locally-generated siginfo
            # files over sstate-fetched ones when both are present.
            matches = {k : v for k, v in iter(matches.items()) if h not in k}
            matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']}
            if matches_local:
                matches = matches_local
            if matches:
                # Diff against the newest remaining match (by timestamp).
                latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
                prevh = __find_sha256__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, most recent matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1851
1852
1853class RunQueueExecute:
1854
1855    def __init__(self, rq):
1856        self.rq = rq
1857        self.cooker = rq.cooker
1858        self.cfgData = rq.cfgData
1859        self.rqdata = rq.rqdata
1860
1861        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
1862        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
1863        self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
1864        self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
1865        self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
1866        self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")
1867
1868        self.sq_buildable = set()
1869        self.sq_running = set()
1870        self.sq_live = set()
1871
1872        self.updated_taskhash_queue = []
1873        self.pending_migrations = set()
1874
1875        self.runq_buildable = set()
1876        self.runq_running = set()
1877        self.runq_complete = set()
1878        self.runq_tasksrun = set()
1879
1880        self.build_stamps = {}
1881        self.build_stamps2 = []
1882        self.failed_tids = []
1883        self.sq_deferred = {}
1884        self.sq_needed_harddeps = set()
1885        self.sq_harddep_deferred = set()
1886
1887        self.stampcache = {}
1888
1889        self.holdoff_tasks = set()
1890        self.holdoff_need_update = True
1891        self.sqdone = False
1892
1893        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
1894
1895        if self.number_tasks <= 0:
1896             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1897
1898        lower_limit = 1.0
1899        upper_limit = 1000000.0
1900        if self.max_cpu_pressure:
1901            self.max_cpu_pressure = float(self.max_cpu_pressure)
1902            if self.max_cpu_pressure < lower_limit:
1903                bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
1904            if self.max_cpu_pressure > upper_limit:
1905                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
1906
1907        if self.max_io_pressure:
1908            self.max_io_pressure = float(self.max_io_pressure)
1909            if self.max_io_pressure < lower_limit:
1910                bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
1911            if self.max_io_pressure > upper_limit:
1912                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1913
1914        if self.max_memory_pressure:
1915            self.max_memory_pressure = float(self.max_memory_pressure)
1916            if self.max_memory_pressure < lower_limit:
1917                bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
1918            if self.max_memory_pressure > upper_limit:
1919                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1920
1921        if self.max_loadfactor:
1922            self.max_loadfactor = float(self.max_loadfactor)
1923            if self.max_loadfactor <= 0:
1924                bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))
1925
1926        # List of setscene tasks which we've covered
1927        self.scenequeue_covered = set()
1928        # List of tasks which are covered (including setscene ones)
1929        self.tasks_covered = set()
1930        self.tasks_scenequeue_done = set()
1931        self.scenequeue_notcovered = set()
1932        self.tasks_notcovered = set()
1933        self.scenequeue_notneeded = set()
1934
1935        schedulers = self.get_schedulers()
1936        for scheduler in schedulers:
1937            if self.scheduler == scheduler.name:
1938                self.sched = scheduler(self, self.rqdata)
1939                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
1940                break
1941        else:
1942            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1943                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1944
1945        #if self.rqdata.runq_setscene_tids:
1946        self.sqdata = SQData()
1947        build_scenequeue_data(self.sqdata, self.rqdata, self)
1948
1949        update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
1950
1951        # Compute a list of 'stale' sstate tasks where the current hash does not match the one
1952        # in any stamp files. Pass the list out to metadata as an event.
1953        found = {}
1954        for tid in self.rqdata.runq_setscene_tids:
1955            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1956            stamps = bb.build.find_stale_stamps(taskname, taskfn)
1957            if stamps:
1958                if mc not in found:
1959                    found[mc] = {}
1960                found[mc][tid] = stamps
1961        for mc in found:
1962            event = bb.event.StaleSetSceneTasks(found[mc])
1963            bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
1964
1965        self.build_taskdepdata_cache()
1966
1967    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1968
1969        # self.build_stamps[pid] may not exist when use shared work directory.
1970        if task in self.build_stamps:
1971            self.build_stamps2.remove(self.build_stamps[task])
1972            del self.build_stamps[task]
1973
1974        if task in self.sq_live:
1975            if status != 0:
1976                self.sq_task_fail(task, status)
1977            else:
1978                self.sq_task_complete(task)
1979            self.sq_live.remove(task)
1980            self.stats.updateActiveSetscene(len(self.sq_live))
1981        else:
1982            if status != 0:
1983                self.task_fail(task, status, fakerootlog=fakerootlog)
1984            else:
1985                self.task_complete(task)
1986        return True
1987
1988    def finish_now(self):
1989        for mc in self.rq.worker:
1990            try:
1991                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
1992                self.rq.worker[mc].process.stdin.flush()
1993            except IOError:
1994                # worker must have died?
1995                pass
1996        for mc in self.rq.fakeworker:
1997            try:
1998                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
1999                self.rq.fakeworker[mc].process.stdin.flush()
2000            except IOError:
2001                # worker must have died?
2002                pass
2003
2004        if self.failed_tids:
2005            self.rq.state = runQueueFailed
2006            return
2007
2008        self.rq.state = runQueueComplete
2009        return
2010
2011    def finish(self):
2012        self.rq.state = runQueueCleanUp
2013
2014        active = self.stats.active + len(self.sq_live)
2015        if active > 0:
2016            bb.event.fire(runQueueExitWait(active), self.cfgData)
2017            self.rq.read_workers()
2018            return self.rq.active_fds()
2019
2020        if self.failed_tids:
2021            self.rq.state = runQueueFailed
2022            return True
2023
2024        self.rq.state = runQueueComplete
2025        return True
2026
2027    # Used by setscene only
2028    def check_dependencies(self, task, taskdeps):
2029        if not self.rq.depvalidate:
2030            return False
2031
2032        # Must not edit parent data
2033        taskdeps = set(taskdeps)
2034
2035        taskdata = {}
2036        taskdeps.add(task)
2037        for dep in taskdeps:
2038            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
2039            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2040            taskdata[dep] = [pn, taskname, fn]
2041        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
2042        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
2043        valid = bb.utils.better_eval(call, locs)
2044        return valid
2045
2046    def can_start_task(self):
2047        active = self.stats.active + len(self.sq_live)
2048        can_start = active < self.number_tasks
2049        return can_start
2050
2051    def get_schedulers(self):
2052        schedulers = set(obj for obj in globals().values()
2053                             if type(obj) is type and
2054                                issubclass(obj, RunQueueScheduler))
2055
2056        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
2057        if user_schedulers:
2058            for sched in user_schedulers.split():
2059                if not "." in sched:
2060                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
2061                    continue
2062
2063                modname, name = sched.rsplit(".", 1)
2064                try:
2065                    module = __import__(modname, fromlist=(name,))
2066                except ImportError as exc:
2067                    bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
2068                else:
2069                    schedulers.add(getattr(module, name))
2070        return schedulers
2071
    def setbuildable(self, task):
        # Mark *task* as ready to run and hand it to the scheduler.
        self.runq_buildable.add(task)
        self.sched.newbuildable(task)
2075
    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        Also releases at most one setscene task that was deferred behind
        this task.
        """
        self.runq_complete.add(task)
        for revdep in self.rqdata.runtaskentries[task].revdeps:
            if revdep in self.runq_running:
                continue
            if revdep in self.runq_buildable:
                continue
            # A reverse dependency becomes buildable only once every one of
            # its dependencies has completed.
            alldeps = True
            for dep in self.rqdata.runtaskentries[revdep].depends:
                if dep not in self.runq_complete:
                    alldeps = False
                    break
            if alldeps:
                self.setbuildable(revdep)
                logger.debug("Marking task %s as buildable", revdep)

        found = None
        for t in sorted(self.sq_deferred.copy()):
            if self.sq_deferred[t] == task:
                # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
                # We shouldn't allow all to run at once as it is prone to races.
                if not found:
                    bb.debug(1, "Deferred task %s now buildable" % t)
                    del self.sq_deferred[t]
                    update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
                    found = t
                else:
                    bb.debug(1, "Deferring %s after %s" % (t, found))
                    self.sq_deferred[t] = found
2110
    def task_complete(self, task):
        # Record a successful task: bump the stats, notify event listeners,
        # then propagate completion so dependent tasks can become buildable.
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)
        self.runq_tasksrun.add(task)
2116
2117    def task_fail(self, task, exitcode, fakerootlog=None):
2118        """
2119        Called when a task has failed
2120        Updates the state engine with the failure
2121        """
2122        self.stats.taskFailed()
2123        self.failed_tids.append(task)
2124
2125        fakeroot_log = []
2126        if fakerootlog and os.path.exists(fakerootlog):
2127            with open(fakerootlog) as fakeroot_log_file:
2128                fakeroot_failed = False
2129                for line in reversed(fakeroot_log_file.readlines()):
2130                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
2131                        if fakeroot_error in line.lower():
2132                            fakeroot_failed = True
2133                    if 'doing new pid setup and server start' in line:
2134                        break
2135                    fakeroot_log.append(line)
2136
2137            if not fakeroot_failed:
2138                fakeroot_log = []
2139
2140        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
2141
2142        if self.rqdata.taskData[''].halt:
2143            self.rq.state = runQueueCleanUp
2144
    def task_skip(self, task, reason):
        # Mark a task as skipped: it is recorded as running and completed so
        # the state engine can progress, and an event records the reason.
        self.runq_running.add(task)
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        self.stats.taskSkipped()
        self.stats.taskCompleted()
2152
2153    def summarise_scenequeue_errors(self):
2154        err = False
2155        if not self.sqdone:
2156            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
2157            completeevent = sceneQueueComplete(self.stats, self.rq)
2158            bb.event.fire(completeevent, self.cfgData)
2159        if self.sq_deferred:
2160            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
2161            err = True
2162        if self.updated_taskhash_queue:
2163            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
2164            err = True
2165        if self.holdoff_tasks:
2166            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
2167            err = True
2168
2169        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
2170            # No task should end up in both covered and uncovered, that is a bug.
2171            logger.error("Setscene task %s in both covered and notcovered." % tid)
2172
2173        for tid in self.rqdata.runq_setscene_tids:
2174            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2175                err = True
2176                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
2177            if tid not in self.sq_buildable:
2178                err = True
2179                logger.error("Setscene Task %s was never marked as buildable" % tid)
2180            if tid not in self.sq_running:
2181                err = True
2182                logger.error("Setscene Task %s was never marked as running" % tid)
2183
2184        for x in self.rqdata.runtaskentries:
2185            if x not in self.tasks_covered and x not in self.tasks_notcovered:
2186                logger.error("Task %s was never moved from the setscene queue" % x)
2187                err = True
2188            if x not in self.tasks_scenequeue_done:
2189                logger.error("Task %s was never processed by the setscene code" % x)
2190                err = True
2191            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
2192                logger.error("Task %s was never marked as buildable by the setscene code" % x)
2193                err = True
2194        return err
2195
2196
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Called repeatedly from the main loop; each call tries to start at
        most one setscene task and one real task, returning True while there
        is (or may be) more work to do.
        """

        self.rq.read_workers()
        if self.updated_taskhash_queue or self.pending_migrations:
            self.process_possible_migrations()

        if not hasattr(self, "sorted_setscene_tids"):
            # Don't want to sort this set every execution
            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)
            # Resume looping where we left off when we returned to feed the mainloop
            self.setscene_tids_generator = itertools.cycle(self.rqdata.runq_setscene_tids)

        task = None
        if not self.sqdone and self.can_start_task():
            loopcount = 0
            # Find the next setscene to run, exit the loop when we've processed all tids or found something to execute
            while loopcount < len(self.rqdata.runq_setscene_tids):
                loopcount += 1
                nexttask = next(self.setscene_tids_generator)
                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
                    if nexttask in self.sq_deferred and self.sq_deferred[nexttask] not in self.runq_complete:
                        # Skip deferred tasks quickly before the 'expensive' tests below - this is key to performant multiconfig builds
                        continue
                    # A non-target setscene task whose reverse dependencies are
                    # all covered can be skipped outright.
                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
                            nexttask not in self.sq_needed_harddeps and \
                            self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
                            self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
                        if nexttask not in self.rqdata.target_tids:
                            logger.debug2("Skipping setscene for task %s" % nexttask)
                            self.sq_task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            if nexttask in self.sq_deferred:
                                del self.sq_deferred[nexttask]
                            return True
                    # Hard dependencies must be resolved first; enable them and
                    # defer this task until they are covered or notcovered.
                    if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                        logger.debug2("Deferring %s due to hard dependencies" % nexttask)
                        updated = False
                        for dep in self.sqdata.sq_harddeps_rev[nexttask]:
                            if dep not in self.sq_needed_harddeps:
                                logger.debug2("Enabling task %s as it is a hard dependency" % dep)
                                self.sq_buildable.add(dep)
                                self.sq_needed_harddeps.add(dep)
                                updated = True
                        self.sq_harddep_deferred.add(nexttask)
                        if updated:
                            return True
                        continue
                    # If covered tasks are running, need to wait for them to complete
                    # NOTE(review): this 'continue' only advances the inner for-loop,
                    # so this loop has no effect on nexttask selection; presumably the
                    # intent was to defer nexttask while covered tasks run — confirm.
                    for t in self.sqdata.sq_covered_tasks[nexttask]:
                        if t in self.runq_running and t not in self.runq_complete:
                            continue
                    if nexttask in self.sq_deferred:
                        # Deferred tasks that were still deferred were skipped above so we now need to process
                        logger.debug("Task %s no longer deferred" % nexttask)
                        del self.sq_deferred[nexttask]
                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
                        if not valid:
                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
                            self.sq_task_failoutright(nexttask)
                            return True
                    if nexttask in self.sqdata.outrightfail:
                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
                        self.sq_task_failoutright(nexttask)
                        return True
                    if nexttask in self.sqdata.unskippable:
                        logger.debug2("Setscene task %s is unskippable" % nexttask)
                    task = nexttask
                    break
        if task is not None:
            # Found a setscene task to execute; dispatch it to a worker.
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
            taskname = taskname + "_setscene"
            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
                self.sq_task_failoutright(task)
                return True

            # Forced target tasks must run for real, not from setscene.
            if self.cooker.configuration.force:
                if task in self.rqdata.target_tids:
                    self.sq_task_failoutright(task)
                    return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
                self.sq_task_skip(task)
                return True

            if self.cooker.configuration.skipsetscene:
                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
                self.sq_task_failoutright(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
            runtask = {
                'fn' : taskfn,
                'task' : task,
                'taskname' : taskname,
                'taskhash' : self.rqdata.get_task_hash(task),
                'unihash' : self.rqdata.get_task_unihash(task),
                'quieterrors' : True,
                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
                'taskdepdata' : self.sq_build_taskdepdata(task),
                'dry_run' : False,
                'taskdep': taskdep,
                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
            }

            # Fakeroot tasks go to the fakeroot worker, started on demand.
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if not mc in self.rq.fakeworker:
                    self.rq.start_fakeworker(self, mc)
                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            self.build_stamps2.append(self.build_stamps[task])
            self.sq_running.add(task)
            self.sq_live.add(task)
            self.stats.updateActiveSetscene(len(self.sq_live))
            if self.can_start_task():
                return True

        self.update_holdofftasks()

        # All setscene activity has drained; validate the bookkeeping and
        # decide whether to move on to executing real tasks.
        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
            hashequiv_logger.verbose("Setscene tasks completed")

            err = self.summarise_scenequeue_errors()
            if err:
                self.rq.state = runQueueFailed
                return True

            if self.cooker.configuration.setsceneonly:
                self.rq.state = runQueueComplete
                return True
            self.sqdone = True

            if self.stats.total == 0:
                # nothing to do
                self.rq.state = runQueueComplete
                return True

        if self.cooker.configuration.setsceneonly:
            task = None
        else:
            task = self.sched.next()
        if task is not None:
            # Execute a real (non-setscene) task.
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)

            if self.rqdata.setscene_ignore_tasks is not None:
                if self.check_setscene_ignore_tasks(task):
                    self.task_fail(task, "setscene ignore_tasks")
                    return True

            if task in self.tasks_covered:
                logger.debug2("Setscene covered task %s", task)
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2("Stamp current task %s", task)

                self.task_skip(task, "existing")
                self.runq_tasksrun.add(task)
                return True

            # noexec tasks only need their stamp written, no worker dispatch.
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running.add(task)
                self.stats.taskActive()
                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                    bb.build.make_stamp_mcfn(taskname, taskfn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
            runtask = {
                'fn' : taskfn,
                'task' : task,
                'taskname' : taskname,
                'taskhash' : self.rqdata.get_task_hash(task),
                'unihash' : self.rqdata.get_task_unihash(task),
                'quieterrors' : False,
                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
                'taskdepdata' : self.build_taskdepdata(task),
                'dry_run' : self.rqdata.setscene_enforce,
                'taskdep': taskdep,
                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
            }

            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                if not mc in self.rq.fakeworker:
                    try:
                        self.rq.start_fakeworker(self, mc)
                    except OSError as exc:
                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
                        self.rq.state = runQueueFailed
                        self.stats.taskFailed()
                        return True
                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running.add(task)
            self.stats.taskActive()
            if self.can_start_task():
                return True

        # Tasks are still running; wait on worker output.
        if self.stats.active > 0 or self.sq_live:
            self.rq.read_workers()
            return self.rq.active_fds()

        # No more tasks can be run. If we have deferred setscene tasks we should run them.
        if self.sq_deferred:
            deferred_tid = list(self.sq_deferred.keys())[0]
            blocking_tid = self.sq_deferred.pop(deferred_tid)
            logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
            return True

        if self.failed_tids:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        err = self.summarise_scenequeue_errors()
        for task in self.rqdata.runtaskentries:
            if task not in self.runq_buildable:
                logger.error("Task %s never buildable!", task)
                err = True
            elif task not in self.runq_running:
                logger.error("Task %s never ran!", task)
                err = True
            elif task not in self.runq_complete:
                logger.error("Task %s never completed!", task)
                err = True

        if err:
            self.rq.state = runQueueFailed
        else:
            self.rq.state = runQueueComplete

        return True
2464
2465    def filtermcdeps(self, task, mc, deps):
2466        ret = set()
2467        for dep in deps:
2468            thismc = mc_from_tid(dep)
2469            if thismc != mc:
2470                continue
2471            ret.add(dep)
2472        return ret
2473
2474    # Build the individual cache entries in advance once to save time
2475    def build_taskdepdata_cache(self):
2476        taskdepdata_cache = {}
2477        for task in self.rqdata.runtaskentries:
2478            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2479            taskdepdata_cache[task] = bb.TaskData(
2480                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
2481                taskname = taskname,
2482                fn = fn,
2483                deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends),
2484                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
2485                taskhash = self.rqdata.runtaskentries[task].hash,
2486                unihash = self.rqdata.runtaskentries[task].unihash,
2487                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
2488                taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps,
2489            )
2490
2491        self.taskdepdata_cache = taskdepdata_cache
2492
2493    # We filter out multiconfig dependencies from taskdepdata we pass to the tasks
2494    # as most code can't handle them
2495    def build_taskdepdata(self, task):
2496        taskdepdata = {}
2497        mc = mc_from_tid(task)
2498        next = self.rqdata.runtaskentries[task].depends.copy()
2499        next.add(task)
2500        next = self.filtermcdeps(task, mc, next)
2501        while next:
2502            additional = []
2503            for revdep in next:
2504                self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace(
2505                    unihash=self.rqdata.runtaskentries[revdep].unihash
2506                )
2507                taskdepdata[revdep] = self.taskdepdata_cache[revdep]
2508                for revdep2 in self.taskdepdata_cache[revdep].deps:
2509                    if revdep2 not in taskdepdata:
2510                        additional.append(revdep2)
2511            next = additional
2512
2513        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2514        return taskdepdata
2515
2516    def update_holdofftasks(self):
2517
2518        if not self.holdoff_need_update:
2519            return
2520
2521        notcovered = set(self.scenequeue_notcovered)
2522        notcovered |= self.sqdata.cantskip
2523        for tid in self.scenequeue_notcovered:
2524            notcovered |= self.sqdata.sq_covered_tasks[tid]
2525        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
2526        notcovered.intersection_update(self.tasks_scenequeue_done)
2527
2528        covered = set(self.scenequeue_covered)
2529        for tid in self.scenequeue_covered:
2530            covered |= self.sqdata.sq_covered_tasks[tid]
2531        covered.difference_update(notcovered)
2532        covered.intersection_update(self.tasks_scenequeue_done)
2533
2534        for tid in notcovered | covered:
2535            if not self.rqdata.runtaskentries[tid].depends:
2536                self.setbuildable(tid)
2537            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
2538                 self.setbuildable(tid)
2539
2540        self.tasks_covered = covered
2541        self.tasks_notcovered = notcovered
2542
2543        self.holdoff_tasks = set()
2544
2545        for tid in self.rqdata.runq_setscene_tids:
2546            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2547                self.holdoff_tasks.add(tid)
2548
2549        for tid in self.holdoff_tasks.copy():
2550            for dep in self.sqdata.sq_covered_tasks[tid]:
2551                if dep not in self.runq_complete:
2552                    self.holdoff_tasks.add(dep)
2553
2554        self.holdoff_need_update = False
2555
2556    def process_possible_migrations(self):
2557
2558        changed = set()
2559        toprocess = set()
2560        for tid, unihash in self.updated_taskhash_queue.copy():
2561            if tid in self.runq_running and tid not in self.runq_complete:
2562                continue
2563
2564            self.updated_taskhash_queue.remove((tid, unihash))
2565
2566            if unihash != self.rqdata.runtaskentries[tid].unihash:
2567                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2568                torehash = [tid]
2569                for deftid in self.sq_deferred:
2570                    if self.sq_deferred[deftid] == tid:
2571                        torehash.append(deftid)
2572                for hashtid in torehash:
2573                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2574                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2575                    bb.parse.siggen.set_unihash(hashtid, unihash)
2576                    toprocess.add(hashtid)
2577
2578        # Work out all tasks which depend upon these
2579        total = set()
2580        next = set()
2581        for p in toprocess:
2582            next |= self.rqdata.runtaskentries[p].revdeps
2583        while next:
2584            current = next.copy()
2585            total = total | next
2586            next = set()
2587            for ntid in current:
2588                next |= self.rqdata.runtaskentries[ntid].revdeps
2589            next.difference_update(total)
2590
2591        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2592        next = set()
2593        for p in total:
2594            if not self.rqdata.runtaskentries[p].depends:
2595                next.add(p)
2596            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2597                next.add(p)
2598
2599        starttime = time.time()
2600        lasttime = starttime
2601
2602        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2603        while next:
2604            current = next.copy()
2605            next = set()
2606            ready = {}
2607            for tid in current:
2608                if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2609                    continue
2610                # get_taskhash for a given tid *must* be called before get_unihash* below
2611                ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
2612
2613            unihashes = bb.parse.siggen.get_unihashes(ready.keys())
2614
2615            for tid in ready:
2616                orighash = self.rqdata.runtaskentries[tid].hash
2617                newhash = ready[tid]
2618                origuni = self.rqdata.runtaskentries[tid].unihash
2619                newuni = unihashes[tid]
2620
2621                # FIXME, need to check it can come from sstate at all for determinism?
2622                remapped = False
2623                if newuni == origuni:
2624                    # Nothing to do, we match, skip code below
2625                    remapped = True
2626                elif tid in self.scenequeue_covered or tid in self.sq_live:
2627                    # Already ran this setscene task or it running. Report the new taskhash
2628                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2629                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2630                    remapped = True
2631
2632                if not remapped:
2633                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2634                    self.rqdata.runtaskentries[tid].hash = newhash
2635                    self.rqdata.runtaskentries[tid].unihash = newuni
2636                    changed.add(tid)
2637
2638                next |= self.rqdata.runtaskentries[tid].revdeps
2639                total.remove(tid)
2640                next.intersection_update(total)
2641                bb.event.check_for_interrupts(self.cooker.data)
2642
2643                if time.time() > (lasttime + 30):
2644                    lasttime = time.time()
2645                    hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime))
2646
2647        endtime = time.time()
2648        if (endtime-starttime > 60):
2649            hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime))
2650
2651        if changed:
2652            for mc in self.rq.worker:
2653                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2654            for mc in self.rq.fakeworker:
2655                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2656
2657            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2658
2659        for tid in changed:
2660            if tid not in self.rqdata.runq_setscene_tids:
2661                continue
2662            if tid not in self.pending_migrations:
2663                self.pending_migrations.add(tid)
2664
2665        update_tasks = []
2666        for tid in self.pending_migrations.copy():
2667            if tid in self.runq_running or tid in self.sq_live:
2668                # Too late, task already running, not much we can do now
2669                self.pending_migrations.remove(tid)
2670                continue
2671
2672            valid = True
2673            # Check no tasks this covers are running
2674            for dep in self.sqdata.sq_covered_tasks[tid]:
2675                if dep in self.runq_running and dep not in self.runq_complete:
2676                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2677                    valid = False
2678                    break
2679            if not valid:
2680                continue
2681
2682            self.pending_migrations.remove(tid)
2683            changed = True
2684
2685            if tid in self.tasks_scenequeue_done:
2686                self.tasks_scenequeue_done.remove(tid)
2687            for dep in self.sqdata.sq_covered_tasks[tid]:
2688                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2689                    bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
2690                    self.failed_tids.append(tid)
2691                    self.rq.state = runQueueCleanUp
2692                    return
2693
2694                if dep not in self.runq_complete:
2695                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2696                        self.tasks_scenequeue_done.remove(dep)
2697
2698            if tid in self.sq_buildable:
2699                self.sq_buildable.remove(tid)
2700            if tid in self.sq_running:
2701                self.sq_running.remove(tid)
2702            if tid in self.sqdata.outrightfail:
2703                self.sqdata.outrightfail.remove(tid)
2704            if tid in self.scenequeue_notcovered:
2705                self.scenequeue_notcovered.remove(tid)
2706            if tid in self.scenequeue_covered:
2707                self.scenequeue_covered.remove(tid)
2708            if tid in self.scenequeue_notneeded:
2709                self.scenequeue_notneeded.remove(tid)
2710
2711            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2712            self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2713
2714            if tid in self.stampcache:
2715                del self.stampcache[tid]
2716
2717            if tid in self.build_stamps:
2718                del self.build_stamps[tid]
2719
2720            update_tasks.append(tid)
2721
2722        update_tasks2 = []
2723        for tid in update_tasks:
2724            harddepfail = False
2725            for t in self.sqdata.sq_harddeps_rev[tid]:
2726                if t in self.scenequeue_notcovered:
2727                    harddepfail = True
2728                    break
2729            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2730                if tid not in self.sq_buildable:
2731                    self.sq_buildable.add(tid)
2732            if not self.sqdata.sq_revdeps[tid]:
2733                self.sq_buildable.add(tid)
2734
2735            update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid))
2736
2737        if update_tasks2:
2738            self.sqdone = False
2739            for mc in sorted(self.sqdata.multiconfigs):
2740                for tid in sorted([t[0] for t in update_tasks2]):
2741                    if mc_from_tid(tid) != mc:
2742                        continue
2743                    h = pending_hash_index(tid, self.rqdata)
2744                    if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2745                        self.sq_deferred[tid] = self.sqdata.hashes[h]
2746                        bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2747            update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2748
2749        for (tid, harddepfail, origvalid) in update_tasks2:
2750            if tid in self.sqdata.valid and not origvalid:
2751                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2752            if harddepfail:
2753                logger.debug2("%s has an unavailable hard dependency so skipping" % (tid))
2754                self.sq_task_failoutright(tid)
2755
2756        if changed:
2757            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2758            self.sq_needed_harddeps = set()
2759            self.sq_harddep_deferred = set()
2760            self.holdoff_need_update = True
2761
    def scenequeue_updatecounters(self, task, fail=False):
        """
        Update scenequeue state after setscene 'task' has been classified
        as covered or notcovered.

        task: the setscene tid which was just processed
        fail: True when the task failed/was unavailable, which propagates an
              outright failure to setscene tasks hard-depending on it
        """

        if fail and task in self.sqdata.sq_harddeps:
            # Other setscene tasks hard-depend on this failed task; fail them
            # outright unless they are already handled or have valid stamps.
            for dep in sorted(self.sqdata.sq_harddeps[task]):
                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
                    # dependency could be already processed, e.g. noexec setscene task
                    continue
                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
                if noexec or stamppresent:
                    continue
                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                self.sq_task_failoutright(dep)
                continue

        # For performance, only compute allcovered once if needed
        if self.sqdata.sq_deps[task]:
            allcovered = self.scenequeue_covered | self.scenequeue_notcovered
        # Any setscene task whose reverse dependencies are now all processed
        # becomes buildable.
        for dep in sorted(self.sqdata.sq_deps[task]):
            if self.sqdata.sq_revdeps[dep].issubset(allcovered):
                if dep not in self.sq_buildable:
                    self.sq_buildable.add(dep)

        next = set([task])
        while next:
            new = set()
            for t in sorted(next):
                self.tasks_scenequeue_done.add(t)
                # Look down the dependency chain for non-setscene things which this task depends on
                # and mark as 'done'
                for dep in self.rqdata.runtaskentries[t].depends:
                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
                        continue
                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
                        new.add(dep)
            next = new

        # If this task was one which other setscene tasks have a hard dependency upon, we need
        # to walk through the hard dependencies and allow execution of those which have completed dependencies.
        if task in self.sqdata.sq_harddeps:
            for dep in self.sq_harddep_deferred.copy():
                if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                    self.sq_harddep_deferred.remove(dep)

        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
        self.holdoff_need_update = True
2807
    def sq_task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """

        logger.debug('Found task %s which could be accelerated', task)
        # Record the setscene task as covered and propagate the state change
        self.scenequeue_covered.add(task)
        self.scenequeue_updatecounters(task)
2818
2819    def sq_check_taskfail(self, task):
2820        if self.rqdata.setscene_ignore_tasks is not None:
2821            realtask = task.split('_setscene')[0]
2822            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2823            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2824            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2825                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2826                self.rq.state = runQueueCleanUp
2827
    def sq_task_complete(self, task):
        # Fire the completion event, then record the task as covered
        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.sq_task_completeoutright(task)
2831
    def sq_task_fail(self, task, result):
        # Fire the failure event, mark the task as not covered by setscene and
        # update queue state; fail=True propagates to hard-dependent tasks
        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task, True)
        self.sq_check_taskfail(task)
2837
    def sq_task_failoutright(self, task):
        # Mark the setscene task as handled (running/buildable) but not
        # covered, without firing a failure event
        self.sq_running.add(task)
        self.sq_buildable.add(task)
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task, True)
2843
    def sq_task_skip(self, task):
        # Treat the setscene task as if it had run and succeeded
        self.sq_running.add(task)
        self.sq_buildable.add(task)
        self.sq_task_completeoutright(task)
2848
2849    def sq_build_taskdepdata(self, task):
2850        def getsetscenedeps(tid):
2851            deps = set()
2852            (mc, fn, taskname, _) = split_tid_mcfn(tid)
2853            realtid = tid + "_setscene"
2854            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
2855            for (depname, idependtask) in idepends:
2856                if depname not in self.rqdata.taskData[mc].build_targets:
2857                    continue
2858
2859                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
2860                if depfn is None:
2861                     continue
2862                deptid = depfn + ":" + idependtask.replace("_setscene", "")
2863                deps.add(deptid)
2864            return deps
2865
2866        taskdepdata = {}
2867        next = getsetscenedeps(task)
2868        next.add(task)
2869        while next:
2870            additional = []
2871            for revdep in next:
2872                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
2873                deps = getsetscenedeps(revdep)
2874
2875                taskdepdata[revdep] = bb.TaskData(
2876                    pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
2877                    taskname = taskname,
2878                    fn = fn,
2879                    deps = deps,
2880                    provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
2881                    taskhash = self.rqdata.runtaskentries[revdep].hash,
2882                    unihash = self.rqdata.runtaskentries[revdep].unihash,
2883                    hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
2884                    taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps,
2885                )
2886                for revdep2 in deps:
2887                    if revdep2 not in taskdepdata:
2888                        additional.append(revdep2)
2889            next = additional
2890
2891        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2892        return taskdepdata
2893
2894    def check_setscene_ignore_tasks(self, tid):
2895        # Check task that is going to run against the ignore tasks list
2896        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2897        # Ignore covered tasks
2898        if tid in self.tasks_covered:
2899            return False
2900        # Ignore stamped tasks
2901        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
2902            return False
2903        # Ignore noexec tasks
2904        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
2905        if 'noexec' in taskdep and taskname in taskdep['noexec']:
2906            return False
2907
2908        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2909        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2910            if tid in self.rqdata.runq_setscene_tids:
2911                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
2912            else:
2913                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
2914            for t in self.scenequeue_notcovered:
2915                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
2916            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
2917            logger.error("".join(msg))
2918            return True
2919        return False
2920
class SQData(object):
    """
    Container for the state used to drive the scenequeue (setscene) phase.
    All members start empty and are filled in by build_scenequeue_data().
    """
    def __init__(self):
        # Setscene tasks directly depended upon by the build
        self.unskippable = set()
        # Setscene tasks found to be unavailable
        self.outrightfail = set()
        # Stamp file per setscene task, so duplicates can't run in parallel
        self.stamps = {}
        # Forward dependencies between setscene tasks
        self.sq_deps = {}
        # Reverse dependencies between setscene tasks
        self.sq_revdeps = {}
        # Hard dependencies injected via _setscene[depends], plus the
        # reverse mapping
        self.sq_harddeps = {}
        self.sq_harddeps_rev = {}
        # The normal (non-setscene) tasks each setscene task covers
        self.sq_covered_tasks = {}
2938
def build_scenequeue_data(sqdata, rqdata, sqrq):
    """
    Populate sqdata with the setscene (scenequeue) dependency information
    derived from the full runqueue data in rqdata, collapsing the complete
    task graph into one containing only the setscene tasks. Updates
    buildable/deferred state on sqrq as a side effect.
    """

    sq_revdeps = {}
    sq_revdeps_squash = {}
    sq_collated_deps = {}

    # We can't skip specified target tasks which aren't setscene tasks
    sqdata.cantskip = set(rqdata.target_tids)
    sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
    sqdata.cantskip.intersection_update(rqdata.runtaskentries)

    # We need to construct a dependency graph for the setscene functions. Intermediate
    # dependencies between the setscene tasks only complicate the code. This code
    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
    # only containing the setscene functions.

    rqdata.init_progress_reporter.next_stage()

    # First process the chains up to the first setscene task.
    endpoints = {}
    for tid in rqdata.runtaskentries:
        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
        sq_revdeps_squash[tid] = set()
        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint %s" % (tid))
            endpoints[tid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Secondly process the chains between setscene tasks.
    for tid in rqdata.runq_setscene_tids:
        sq_collated_deps[tid] = set()
        #bb.warn("Added endpoint 2 %s" % (tid))
        for dep in rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    def process_endpoints(endpoints):
        # Recursively walk back from each endpoint, accumulating the set of
        # setscene tasks which cover each ordinary task into
        # sq_collated_deps and sq_revdeps_squash.
        newendpoints = {}
        for point, task in endpoints.items():
            tasks = set()
            if task:
                tasks |= task
            if sq_revdeps_squash[point]:
                tasks |= sq_revdeps_squash[point]
            if point not in rqdata.runq_setscene_tids:
                for t in tasks:
                    sq_collated_deps[t].add(point)
            sq_revdeps_squash[point] = set()
            if point in rqdata.runq_setscene_tids:
                sq_revdeps_squash[point] = tasks
                continue
            for dep in rqdata.runtaskentries[point].depends:
                if point in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(point)
                if tasks:
                    sq_revdeps_squash[dep] |= tasks
                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
                    newendpoints[dep] = task
        if newendpoints:
            process_endpoints(newendpoints)

    process_endpoints(endpoints)

    rqdata.init_progress_reporter.next_stage()

    # Build a list of tasks which are "unskippable"
    # These are direct endpoints referenced by the build upto and including setscene tasks
    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
    new = True
    for tid in rqdata.runtaskentries:
        if not rqdata.runtaskentries[tid].revdeps:
            sqdata.unskippable.add(tid)
    sqdata.unskippable |= sqdata.cantskip
    while new:
        new = False
        orig = sqdata.unskippable.copy()
        for tid in sorted(orig, reverse=True):
            if tid in rqdata.runq_setscene_tids:
                continue
            if not rqdata.runtaskentries[tid].depends:
                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                sqrq.setbuildable(tid)
            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
            if sqdata.unskippable != orig:
                new = True

    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)

    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))

    # Sanity check all dependencies could be changed to setscene task references
    for tid in rqdata.runtaskentries:
        if tid in rqdata.runq_setscene_tids:
            pass
        elif sq_revdeps_squash[tid]:
            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
        else:
            del sq_revdeps_squash[tid]

    rqdata.init_progress_reporter.next_stage()

    # Resolve setscene inter-task dependencies
    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        realtid = tid + "_setscene"
        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
        sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)

        sqdata.sq_harddeps_rev[tid] = set()
        for (depname, idependtask) in idepends:

            if depname not in rqdata.taskData[mc].build_targets:
                continue

            depfn = rqdata.taskData[mc].build_targets[depname][0]
            if depfn is None:
                continue
            deptid = depfn + ":" + idependtask.replace("_setscene", "")
            if deptid not in rqdata.runtaskentries:
                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

            logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))

            if not deptid in sqdata.sq_harddeps:
                sqdata.sq_harddeps[deptid] = set()
            sqdata.sq_harddeps[deptid].add(tid)
            sqdata.sq_harddeps_rev[tid].add(deptid)

    rqdata.init_progress_reporter.next_stage()

    rqdata.init_progress_reporter.next_stage()

    #for tid in sq_revdeps_squash:
    #    data = ""
    #    for dep in sq_revdeps_squash[tid]:
    #        data = data + "\n   %s" % dep
    #    bb.warn("Task %s_setscene: is %s " % (tid, data))

    sqdata.sq_revdeps = sq_revdeps_squash
    sqdata.sq_covered_tasks = sq_collated_deps

    # Build reverse version of revdeps to populate deps structure
    for tid in sqdata.sq_revdeps:
        sqdata.sq_deps[tid] = set()
    for tid in sqdata.sq_revdeps:
        for dep in sqdata.sq_revdeps[tid]:
            sqdata.sq_deps[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    sqdata.multiconfigs = set()
    for tid in sqdata.sq_revdeps:
        sqdata.multiconfigs.add(mc_from_tid(tid))
        if not sqdata.sq_revdeps[tid]:
            # Setscene tasks with no setscene dependencies can run immediately
            sqrq.sq_buildable.add(tid)

    rqdata.init_progress_reporter.next_stage()

    sqdata.noexec = set()
    sqdata.stamppresent = set()
    sqdata.valid = set()

    sqdata.hashes = {}
    sqrq.sq_deferred = {}
    # Tasks sharing the same pending hash only need one to run; defer the
    # others behind the first tid seen with that hash.
    for mc in sorted(sqdata.multiconfigs):
        for tid in sorted(sqdata.sq_revdeps):
            if mc_from_tid(tid) != mc:
                continue
            h = pending_hash_index(tid, rqdata)
            if h not in sqdata.hashes:
                sqdata.hashes[h] = tid
            else:
                sqrq.sq_deferred[tid] = sqdata.hashes[h]
                bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
3122
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
    """
    Check the stamp/noexec state of a setscene task.

    Returns a (noexec, stamppresent) tuple:
      noexec       - True when the task is marked noexec
      stamppresent - True when a current setscene or normal stamp exists

    noexecstamp: when True, write the noexec setscene stamp as a side effect.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)

    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]

    if 'noexec' in taskdep and taskname in taskdep['noexec']:
        # Fix: only write the noexec setscene stamp when the caller requested
        # it via noexecstamp; previously the stamp was written unconditionally,
        # ignoring the parameter (e.g. the hard-dependency checks in
        # scenequeue_updatecounters only want to query, not create stamps).
        if noexecstamp:
            bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn)
        return True, False

    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
        logger.debug2('Setscene stamp current for task %s', tid)
        return False, True

    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
        logger.debug2('Normal stamp current for task %s', tid)
        return False, True

    return False, False
3142
def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
    """
    Re-check the setscene state (noexec / stamp present / hash validity) of
    the given tids and refile them into sqdata.noexec, sqdata.stamppresent,
    sqdata.valid or sqdata.outrightfail, skipping tasks on sqrq where needed.
    """
    tocheck = set()

    for tid in sorted(tids):
        # Clear any previous classification before re-checking
        sqdata.stamppresent.discard(tid)
        sqdata.valid.discard(tid)
        sqdata.outrightfail.discard(tid)

        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)

        if noexec:
            sqdata.noexec.add(tid)
            sqrq.sq_task_skip(tid)
            logger.debug2("%s is noexec so skipping setscene" % (tid))
        elif stamppresent:
            sqdata.stamppresent.add(tid)
            sqrq.sq_task_skip(tid)
            logger.debug2("%s has a valid stamp, skipping" % (tid))
        else:
            # Hash needs validating before we know whether setscene can run
            tocheck.add(tid)

    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)

    for tid in tids:
        # Anything not classified above and not already covered or deferred
        # has no usable setscene and is an outright failure
        if (tid in sqdata.stamppresent or tid in sqdata.valid or tid in sqdata.noexec
                or tid in sqrq.scenequeue_covered or tid in sqrq.scenequeue_notcovered
                or tid in sqrq.sq_deferred):
            continue
        sqdata.outrightfail.add(tid)
        logger.debug2("%s already handled (fallthrough), skipping" % (tid))
3188
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        # Store the payload in the standard Exception 'args' attribute
        self.args = x
3195
3196
class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit
    """

    def __init__(self, remain):
        # Number of active tasks still being waited upon
        self.remain = remain
        self.message = f"Waiting for {remain} active tasks to finish"
        bb.event.Event.__init__(self)
3206
class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class
    """
    def __init__(self, task, stats, rq):
        # tid of the task this event refers to
        self.taskid = task
        self.taskstring = task
        self.taskname = taskname_from_tid(task)
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        # Copy the stats so the event carries a snapshot of their current state
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
3219
class sceneQueueEvent(runQueueEvent):
    """
    Base sceneQueue event class
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        # Present the task under its _setscene name
        self.taskstring = task + "_setscene"
        self.taskname = taskname_from_tid(task) + "_setscene"
        # NOTE(review): the two assignments below re-set the same values the
        # base class __init__ already assigned; kept for safety
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
3230
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        # True when the task is marked noexec (its body will not be run)
        self.noexec = noexec
3238
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying a setscene task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        sceneQueueEvent.__init__(self, task, stats, rq)
        # True when the task is marked noexec (its body will not be run)
        self.noexec = noexec
3246
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
        runQueueEvent.__init__(self, task, stats, rq)
        # Exit code returned by the failed task
        self.exitcode = exitcode
        # Optional pseudo (fakeroot) log text to include in the message
        self.fakeroot_log = fakeroot_log

    def __str__(self):
        if not self.fakeroot_log:
            return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
        return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log)
3261
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying a setscene task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        # Exit code returned by the failed setscene task
        self.exitcode = exitcode

    def __str__(self):
        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)
3272
class sceneQueueComplete(sceneQueueEvent):
    """
    Event when all the sceneQueue tasks are complete
    """
    def __init__(self, stats, rq):
        # Does not call sceneQueueEvent.__init__ - this event has no
        # associated task, only a stats snapshot
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
3280
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed (carries no payload beyond runQueueEvent)
    """
3285
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying a setscene task completed (carries no payload beyond
    sceneQueueEvent)
    """
3290
class runQueueTaskSkipped(runQueueEvent):
    """
    Event notifying a task was skipped
    """
    def __init__(self, task, stats, rq, reason):
        runQueueEvent.__init__(self, task, stats, rq)
        # Reason the task was skipped
        self.reason = reason
3298
class taskUniHashUpdate(bb.event.Event):
    """
    Event carrying an updated unihash for a task; consumed by
    runQueuePipe.read() which queues (taskid, unihash) for processing
    """
    def __init__(self, task, unihash):
        self.taskid = task
        self.unihash = unihash
        bb.event.Event.__init__(self)
3307
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
        # Read end of the pipe; the write end belongs to the worker
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Buffer of bytes read from the worker but not yet parsed
        self.queue = bytearray()
        self.d = d
        self.rq = rq
        self.rqexec = rqexec
        self.fakerootlogs = fakerootlogs

    def read(self):
        """
        Drain available data from the worker pipe and process any complete
        <event>...</event> and <exitcode>...</exitcode> frames found.
        Returns True when new data was read.
        """
        # Check the worker processes are still alive first
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue.extend(self.input.read(512 * 1024) or b"")
        except (OSError, IOError) as e:
            # EAGAIN simply means no data was available on the non-blocking fd
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and self.queue:
            found = False
            index = self.queue.find(b"</event>")
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    event = pickle.loads(self.queue[7:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
                        # The pickled data could contain "</event>" so search for the next occurance
                        # unpickling again, this should be the only way an unpickle error could occur
                        index = self.queue.find(b"</event>", index + 1)
                        continue
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                if isinstance(event, taskUniHashUpdate):
                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
                found = True
                # Drop the processed frame (8 == len(b"</event>"))
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            index = self.queue.find(b"</exitcode>")
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    task, status = pickle.loads(self.queue[10:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                (_, _, _, taskfn) = split_tid_mcfn(task)
                fakerootlog = None
                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
                    fakerootlog = self.fakerootlogs[taskfn]
                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
                found = True
                # Drop the processed frame (11 == len(b"</exitcode>"))
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        # Drain and process any remaining data before closing
        while self.read():
            continue
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()
3380
def get_setscene_enforce_ignore_tasks(d, targets):
    """
    Build the list of 'pn:taskname' patterns exempt from setscene
    enforcement, expanding any '%:taskname' wildcard entry once per build
    target. Returns None when BB_SETSCENE_ENFORCE is not enabled.
    """
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    patterns = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
    outlist = []
    for entry in patterns:
        if not entry.startswith('%:'):
            outlist.append(entry)
            continue
        # '%' matches every build target: expand to one entry per target
        taskpart = entry.split(':')[1]
        for (mc, target, task, fn) in targets:
            outlist.append(target + ':' + taskpart)
    return outlist
3393
def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
    """
    Return True if pn:taskname is allowed to execute for real under
    setscene enforcement.

    ignore_tasks: list of fnmatch patterns in 'pn:taskname' form naming the
        tasks permitted to run, or None when enforcement is disabled (in
        which case everything is allowed).
    """
    import fnmatch
    # Enforcement disabled: every task may execute
    if ignore_tasks is None:
        return True
    item = '%s:%s' % (pn, taskname)
    # Fix: the original loop reused the name 'ignore_tasks' for the loop
    # variable, shadowing the parameter; use a distinct name instead.
    for ignore_task in ignore_tasks:
        if fnmatch.fnmatch(item, ignore_task):
            return True
    return False
3403