xref: /openbmc/openbmc/poky/bitbake/lib/bb/runqueue.py (revision 03514f19)
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007  Richard Purdie
#
# SPDX-License-Identifier: GPL-2.0-only
#

12import copy
13import os
14import sys
15import stat
16import errno
17import logging
18import re
19import bb
20from bb import msg, event
21from bb import monitordisk
22import subprocess
23import pickle
24from multiprocessing import Process
25import shlex
26import pprint
27import time
28
# Module-level loggers: the root BitBake logger plus RunQueue-specific
# children so runqueue and hash-equivalence output can be filtered separately.
bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")
hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv")

# Matches a standalone 64-character hex string (sha256-sized), case-insensitive.
# The lookbehind/lookahead reject hits embedded inside longer alphanumeric runs.
__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' )
34
def fn_from_tid(tid):
    """Return the filename part of a tid (everything before the final ':')."""
    parts = tid.rsplit(":", 1)
    return parts[0]
37
def taskname_from_tid(tid):
    """Return the task name part of a tid (everything after the final ':')."""
    return tid.rpartition(":")[2]
40
def mc_from_tid(tid):
    """Return the multiconfig name from a "mc:<mc>:..." tid, or "" for a
    plain (non-multiconfig) tid."""
    if not (tid.startswith('mc:') and tid.count(':') >= 2):
        return ""
    return tid.split(':')[1]
45
def split_tid(tid):
    """Split a tid into (mc, fn, taskname), dropping the mcfn element that
    split_tid_mcfn() also returns."""
    mc, fn, taskname, _mcfn = split_tid_mcfn(tid)
    return (mc, fn, taskname)
49
def split_mc(n):
    """Split a possibly multiconfig-prefixed name "mc:<mc>:<name>" into
    (mc, name); plain names yield ('', name)."""
    if not (n.startswith("mc:") and n.count(':') >= 2):
        return ('', n)
    _prefix, mc, real_name = n.split(":", 2)
    return (mc, real_name)
55
def split_tid_mcfn(tid):
    """Decompose a tid into (mc, fn, taskname, mcfn).

    Multiconfig tids look like "mc:<mc>:<fn>:<taskname>" (fn may itself
    contain colons, e.g. "virtual:native:..."); anything else is
    "<fn>:<taskname>" with mc == "" and mcfn == fn.
    """
    if tid.startswith('mc:') and tid.count(':') >= 2:
        _prefix, mc, rest = tid.split(':', 2)
        # The taskname is always the final colon-separated element.
        fn, _sep, taskname = rest.rpartition(':')
        return (mc, fn, taskname, "mc:" + mc + ":" + fn)
    fn, taskname = tid.rsplit(":", 1)
    return ("", fn, taskname, fn)
71
def build_tid(mc, fn, taskname):
    """Construct a tid from its parts; the "mc:<mc>:" prefix is only added
    for a non-empty multiconfig."""
    parts = ["mc", mc, fn, taskname] if mc else [fn, taskname]
    return ":".join(parts)
76
# Index used to pair up potentially matching multiconfig tasks
# We match on PN, taskname and hash being equal
def pending_hash_index(tid, rqdata):
    """Return the pairing key for tid: "<PN>:<taskname>:<unihash>".

    Fix: the original returned pn + ":" + "taskname" + h, i.e. it
    concatenated the literal string "taskname" rather than the task name,
    so the index effectively only matched on PN + hash, contradicting the
    comment above. The key is only ever compared against other keys
    produced by this function, so changing its format is self-consistent.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    pn = rqdata.dataCaches[mc].pkg_fn[taskfn]
    h = rqdata.runtaskentries[tid].unihash
    return pn + ":" + taskname + ":" + h
84
class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total, setscene_total):
        # All counters start at zero; the totals are fixed up front.
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.setscene_active = 0
        self.setscene_covered = 0
        self.setscene_notcovered = 0
        self.setscene_total = setscene_total
        self.total = total

    def copy(self):
        """Return an independent snapshot of the current counters."""
        snapshot = self.__class__(self.total, self.setscene_total)
        snapshot.__dict__.update(self.__dict__)
        return snapshot

    def taskFailed(self):
        """Move one task from active to failed."""
        self.active -= 1
        self.failed += 1

    def taskCompleted(self):
        """Move one task from active to completed."""
        self.active -= 1
        self.completed += 1

    def taskSkipped(self):
        """Count a skipped task.

        NOTE(review): this deliberately also increments active (matching the
        original behaviour); the caller presumably completes the task
        afterwards, which decrements active again -- confirm with callers.
        """
        self.active += 1
        self.skipped += 1

    def taskActive(self):
        """Count one more task as active."""
        self.active += 1

    def updateCovered(self, covered, notcovered):
        """Record the latest setscene covered/notcovered counts."""
        self.setscene_covered = covered
        self.setscene_notcovered = notcovered

    def updateActiveSetscene(self, active):
        """Record the number of currently active setscene tasks."""
        self.setscene_active = active
126
# These values indicate the next step due to be run in the
# runQueue state machine.
# (The gaps in the numbering presumably correspond to states removed over
# time -- the values themselves only matter for equality comparisons.)
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
135
class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        self.numTasks = len(self.rqdata.runtaskentries)

        # Fix: this was "[self.rqdata.runtaskentries.keys()]" -- a one-element
        # list containing a dict-keys view -- which made
        # self.prio_map.index(tid) in next_buildable_task() raise ValueError
        # for this base scheduler. (Subclasses overwrite prio_map, which is
        # why the defect was latent.) Materialise the keys as a real list.
        self.prio_map = list(self.rqdata.runtaskentries.keys())

        self.buildable = set()
        self.skip_maxthread = {}
        self.stamps = {}
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            if tid in self.rq.runq_buildable:
                self.buildable.add(tid)

        # Built lazily on first use (tid -> priority number).
        self.rev_prio_map = None
        self.is_pressure_usable()

    def is_pressure_usable(self):
        """
        If monitoring pressure, return True if pressure files can be open and read. For example
        openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported)
        is returned.
        """
        if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure:
            try:
                with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                    open("/proc/pressure/io") as io_pressure_fds, \
                    open("/proc/pressure/memory") as memory_pressure_fds:

                    # First line is the "some" record; field 4 is "total=<usec>"
                    # (see the kernel PSI documentation). Keep the running
                    # totals so deltas can be computed later.
                    self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                    self.prev_pressure_time = time.time()
                self.check_pressure = True
            except Exception:
                # Fix: was a bare "except:", which also traps
                # KeyboardInterrupt/SystemExit. Any failure reading the PSI
                # files simply disables pressure-based throttling.
                bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure")
                self.check_pressure = False
        else:
            self.check_pressure = False

    def exceeds_max_pressure(self):
        """
        Monitor the difference in total pressure at least once per second, if
        BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold.
        """
        if self.check_pressure:
            with open("/proc/pressure/cpu") as cpu_pressure_fds, \
                open("/proc/pressure/io") as io_pressure_fds, \
                open("/proc/pressure/memory") as memory_pressure_fds:
                # extract "total" from /proc/pressure/{cpu|io}
                curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1]
                curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1]
                curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1]
                now = time.time()
                tdiff = now - self.prev_pressure_time
                psi_accumulation_interval = 1.0
                # Pressure is the rate of change of the cumulative totals.
                cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff
                io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff
                memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff
                exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure
                exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure
                exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure

                # Only roll the baseline forward once a full accumulation
                # interval has passed, so rapid polls still measure a
                # meaningful delta.
                if tdiff > psi_accumulation_interval:
                    self.prev_cpu_pressure = curr_cpu_pressure
                    self.prev_io_pressure = curr_io_pressure
                    self.prev_memory_pressure = curr_memory_pressure
                    self.prev_pressure_time = now

            pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure)
            pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure)
            # Log only on state transitions to avoid spamming the console.
            if hasattr(self, "pressure_state") and pressure_state != self.pressure_state:
                bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)))
            self.pressure_state = pressure_state
            return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure)
        elif self.rq.max_loadfactor:
            # Fallback: throttle on 1-minute load average per CPU instead.
            limit = False
            loadfactor = float(os.getloadavg()[0]) / os.cpu_count()
            # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor))
            if loadfactor > self.rq.max_loadfactor:
                limit = True
            if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit:
                bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))
            self.loadfactor_limit = limit
            return limit
        return False

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        # Once tasks are running we don't need to worry about them again
        self.buildable.difference_update(self.rq.runq_running)
        buildable = set(self.buildable)
        buildable.difference_update(self.rq.holdoff_tasks)
        buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered)
        if not buildable:
            return None

        # Bitbake requires that at least one task be active. Only check for pressure if
        # this is the case, otherwise the pressure limitation could result in no tasks
        # being active and no new tasks started thereby, at times, breaking the scheduler.
        if self.rq.stats.active and self.exceeds_max_pressure():
            return None

        # Filter out tasks that have a max number of threads that have been exceeded
        skip_buildable = {}
        for running in self.rq.runq_running.difference(self.rq.runq_complete):
            rtaskname = taskname_from_tid(running)
            if rtaskname not in self.skip_maxthread:
                # Cache the per-taskname "number_threads" flag lookup.
                self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads")
            if not self.skip_maxthread[rtaskname]:
                continue
            if rtaskname in skip_buildable:
                skip_buildable[rtaskname] += 1
            else:
                skip_buildable[rtaskname] = 1

        if len(buildable) == 1:
            # Fast path: single candidate, no need to consult the priority map.
            tid = buildable.pop()
            taskname = taskname_from_tid(tid)
            if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                return None
            stamp = self.stamps[tid]
            if stamp not in self.rq.build_stamps.values():
                return tid

        if not self.rev_prio_map:
            # Lazily invert prio_map into tid -> priority for O(1) lookups.
            self.rev_prio_map = {}
            for tid in self.rqdata.runtaskentries:
                self.rev_prio_map[tid] = self.prio_map.index(tid)

        # Pick the lowest-priority-number (most important) candidate that is
        # neither thread-limited nor sharing a stamp with a running task.
        best = None
        bestprio = None
        for tid in buildable:
            prio = self.rev_prio_map[tid]
            if bestprio is None or bestprio > prio:
                taskname = taskname_from_tid(tid)
                if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]):
                    continue
                stamp = self.stamps[tid]
                if stamp in self.rq.build_stamps.values():
                    continue
                bestprio = prio
                best = tid

        return best

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.can_start_task():
            return self.next_buildable_task()

    def newbuildable(self, task):
        """Mark task as buildable (its dependencies are satisfied)."""
        self.buildable.add(task)

    def removebuildable(self, task):
        """Remove task from the buildable set."""
        self.buildable.remove(task)

    def describe_task(self, taskid):
        """Return a short human-readable description of taskid, including its
        priority when the reverse priority map has been built."""
        result = 'ID %s' % taskid
        if self.rev_prio_map:
            result = result + (' pri %d' % self.rev_prio_map[taskid])
        return result

    def dump_prio(self, comment):
        """Log the current priority map (most important first) at debug level 3."""
        bb.debug(3, '%s (most important first):\n%s' %
                 (comment,
                  '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for
                             index, taskid in enumerate(self.prio_map)])))
320
class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        Sort the priority map ascending by task weight, then flip it so the
        heaviest tasks come first.
        """
        RunQueueScheduler.__init__(self, runqueue, rqdata)

        entries = self.rqdata.runtaskentries
        # sorted() is stable, so tasks of equal weight keep their original
        # insertion order here -- exactly what the previous bucket-by-weight
        # implementation produced before the final reverse.
        self.prio_map = sorted(entries, key=lambda tid: entries[tid].weight)
        self.prio_map.reverse()
347
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so once a given
    .bb file starts to build, it's completed as quickly as possible by
    running all tasks related to the same .bb file one after the other.
    This works well where disk space is at a premium and classes like OE's
    rm_work are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata)

        # Extract list of tasks for each recipe, with tasks sorted
        # ascending from "must run first" (typically do_fetch) to
        # "runs last" (do_build). The speed scheduler prioritizes
        # tasks that must run first before the ones that run later;
        # this is what we depend on here.
        task_lists = {}
        for taskid in self.prio_map:
            fn, taskname = taskid.rsplit(':', 1)
            task_lists.setdefault(fn, []).append(taskname)

        # Now unify the different task lists. The strategy is that
        # common tasks get skipped and new ones get inserted after the
        # preceding common one(s) as they are found. Because task
        # lists should differ only by their number of tasks, but not
        # the ordering of the common tasks, this should result in a
        # deterministic result that is a superset of the individual
        # task ordering.
        all_tasks = []
        for recipe, new_tasks in task_lists.items():
            index = 0
            old_task = all_tasks[index] if index < len(all_tasks) else None
            for new_task in new_tasks:
                if old_task == new_task:
                    # Common task, skip it. This is the fast-path which
                    # avoids a full search.
                    index += 1
                    old_task = all_tasks[index] if index < len(all_tasks) else None
                else:
                    try:
                        index = all_tasks.index(new_task)
                        # Already present, just not at the current
                        # place. We re-synchronized by changing the
                        # index so that it matches again. Now
                        # move on to the next existing task.
                        index += 1
                        old_task = all_tasks[index] if index < len(all_tasks) else None
                    except ValueError:
                        # Not present. Insert before old_task, which
                        # remains the same (but gets shifted back).
                        all_tasks.insert(index, new_task)
                        index += 1
        bb.debug(3, 'merged task list: %s'  % all_tasks)

        # Now reverse the order so that tasks that finish the work on one
        # recipe are considered more important (= come first). The ordering
        # is now so that do_build is most important.
        all_tasks.reverse()

        # Group tasks of the same kind before tasks of less important
        # kinds at the head of the queue (because earlier = lower
        # priority number = runs earlier), while preserving the
        # ordering by recipe. If recipe foo is more important than
        # bar, then the goal is to work on foo's do_populate_sysroot
        # before bar's do_populate_sysroot and on the more important
        # tasks of foo before any of the less important tasks in any
        # other recipe (if those other recipes are more important than
        # foo).
        #
        # All of this only applies when tasks are runable. Explicit
        # dependencies still override this ordering by priority.
        #
        # Here's an example why this priority re-ordering helps with
        # minimizing disk usage. Consider a recipe foo with a higher
        # priority than bar where foo DEPENDS on bar. Then the
        # implicit rule (from base.bbclass) is that foo's do_configure
        # depends on bar's do_populate_sysroot. This ensures that
        # bar's do_populate_sysroot gets done first. Normally the
        # tasks from foo would continue to run once that is done, and
        # bar only gets completed and cleaned up later. By ordering
        # bar's task that depend on bar's do_populate_sysroot before foo's
        # do_configure, that problem gets avoided.
        task_index = 0
        self.dump_prio('original priorities')
        # Stable-partition prio_map: pull every occurrence of each task kind
        # (in all_tasks order) to the front, preserving per-recipe order.
        for task in all_tasks:
            for index in range(task_index, self.numTasks):
                taskid = self.prio_map[index]
                taskname = taskid.rsplit(':', 1)[1]
                if taskname == task:
                    del self.prio_map[index]
                    self.prio_map.insert(task_index, taskid)
                    task_index += 1
        self.dump_prio('completion priorities')
444
class RunTaskEntry(object):
    """
    Node in the runqueue task-dependency graph; one instance per tid.
    """
    def __init__(self):
        self.depends = set()    # tids this task depends on
        self.revdeps = set()    # tids that depend on this task
        self.hash = None        # task signature hash (see get_task_hash)
        self.unihash = None     # unified/equivalence hash (see get_task_unihash)
        self.task = None        # NOTE(review): presumably the taskData task id -- confirm
        self.weight = 1         # scheduling weight, set by calculate_task_weights()
453
454class RunQueueData:
455    """
456    BitBake Run Queue implementation
457    """
    def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets):
        """
        Hold the data needed to build and run a queue of tasks: cooker state,
        per-multiconfig data caches, resolved taskData and the requested
        build targets.
        """
        self.cooker = cooker
        self.dataCaches = dataCaches
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        # Space-separated list of targets for which multiple providers are allowed
        self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split()
        self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets)
        self.setscene_ignore_tasks_checked = False
        # Whether BB_SETSCENE_ENFORCE is enabled
        self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1")
        self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter()

        self.reset()
473
474    def reset(self):
475        self.runtaskentries = {}
476
477    def runq_depends_names(self, ids):
478        import re
479        ret = []
480        for id in ids:
481            nam = os.path.basename(id)
482            nam = re.sub("_[^,]*,", ",", nam)
483            ret.extend([nam])
484        return ret
485
    def get_task_hash(self, tid):
        """Return the signature hash recorded for tid."""
        return self.runtaskentries[tid].hash
488
    def get_task_unihash(self, tid):
        """Return the unified (equivalence) hash recorded for tid."""
        return self.runtaskentries[tid].unihash
491
    def get_user_idstring(self, tid, task_name_suffix = ""):
        """Return the user-visible identifier for tid (the tid itself plus an
        optional suffix)."""
        return tid + task_name_suffix
494
495    def get_short_user_idstring(self, task, task_name_suffix = ""):
496        (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
497        pn = self.dataCaches[mc].pkg_fn[taskfn]
498        taskname = taskname_from_tid(task) + task_name_suffix
499        return "%s:%s" % (pn, taskname)
500
    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        Returns a list of message fragments describing up to 10 loops.
        """
        from copy import deepcopy  # NOTE(review): appears unused -- the code below uses copy.deepcopy via the module-level import

        valid_chains = []
        explored_deps = {}
        msgs = []

        class TooManyLoops(Exception):
            # Internal control-flow exception: aborts the search once enough
            # loops have been reported.
            pass

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(tid, prev_chain):
            # Depth-first walk over reverse dependencies; finding a revdep
            # already on the current path means a cycle has been closed.
            prev_chain.append(tid)
            total_deps = []
            total_deps.extend(self.runtaskentries[tid].revdeps)
            for revdep in self.runtaskentries[tid].revdeps:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append("  Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Halted dependency loops search after 10 matches.\n")
                        raise TooManyLoops
                    continue
                scan = False
                if revdep not in explored_deps:
                    # Never visited: explore it.
                    scan = True
                elif revdep in explored_deps[revdep]:
                    # Known self-referencing cycle: re-explore from this path.
                    scan = True
                else:
                    # Re-explore if this node can reach back into the current path.
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    # Copy the chain so sibling branches are not polluted.
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[tid] = total_deps

        try:
            for task in tasks:
                find_chains(task, [])
        except TooManyLoops:
            pass

        return msgs
593
594    def calculate_task_weights(self, endpoints):
595        """
596        Calculate a number representing the "weight" of each task. Heavier weighted tasks
597        have more dependencies and hence should be executed sooner for maximum speed.
598
599        This function also sanity checks the task list finding tasks that are not
600        possible to execute due to circular dependencies.
601        """
602
603        numTasks = len(self.runtaskentries)
604        weight = {}
605        deps_left = {}
606        task_done = {}
607
608        for tid in self.runtaskentries:
609            task_done[tid] = False
610            weight[tid] = 1
611            deps_left[tid] = len(self.runtaskentries[tid].revdeps)
612
613        for tid in endpoints:
614            weight[tid] = 10
615            task_done[tid] = True
616
617        while True:
618            next_points = []
619            for tid in endpoints:
620                for revdep in self.runtaskentries[tid].depends:
621                    weight[revdep] = weight[revdep] + weight[tid]
622                    deps_left[revdep] = deps_left[revdep] - 1
623                    if deps_left[revdep] == 0:
624                        next_points.append(revdep)
625                        task_done[revdep] = True
626            endpoints = next_points
627            if not next_points:
628                break
629
630        # Circular dependency sanity check
631        problem_tasks = []
632        for tid in self.runtaskentries:
633            if task_done[tid] is False or deps_left[tid] != 0:
634                problem_tasks.append(tid)
635                logger.debug2("Task %s is not buildable", tid)
636                logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid])
637            self.runtaskentries[tid].weight = weight[tid]
638
639        if problem_tasks:
640            message = "%s unbuildable tasks were found.\n" % len(problem_tasks)
641            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
642            message = message + "Identifying dependency loops (this may take a short while)...\n"
643            logger.error(message)
644
645            msgs = self.circular_depchains_handler(problem_tasks)
646
647            message = "\n"
648            for msg in msgs:
649                message = message + msg
650            bb.msg.fatal("RunQueue", message)
651
652        return weight
653
654    def prepare(self):
655        """
656        Turn a set of taskData into a RunQueue and compute data needed
657        to optimise the execution order.
658        """
659
660        runq_build = {}
661        recursivetasks = {}
662        recursiveitasks = {}
663        recursivetasksselfref = set()
664
665        taskData = self.taskData
666
667        found = False
668        for mc in self.taskData:
669            if taskData[mc].taskentries:
670                found = True
671                break
672        if not found:
673            # Nothing to do
674            return 0
675
676        bb.parse.siggen.setup_datacache(self.dataCaches)
677
678        self.init_progress_reporter.start()
679        self.init_progress_reporter.next_stage()
680        bb.event.check_for_interrupts(self.cooker.data)
681
682        # Step A - Work out a list of tasks to run
683        #
684        # Taskdata gives us a list of possible providers for every build and run
685        # target ordered by priority. It also gives information on each of those
686        # providers.
687        #
688        # To create the actual list of tasks to execute we fix the list of
689        # providers and then resolve the dependencies into task IDs. This
690        # process is repeated for each type of dependency (tdepends, deptask,
691        # rdeptast, recrdeptask, idepends).
692
693        def add_build_dependencies(depids, tasknames, depends, mc):
694            for depname in depids:
695                # Won't be in build_targets if ASSUME_PROVIDED
696                if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]:
697                    continue
698                depdata = taskData[mc].build_targets[depname][0]
699                if depdata is None:
700                    continue
701                for taskname in tasknames:
702                    t = depdata + ":" + taskname
703                    if t in taskData[mc].taskentries:
704                        depends.add(t)
705
706        def add_runtime_dependencies(depids, tasknames, depends, mc):
707            for depname in depids:
708                if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]:
709                    continue
710                depdata = taskData[mc].run_targets[depname][0]
711                if depdata is None:
712                    continue
713                for taskname in tasknames:
714                    t = depdata + ":" + taskname
715                    if t in taskData[mc].taskentries:
716                        depends.add(t)
717
718        def add_mc_dependencies(mc, tid):
719            mcdeps = taskData[mc].get_mcdepends()
720            for dep in mcdeps:
721                mcdependency = dep.split(':')
722                pn = mcdependency[3]
723                frommc = mcdependency[1]
724                mcdep = mcdependency[2]
725                deptask = mcdependency[4]
726                if mcdep not in taskData:
727                    bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep))
728                if mc == frommc:
729                    fn = taskData[mcdep].build_targets[pn][0]
730                    newdep = '%s:%s' % (fn,deptask)
731                    taskData[mc].taskentries[tid].tdepends.append(newdep)
732
733        for mc in taskData:
734            for tid in taskData[mc].taskentries:
735
736                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
737                #runtid = build_tid(mc, fn, taskname)
738
739                #logger.debug2("Processing %s,%s:%s", mc, fn, taskname)
740
741                depends = set()
742                task_deps = self.dataCaches[mc].task_deps[taskfn]
743
744                self.runtaskentries[tid] = RunTaskEntry()
745
746                if fn in taskData[mc].failed_fns:
747                    continue
748
749                # We add multiconfig dependencies before processing internal task deps (tdepends)
750                if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']:
751                    add_mc_dependencies(mc, tid)
752
753                # Resolve task internal dependencies
754                #
755                # e.g. addtask before X after Y
756                for t in taskData[mc].taskentries[tid].tdepends:
757                    (depmc, depfn, deptaskname, _) = split_tid_mcfn(t)
758                    depends.add(build_tid(depmc, depfn, deptaskname))
759
760                # Resolve 'deptask' dependencies
761                #
762                # e.g. do_sometask[deptask] = "do_someothertask"
763                # (makes sure sometask runs after someothertask of all DEPENDS)
764                if 'deptask' in task_deps and taskname in task_deps['deptask']:
765                    tasknames = task_deps['deptask'][taskname].split()
766                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
767
768                # Resolve 'rdeptask' dependencies
769                #
770                # e.g. do_sometask[rdeptask] = "do_someothertask"
771                # (makes sure sometask runs after someothertask of all RDEPENDS)
772                if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']:
773                    tasknames = task_deps['rdeptask'][taskname].split()
774                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
775
776                # Resolve inter-task dependencies
777                #
778                # e.g. do_sometask[depends] = "targetname:do_someothertask"
779                # (makes sure sometask runs after targetname's someothertask)
780                idepends = taskData[mc].taskentries[tid].idepends
781                for (depname, idependtask) in idepends:
782                    if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps:
783                        # Won't be in build_targets if ASSUME_PROVIDED
784                        depdata = taskData[mc].build_targets[depname][0]
785                        if depdata is not None:
786                            t = depdata + ":" + idependtask
787                            depends.add(t)
788                            if t not in taskData[mc].taskentries:
789                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
790                irdepends = taskData[mc].taskentries[tid].irdepends
791                for (depname, idependtask) in irdepends:
792                    if depname in taskData[mc].run_targets:
793                        # Won't be in run_targets if ASSUME_PROVIDED
794                        if not taskData[mc].run_targets[depname]:
795                            continue
796                        depdata = taskData[mc].run_targets[depname][0]
797                        if depdata is not None:
798                            t = depdata + ":" + idependtask
799                            depends.add(t)
800                            if t not in taskData[mc].taskentries:
801                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata))
802
803                # Resolve recursive 'recrdeptask' dependencies (Part A)
804                #
805                # e.g. do_sometask[recrdeptask] = "do_someothertask"
806                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
807                # We cover the recursive part of the dependencies below
808                if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']:
809                    tasknames = task_deps['recrdeptask'][taskname].split()
810                    recursivetasks[tid] = tasknames
811                    add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc)
812                    add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc)
813                    if taskname in tasknames:
814                        recursivetasksselfref.add(tid)
815
816                    if 'recideptask' in task_deps and taskname in task_deps['recideptask']:
817                        recursiveitasks[tid] = []
818                        for t in task_deps['recideptask'][taskname].split():
819                            newdep = build_tid(mc, fn, t)
820                            recursiveitasks[tid].append(newdep)
821
822                self.runtaskentries[tid].depends = depends
823                # Remove all self references
824                self.runtaskentries[tid].depends.discard(tid)
825
826        #self.dump_data()
827
828        self.init_progress_reporter.next_stage()
829        bb.event.check_for_interrupts(self.cooker.data)
830
831        # Resolve recursive 'recrdeptask' dependencies (Part B)
832        #
833        # e.g. do_sometask[recrdeptask] = "do_someothertask"
834        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
835        # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed
836
        # Generating/iterating recursive lists of dependencies is painful and potentially slow
838        # Precompute recursive task dependencies here by:
839        #     a) create a temp list of reverse dependencies (revdeps)
840        #     b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0)
841        #     c) combine the total list of dependencies in cumulativedeps
842        #     d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower)
843
844
845        revdeps = {}
846        deps = {}
847        cumulativedeps = {}
848        for tid in self.runtaskentries:
849            deps[tid] = set(self.runtaskentries[tid].depends)
850            revdeps[tid] = set()
851            cumulativedeps[tid] = set()
852        # Generate a temp list of reverse dependencies
853        for tid in self.runtaskentries:
854            for dep in self.runtaskentries[tid].depends:
855                revdeps[dep].add(tid)
856        # Find the dependency chain endpoints
857        endpoints = set()
858        for tid in self.runtaskentries:
859            if not deps[tid]:
860                endpoints.add(tid)
861        # Iterate the chains collating dependencies
862        while endpoints:
863            next = set()
864            for tid in endpoints:
865                for dep in revdeps[tid]:
866                    cumulativedeps[dep].add(fn_from_tid(tid))
867                    cumulativedeps[dep].update(cumulativedeps[tid])
868                    if tid in deps[dep]:
869                        deps[dep].remove(tid)
870                    if not deps[dep]:
871                        next.add(dep)
872            endpoints = next
873        #for tid in deps:
874        #    if deps[tid]:
875        #        bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid]))
876
877        # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to
878        # resolve these recursively until we aren't adding any further extra dependencies
879        extradeps = True
880        while extradeps:
881            extradeps = 0
882            for tid in recursivetasks:
883                tasknames = recursivetasks[tid]
884
885                totaldeps = set(self.runtaskentries[tid].depends)
886                if tid in recursiveitasks:
887                    totaldeps.update(recursiveitasks[tid])
888                    for dep in recursiveitasks[tid]:
889                        if dep not in self.runtaskentries:
890                            continue
891                        totaldeps.update(self.runtaskentries[dep].depends)
892
893                deps = set()
894                for dep in totaldeps:
895                    if dep in cumulativedeps:
896                        deps.update(cumulativedeps[dep])
897
898                for t in deps:
899                    for taskname in tasknames:
900                        newtid = t + ":" + taskname
901                        if newtid == tid:
902                            continue
903                        if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends:
904                            extradeps += 1
905                            self.runtaskentries[tid].depends.add(newtid)
906
907                # Handle recursive tasks which depend upon other recursive tasks
908                deps = set()
909                for dep in self.runtaskentries[tid].depends.intersection(recursivetasks):
910                    deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends))
911                for newtid in deps:
912                    for taskname in tasknames:
913                        if not newtid.endswith(":" + taskname):
914                            continue
915                        if newtid in self.runtaskentries:
916                            extradeps += 1
917                            self.runtaskentries[tid].depends.add(newtid)
918
919            bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps)
920
921        # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work
922        for tid in recursivetasksselfref:
923            self.runtaskentries[tid].depends.difference_update(recursivetasksselfref)
924
925        self.init_progress_reporter.next_stage()
926        bb.event.check_for_interrupts(self.cooker.data)
927
928        #self.dump_data()
929
930        # Step B - Mark all active tasks
931        #
932        # Start with the tasks we were asked to run and mark all dependencies
933        # as active too. If the task is to be 'forced', clear its stamp. Once
934        # all active tasks are marked, prune the ones we don't need.
935
936        logger.verbose("Marking Active Tasks")
937
938        def mark_active(tid, depth):
939            """
940            Mark an item as active along with its depends
941            (calls itself recursively)
942            """
943
944            if tid in runq_build:
945                return
946
947            runq_build[tid] = 1
948
949            depends = self.runtaskentries[tid].depends
950            for depend in depends:
951                mark_active(depend, depth+1)
952
953        def invalidate_task(tid, error_nostamp):
954            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
955            taskdep = self.dataCaches[mc].task_deps[taskfn]
956            if fn + ":" + taskname not in taskData[mc].taskentries:
957                logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname)
958            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
959                if error_nostamp:
960                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
961                else:
962                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
963            else:
964                logger.verbose("Invalidate task %s, %s", taskname, fn)
965                bb.parse.siggen.invalidate_task(taskname, taskfn)
966
967        self.target_tids = []
968        for (mc, target, task, fn) in self.targets:
969
970            if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]:
971                continue
972
973            if target in taskData[mc].failed_deps:
974                continue
975
976            parents = False
977            if task.endswith('-'):
978                parents = True
979                task = task[:-1]
980
981            if fn in taskData[mc].failed_fns:
982                continue
983
984            # fn already has mc prefix
985            tid = fn + ":" + task
986            self.target_tids.append(tid)
987            if tid not in taskData[mc].taskentries:
988                import difflib
989                tasks = []
990                for x in taskData[mc].taskentries:
991                    if x.startswith(fn + ":"):
992                        tasks.append(taskname_from_tid(x))
993                close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7)
994                if close_matches:
995                    extra = ". Close matches:\n  %s" % "\n  ".join(close_matches)
996                else:
997                    extra = ""
998                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra))
999
            # For tasks called "XXXX-", only run their dependencies
1001            if parents:
1002                for i in self.runtaskentries[tid].depends:
1003                    mark_active(i, 1)
1004            else:
1005                mark_active(tid, 1)
1006
1007        self.init_progress_reporter.next_stage()
1008        bb.event.check_for_interrupts(self.cooker.data)
1009
1010        # Step C - Prune all inactive tasks
1011        #
1012        # Once all active tasks are marked, prune the ones we don't need.
1013
1014        # Handle --runall
1015        if self.cooker.configuration.runall:
1016            # re-run the mark_active and then drop unused tasks from new list
1017
1018            runall_tids = set()
1019            added = True
1020            while added:
1021                reduced_tasklist = set(self.runtaskentries.keys())
1022                for tid in list(self.runtaskentries.keys()):
1023                    if tid not in runq_build:
1024                       reduced_tasklist.remove(tid)
1025                runq_build = {}
1026
1027                orig = runall_tids
1028                runall_tids = set()
1029                for task in self.cooker.configuration.runall:
1030                    if not task.startswith("do_"):
1031                        task = "do_{0}".format(task)
1032                    for tid in reduced_tasklist:
1033                        wanttid = "{0}:{1}".format(fn_from_tid(tid), task)
1034                        if wanttid in self.runtaskentries:
1035                            runall_tids.add(wanttid)
1036
1037                    for tid in list(runall_tids):
1038                        mark_active(tid, 1)
1039                        self.target_tids.append(tid)
1040                        if self.cooker.configuration.force:
1041                            invalidate_task(tid, False)
1042                added = runall_tids - orig
1043
1044        delcount = set()
1045        for tid in list(self.runtaskentries.keys()):
1046            if tid not in runq_build:
1047                delcount.add(tid)
1048                del self.runtaskentries[tid]
1049
1050        if self.cooker.configuration.runall:
1051            if not self.runtaskentries:
1052                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets)))
1053
1054        self.init_progress_reporter.next_stage()
1055        bb.event.check_for_interrupts(self.cooker.data)
1056
1057        # Handle runonly
1058        if self.cooker.configuration.runonly:
1059            # re-run the mark_active and then drop unused tasks from new list
1060            runq_build = {}
1061
1062            for task in self.cooker.configuration.runonly:
1063                if not task.startswith("do_"):
1064                    task = "do_{0}".format(task)
1065                runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task]
1066
1067                for tid in runonly_tids:
1068                    mark_active(tid, 1)
1069                    if self.cooker.configuration.force:
1070                        invalidate_task(tid, False)
1071
1072            for tid in list(self.runtaskentries.keys()):
1073                if tid not in runq_build:
1074                    delcount.add(tid)
1075                    del self.runtaskentries[tid]
1076
1077            if not self.runtaskentries:
1078                bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets)))
1079
1080        #
1081        # Step D - Sanity checks and computation
1082        #
1083
1084        # Check to make sure we still have tasks to run
1085        if not self.runtaskentries:
1086            if not taskData[''].halt:
1087                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
1088            else:
1089                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")
1090
1091        logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries))
1092
1093        logger.verbose("Assign Weightings")
1094
1095        self.init_progress_reporter.next_stage()
1096        bb.event.check_for_interrupts(self.cooker.data)
1097
1098        # Generate a list of reverse dependencies to ease future calculations
1099        for tid in self.runtaskentries:
1100            for dep in self.runtaskentries[tid].depends:
1101                self.runtaskentries[dep].revdeps.add(tid)
1102
1103        self.init_progress_reporter.next_stage()
1104        bb.event.check_for_interrupts(self.cooker.data)
1105
1106        # Identify tasks at the end of dependency chains
1107        # Error on circular dependency loops (length two)
1108        endpoints = []
1109        for tid in self.runtaskentries:
1110            revdeps = self.runtaskentries[tid].revdeps
1111            if not revdeps:
1112                endpoints.append(tid)
1113            for dep in revdeps:
1114                if dep in self.runtaskentries[tid].depends:
1115                    bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep))
1116
1117
1118        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))
1119
1120        self.init_progress_reporter.next_stage()
1121        bb.event.check_for_interrupts(self.cooker.data)
1122
1123        # Calculate task weights
        # Check for higher-length circular dependencies
1125        self.runq_weight = self.calculate_task_weights(endpoints)
1126
1127        self.init_progress_reporter.next_stage()
1128        bb.event.check_for_interrupts(self.cooker.data)
1129
1130        # Sanity Check - Check for multiple tasks building the same provider
1131        for mc in self.dataCaches:
1132            prov_list = {}
1133            seen_fn = []
1134            for tid in self.runtaskentries:
1135                (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1136                if taskfn in seen_fn:
1137                    continue
1138                if mc != tidmc:
1139                    continue
1140                seen_fn.append(taskfn)
1141                for prov in self.dataCaches[mc].fn_provides[taskfn]:
1142                    if prov not in prov_list:
1143                        prov_list[prov] = [taskfn]
1144                    elif taskfn not in prov_list[prov]:
1145                        prov_list[prov].append(taskfn)
1146            for prov in prov_list:
1147                if len(prov_list[prov]) < 2:
1148                    continue
1149                if prov in self.multi_provider_allowed:
1150                    continue
1151                seen_pn = []
1152                # If two versions of the same PN are being built its fatal, we don't support it.
1153                for fn in prov_list[prov]:
1154                    pn = self.dataCaches[mc].pkg_fn[fn]
1155                    if pn not in seen_pn:
1156                        seen_pn.append(pn)
1157                    else:
1158                        bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn))
1159                msgs = ["Multiple .bb files are due to be built which each provide %s:\n  %s" % (prov, "\n  ".join(prov_list[prov]))]
1160                #
1161                # Construct a list of things which uniquely depend on each provider
1162                # since this may help the user figure out which dependency is triggering this warning
1163                #
1164                msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.")
1165                deplist = {}
1166                commondeps = None
1167                for provfn in prov_list[prov]:
1168                    deps = set()
1169                    for tid in self.runtaskentries:
1170                        fn = fn_from_tid(tid)
1171                        if fn != provfn:
1172                            continue
1173                        for dep in self.runtaskentries[tid].revdeps:
1174                            fn = fn_from_tid(dep)
1175                            if fn == provfn:
1176                                continue
1177                            deps.add(dep)
1178                    if not commondeps:
1179                        commondeps = set(deps)
1180                    else:
1181                        commondeps &= deps
1182                    deplist[provfn] = deps
1183                for provfn in deplist:
1184                    msgs.append("\n%s has unique dependees:\n  %s" % (provfn, "\n  ".join(deplist[provfn] - commondeps)))
1185                #
1186                # Construct a list of provides and runtime providers for each recipe
1187                # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC)
1188                #
1189                msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.")
1190                provide_results = {}
1191                rprovide_results = {}
1192                commonprovs = None
1193                commonrprovs = None
1194                for provfn in prov_list[prov]:
1195                    provides = set(self.dataCaches[mc].fn_provides[provfn])
1196                    rprovides = set()
1197                    for rprovide in self.dataCaches[mc].rproviders:
1198                        if provfn in self.dataCaches[mc].rproviders[rprovide]:
1199                            rprovides.add(rprovide)
1200                    for package in self.dataCaches[mc].packages:
1201                        if provfn in self.dataCaches[mc].packages[package]:
1202                            rprovides.add(package)
1203                    for package in self.dataCaches[mc].packages_dynamic:
1204                        if provfn in self.dataCaches[mc].packages_dynamic[package]:
1205                            rprovides.add(package)
1206                    if not commonprovs:
1207                        commonprovs = set(provides)
1208                    else:
1209                        commonprovs &= provides
1210                    provide_results[provfn] = provides
1211                    if not commonrprovs:
1212                        commonrprovs = set(rprovides)
1213                    else:
1214                        commonrprovs &= rprovides
1215                    rprovide_results[provfn] = rprovides
1216                #msgs.append("\nCommon provides:\n  %s" % ("\n  ".join(commonprovs)))
1217                #msgs.append("\nCommon rprovides:\n  %s" % ("\n  ".join(commonrprovs)))
1218                for provfn in prov_list[prov]:
1219                    msgs.append("\n%s has unique provides:\n  %s" % (provfn, "\n  ".join(provide_results[provfn] - commonprovs)))
1220                    msgs.append("\n%s has unique rprovides:\n  %s" % (provfn, "\n  ".join(rprovide_results[provfn] - commonrprovs)))
1221
1222                if self.warn_multi_bb:
1223                    logger.verbnote("".join(msgs))
1224                else:
1225                    logger.error("".join(msgs))
1226
1227        self.init_progress_reporter.next_stage()
1228        self.init_progress_reporter.next_stage()
1229        bb.event.check_for_interrupts(self.cooker.data)
1230
1231        # Iterate over the task list looking for tasks with a 'setscene' function
1232        self.runq_setscene_tids = set()
1233        if not self.cooker.configuration.nosetscene:
1234            for tid in self.runtaskentries:
1235                (mc, fn, taskname, _) = split_tid_mcfn(tid)
1236                setscenetid = tid + "_setscene"
1237                if setscenetid not in taskData[mc].taskentries:
1238                    continue
1239                self.runq_setscene_tids.add(tid)
1240
1241        self.init_progress_reporter.next_stage()
1242        bb.event.check_for_interrupts(self.cooker.data)
1243
1244        # Invalidate task if force mode active
1245        if self.cooker.configuration.force:
1246            for tid in self.target_tids:
1247                invalidate_task(tid, False)
1248
1249        # Invalidate task if invalidate mode active
1250        if self.cooker.configuration.invalidate_stamp:
1251            for tid in self.target_tids:
1252                fn = fn_from_tid(tid)
1253                for st in self.cooker.configuration.invalidate_stamp.split(','):
1254                    if not st.startswith("do_"):
1255                        st = "do_%s" % st
1256                    invalidate_task(fn + ":" + st, True)
1257
1258        self.init_progress_reporter.next_stage()
1259        bb.event.check_for_interrupts(self.cooker.data)
1260
1261        # Create and print to the logs a virtual/xxxx -> PN (fn) table
1262        for mc in taskData:
1263            virtmap = taskData[mc].get_providermap(prefix="virtual/")
1264            virtpnmap = {}
1265            for v in virtmap:
1266                virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]]
1267                bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v]))
1268            if hasattr(bb.parse.siggen, "tasks_resolved"):
1269                bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc])
1270
1271        self.init_progress_reporter.next_stage()
1272        bb.event.check_for_interrupts(self.cooker.data)
1273
1274        bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
1275
1276        # Iterate over the task list and call into the siggen code
1277        dealtwith = set()
1278        todeal = set(self.runtaskentries)
1279        while todeal:
1280            for tid in todeal.copy():
1281                if not (self.runtaskentries[tid].depends - dealtwith):
1282                    dealtwith.add(tid)
1283                    todeal.remove(tid)
1284                    self.prepare_task_hash(tid)
1285                bb.event.check_for_interrupts(self.cooker.data)
1286
1287        bb.parse.siggen.writeout_file_checksum_cache()
1288
1289        #self.dump_data()
1290        return len(self.runtaskentries)
1291
1292    def prepare_task_hash(self, tid):
1293        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1294        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
1295        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
1296
1297    def dump_data(self):
1298        """
1299        Dump some debug information on the internal data structures
1300        """
1301        logger.debug3("run_tasks:")
1302        for tid in self.runtaskentries:
1303            logger.debug3(" %s: %s   Deps %s RevDeps %s", tid,
1304                         self.runtaskentries[tid].weight,
1305                         self.runtaskentries[tid].depends,
1306                         self.runtaskentries[tid].revdeps)
1307
class RunQueueWorker():
    """
    Simple container pairing a bitbake-worker subprocess with the pipe
    used to read data back from it (see RunQueue._start_worker).
    """
    def __init__(self, process, pipe):
        # subprocess.Popen handle for the bitbake-worker
        self.process = process
        # runQueuePipe wrapped around the worker's stdout
        self.pipe = pipe
1312
1313class RunQueue:
    def __init__(self, cooker, cfgData, dataCaches, taskData, targets):
        """
        Arguments:
            cooker -- the cooker object driving this build
            cfgData -- configuration datastore (queried via getVar)
            dataCaches -- per-multiconfig cache data
            taskData -- per-multiconfig task data
            targets -- the build targets requested for this run
        """

        self.cooker = cooker
        self.cfgData = cfgData
        # All dependency/task graph computation is delegated to RunQueueData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets)

        # Optional user-supplied hook function names (None when unset)
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None

        # State machine starts in the preparation stage
        self.state = runQueuePrepare

        # For disk space monitor
        # Invoked at regular time intervals via the bitbake heartbeat event
        # while the build is running. We generate a unique name for the handler
        # here, just in case that there ever is more than one RunQueue instance,
        # start the handler when reaching runQueueSceneInit, and stop it when
        # done with the build.
        self.dm = monitordisk.diskMonitor(cfgData)
        self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self))
        self.dm_event_handler_registered = False
        self.rqexe = None
        # Worker process tables, keyed by multiconfig name
        self.worker = {}
        self.fakeworker = {}
1337
1338    @staticmethod
1339    def send_pickled_data(worker, data, name):
1340        msg = bytearray()
1341        msg.extend(b"<" + name.encode() + b">")
1342        pickled_data = pickle.dumps(data)
1343        msg.extend(len(pickled_data).to_bytes(4, 'big'))
1344        msg.extend(pickled_data)
1345        msg.extend(b"</" + name.encode() + b">")
1346        worker.stdin.write(msg)
1347
    def _start_worker(self, mc, fakeroot = False, rqexec = None):
        """
        Spawn a bitbake-worker subprocess for multiconfig mc and send it
        its initial configuration.

        Arguments:
            mc -- multiconfig name the worker will serve
            fakeroot -- if True, launch the worker under FAKEROOTCMD with
                        FAKEROOTBASEENV variables applied to its environment
            rqexec -- executor object handed to the worker's read pipe

        Returns a RunQueueWorker wrapping the process and its read pipe.
        """
        logger.debug("Starting bitbake-worker")
        # Magic string passed to bitbake-worker on its command line;
        # profiling builds use "decafbadbad" and the fakeroot case appends
        # "beef" below — presumably how the worker detects its mode (TODO confirm
        # against bin/bitbake-worker)
        magic = "decafbad"
        if self.cooker.configuration.profile:
            magic = "decafbadbad"
        fakerootlogs = None

        workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker")
        if fakeroot:
            magic = magic + "beef"
            mcdata = self.cooker.databuilder.mcdata[mc]
            fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD"))
            fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split()
            # FAKEROOTBASEENV is a list of KEY=VALUE entries layered on top
            # of the current environment
            env = os.environ.copy()
            for key, value in (var.split('=',1) for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
            fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs
        else:
            worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        # Worker output is polled, so its stdout must not block our reads
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs)

        # Initial data the worker needs before executing any task
        workerdata = {
            "sigdata" : bb.parse.siggen.get_taskdata(),
            "logdefaultlevel" : bb.msg.loggerDefaultLogLevel,
            "build_verbose_shell" : self.cooker.configuration.build_verbose_shell,
            "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
            "buildname" : self.cfgData.getVar("BUILDNAME"),
            "date" : self.cfgData.getVar("DATE"),
            "time" : self.cfgData.getVar("TIME"),
            "hashservaddr" : self.cooker.hashservaddr,
            "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"),
        }

        # NOTE(review): the three messages appear to be a fixed handshake
        # sequence expected by bitbake-worker — keep the order as-is
        RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig")
        RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata")
        RunQueue.send_pickled_data(worker, workerdata, "workerdata")
        worker.stdin.flush()

        return RunQueueWorker(worker, workerpipe)
1391
    def _teardown_worker(self, worker):
        """
        Shut down a single bitbake-worker: request it to quit, then drain its
        output pipe until the process has actually exited.
        """
        if not worker:
            return
        logger.debug("Teardown for bitbake-worker")
        try:
           # The worker may already have died, hence the IOError guard
           # around the quit message and stdin shutdown.
           RunQueue.send_pickled_data(worker.process, b"", "quit")
           worker.process.stdin.flush()
           worker.process.stdin.close()
        except IOError:
           pass
        # Keep consuming pipe output until the process exits...
        while worker.process.returncode is None:
            worker.pipe.read()
            worker.process.poll()
        # ...then drain anything still buffered before closing the pipe.
        while worker.pipe.read():
            continue
        worker.pipe.close()
1408
    def start_worker(self, rqexec):
        """Start one regular (non-fakeroot) worker process per multiconfig."""
        if self.worker:
            # Don't leave a previous generation of workers running
            self.teardown_workers()
        self.teardown = False
        for mc in self.rqdata.dataCaches:
            self.worker[mc] = self._start_worker(mc, False, rqexec)
1415
    def start_fakeworker(self, rqexec, mc):
        """Lazily start a fakeroot worker for the given multiconfig if not already running."""
        if not mc in self.fakeworker:
            self.fakeworker[mc] = self._start_worker(mc, True, rqexec)
1419
1420    def teardown_workers(self):
1421        self.teardown = True
1422        for mc in self.worker:
1423            self._teardown_worker(self.worker[mc])
1424        self.worker = {}
1425        for mc in self.fakeworker:
1426            self._teardown_worker(self.fakeworker[mc])
1427        self.fakeworker = {}
1428
1429    def read_workers(self):
1430        for mc in self.worker:
1431            self.worker[mc].pipe.read()
1432        for mc in self.fakeworker:
1433            self.fakeworker[mc].pipe.read()
1434
1435    def active_fds(self):
1436        fds = []
1437        for mc in self.worker:
1438            fds.append(self.worker[mc].pipe.input)
1439        for mc in self.fakeworker:
1440            fds.append(self.fakeworker[mc].pipe.input)
1441        return fds
1442
1443    def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None):
1444        def get_timestamp(f):
1445            try:
1446                if not os.access(f, os.F_OK):
1447                    return None
1448                return os.stat(f)[stat.ST_MTIME]
1449            except:
1450                return None
1451
1452        (mc, fn, tn, taskfn) = split_tid_mcfn(tid)
1453        if taskname is None:
1454            taskname = tn
1455
1456        stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn)
1457
1458        # If the stamp is missing, it's not current
1459        if not os.access(stampfile, os.F_OK):
1460            logger.debug2("Stampfile %s not available", stampfile)
1461            return False
1462        # If it's a 'nostamp' task, it's not current
1463        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
1464        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
1465            logger.debug2("%s.%s is nostamp\n", fn, taskname)
1466            return False
1467
1468        if taskname.endswith("_setscene"):
1469            return True
1470
1471        if cache is None:
1472            cache = {}
1473
1474        iscurrent = True
1475        t1 = get_timestamp(stampfile)
1476        for dep in self.rqdata.runtaskentries[tid].depends:
1477            if iscurrent:
1478                (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep)
1479                stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2)
1480                stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2)
1481                t2 = get_timestamp(stampfile2)
1482                t3 = get_timestamp(stampfile3)
1483                if t3 and not t2:
1484                    continue
1485                if t3 and t3 > t2:
1486                    continue
1487                if fn == fn2:
1488                    if not t2:
1489                        logger.debug2('Stampfile %s does not exist', stampfile2)
1490                        iscurrent = False
1491                        break
1492                    if t1 < t2:
1493                        logger.debug2('Stampfile %s < %s', stampfile, stampfile2)
1494                        iscurrent = False
1495                        break
1496                    if recurse and iscurrent:
1497                        if dep in cache:
1498                            iscurrent = cache[dep]
1499                            if not iscurrent:
1500                                logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
1501                        else:
1502                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
1503                            cache[dep] = iscurrent
1504        if recurse:
1505            cache[tid] = iscurrent
1506        return iscurrent
1507
1508    def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True):
1509        valid = set()
1510        if self.hashvalidate:
1511            sq_data = {}
1512            sq_data['hash'] = {}
1513            sq_data['hashfn'] = {}
1514            sq_data['unihash'] = {}
1515            for tid in tocheck:
1516                (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1517                sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash
1518                sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn]
1519                sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash
1520
1521            valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary)
1522
1523        return valid
1524
    def validate_hash(self, sq_data, d, siginfo, currentcount, summary):
        """
        Invoke the metadata-supplied hash validation function named in
        self.hashvalidate with the assembled sq_data and return its result.
        """
        locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary}

        # Metadata has **kwargs so args can be added, sq_data can also gain new fields
        call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)"

        return bb.utils.better_eval(call, locs)
1532
    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the halt on failure configuration option isn't set)

        Acts as a state machine driven by repeated calls: each call advances
        self.state through runQueuePrepare -> runQueueSceneInit ->
        runQueueRunning -> runQueueCleanUp. Returns False once the queue is
        complete, otherwise a value meaning "call again" (or raises
        TaskFailure if the build failed).
        """

        retval = True
        bb.event.check_for_interrupts(self.cooker.data)

        if self.state is runQueuePrepare:
            # NOTE: if you add, remove or significantly refactor the stages of this
            # process then you should recalculate the weightings here. This is quite
            # easy to do - just change the next line temporarily to pass debug=True as
            # the last parameter and you'll get a printout of the weightings as well
            # as a map to the lines where next_stage() was called. Of course this isn't
            # critical, but it helps to keep the progress reporting accurate.
            self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data,
                                                            "Initialising tasks",
                                                            [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244])
            if self.rqdata.prepare() == 0:
                # prepare() generated no tasks to run
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit
                bb.parse.siggen.save_unitaskhashes()

        if self.state is runQueueSceneInit:
            self.rqdata.init_progress_reporter.next_stage()

            # we are ready to run,  emit dependency info to any UI or class which
            # needs it
            depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData)
            self.rqdata.init_progress_reporter.next_stage()
            bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data)

            # Run self.dm.check() on each heartbeat while the queue is
            # running or cleaning up
            if not self.dm_event_handler_registered:
                 res = bb.event.register(self.dm_event_handler_name,
                                         lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False,
                                         ('bb.event.HeartbeatEvent',), data=self.cfgData)
                 self.dm_event_handler_registered = True

            self.rqdata.init_progress_reporter.next_stage()
            self.rqexe = RunQueueExecute(self)

            # Signature dump mode: write signatures (and optionally diffs)
            # then finish without building anything
            dump = self.cooker.configuration.dump_signatures
            if dump:
                self.rqdata.init_progress_reporter.finish()
                if 'printdiff' in dump:
                    invalidtasks = self.print_diffscenetasks()
                self.dump_signatures(dump)
                if 'printdiff' in dump:
                    self.write_diffscenetasks(invalidtasks)
                self.state = runQueueComplete

        if self.state is runQueueSceneInit:
            self.start_worker(self.rqexe)
            self.rqdata.init_progress_reporter.finish()

            # If we don't have any setscene functions, skip execution
            if not self.rqdata.runq_setscene_tids:
                logger.info('No setscene tasks')
                for tid in self.rqdata.runtaskentries:
                    if not self.rqdata.runtaskentries[tid].depends:
                        self.rqexe.setbuildable(tid)
                    self.rqexe.tasks_notcovered.add(tid)
                self.rqexe.sqdone = True
            logger.info('Executing Tasks')
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            retval = self.rqexe.finish()

        build_done = self.state is runQueueComplete or self.state is runQueueFailed

        # Once the build is over, stop monitoring heartbeat events
        if build_done and self.dm_event_handler_registered:
            bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData)
            self.dm_event_handler_registered = False

        if build_done and self.rqexe:
            bb.parse.siggen.save_unitaskhashes()
            self.teardown_workers()
            if self.rqexe:
                if self.rqexe.stats.failed:
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
                else:
                    # Let's avoid the word "failed" if nothing actually did
                    logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            raise bb.runqueue.TaskFailure(self.rqexe.failed_tids)

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval
1633
    def execute_runqueue(self):
        """
        Run one iteration of the runqueue state machine.

        Wraps _execute_runqueue() so that on any unexpected error the workers
        are torn down and the state is forced to complete, ensuring the
        caller exits rather than looping on a broken queue.
        """
        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
        try:
            return self._execute_runqueue()
        except bb.runqueue.TaskFailure:
            # Normal build-failure path; propagate for the caller to report
            raise
        except SystemExit:
            raise
        except bb.BBHandledException:
            # Error has already been presented to the user; clean up quietly
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise
        except Exception as err:
            logger.exception("An uncaught exception occurred in runqueue")
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise
1657
1658    def finish_runqueue(self, now = False):
1659        if not self.rqexe:
1660            self.state = runQueueComplete
1661            return
1662
1663        if now:
1664            self.rqexe.finish_now()
1665        else:
1666            self.rqexe.finish()
1667
1668    def _rq_dump_sigtid(self, tids):
1669        for tid in tids:
1670            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1671            dataCaches = self.rqdata.dataCaches
1672            bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True)
1673
1674    def dump_signatures(self, options):
1675        if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset:
1676            bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled")
1677
1678        bb.note("Writing task signature files")
1679
1680        max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1)
1681        def chunkify(l, n):
1682            return [l[i::n] for i in range(n)]
1683        tids = chunkify(list(self.rqdata.runtaskentries), max_process)
1684        # We cannot use the real multiprocessing.Pool easily due to some local data
1685        # that can't be pickled. This is a cheap multi-process solution.
1686        launched = []
1687        while tids:
1688            if len(launched) < max_process:
1689                p = Process(target=self._rq_dump_sigtid, args=(tids.pop(), ))
1690                p.start()
1691                launched.append(p)
1692            for q in launched:
1693                # The finished processes are joined when calling is_alive()
1694                if not q.is_alive():
1695                    launched.remove(q)
1696        for p in launched:
1697                p.join()
1698
1699        bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options)
1700
1701        return
1702
    def print_diffscenetasks(self):
        """
        Determine which tasks are the root causes of cache (setscene)
        invalidity - i.e. where the current build first differs from any
        cached task data - print them, and return them as a set.
        """
        def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid):
            # Recurse down the dependency tree; a task is a "root" invalid
            # task when it is invalid itself but all of its direct
            # dependencies are valid.
            invalidtasks = []
            for t in taskdepends[task].depends:
                if t not in valid and t not in visited_invalid:
                    invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid))
                    visited_invalid.add(t)

            direct_invalid = [t for t in taskdepends[task].depends if t not in valid]
            if not direct_invalid and task not in noexec:
                invalidtasks = [task]
            return invalidtasks

        noexec = []
        tocheck = set()

        # Hash-check every task that would actually execute
        for tid in self.rqdata.runtaskentries:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]

            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                noexec.append(tid)
                continue

            tocheck.add(tid)

        valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False)

        # Tasks which are both setscene and noexec never care about dependencies
        # We therefore find tasks which are setscene and noexec and mark their
        # unique dependencies as valid.
        for tid in noexec:
            if tid not in self.rqdata.runq_setscene_tids:
                continue
            for dep in self.rqdata.runtaskentries[tid].depends:
                hasnoexecparents = True
                for dep2 in self.rqdata.runtaskentries[dep].revdeps:
                    if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec:
                        continue
                    hasnoexecparents = False
                    break
                if hasnoexecparents:
                    valid_new.add(dep)

        invalidtasks = set()

        # Breadth-first walk from the build targets towards dependencies,
        # stopping each branch at the first invalid task found
        toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets])
        for tid in toptasks:
            toprocess = set([tid])
            while toprocess:
                next = set()
                visited_invalid = set()
                for t in toprocess:
                    if t not in valid_new and t not in noexec:
                        invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid))
                        continue
                    if t in self.rqdata.runq_setscene_tids:
                        for dep in self.rqexe.sqdata.sq_deps[t]:
                            next.add(dep)
                        continue

                    for dep in self.rqdata.runtaskentries[t].depends:
                        next.add(dep)

                toprocess = next

        tasklist = []
        for tid in invalidtasks:
            tasklist.append(tid)

        if tasklist:
            bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist))

        return invalidtasks
1777
    def write_diffscenetasks(self, invalidtasks):
        """
        For each invalid task, find the most recent matching signature data
        file and print a human-readable explanation of why the cached
        version could not be reused.
        """
        bb.siggen.check_siggen_version(bb.siggen)

        # Define recursion callback
        def recursecb(key, hash1, hash2):
            hashes = [hash1, hash2]
            bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes))
            hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData)
            bb.debug(1, "Found hashfiles:\n{}".format(hashfiles))

            recout = []
            if len(hashfiles) == 2:
                out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb)
                recout.extend(list('    ' + l for l in out2))
            else:
                recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2))

            return recout


        for tid in invalidtasks:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
            h = self.rqdata.runtaskentries[tid].unihash
            bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname))
            matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc])
            bb.debug(1, "Found hashfiles:\n{}".format(matches))
            match = None
            # Our own siginfo file embeds the current unihash in its path
            for m in matches.values():
                if h in m['path']:
                    match = m['path']
            if match is None:
                bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid))
            # Drop our own entries; prefer local (non-sstate) candidates when present
            matches = {k : v for k, v in iter(matches.items()) if h not in k}
            matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']}
            if matches_local:
                matches = matches_local
            if matches:
                # Diff against the newest remaining candidate; its filename
                # carries the previous hash
                latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path']
                prevh = __find_sha256__.search(latestmatch).group(0)
                output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb)
                bb.plain("\nTask %s:%s couldn't be used from the cache because:\n  We need hash %s, most recent matching task was %s\n  " % (pn, taskname, h, prevh) + '\n  '.join(output))
1820
1821
1822class RunQueueExecute:
1823
1824    def __init__(self, rq):
1825        self.rq = rq
1826        self.cooker = rq.cooker
1827        self.cfgData = rq.cfgData
1828        self.rqdata = rq.rqdata
1829
1830        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1)
1831        self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed"
1832        self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU")
1833        self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO")
1834        self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY")
1835        self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX")
1836
1837        self.sq_buildable = set()
1838        self.sq_running = set()
1839        self.sq_live = set()
1840
1841        self.updated_taskhash_queue = []
1842        self.pending_migrations = set()
1843
1844        self.runq_buildable = set()
1845        self.runq_running = set()
1846        self.runq_complete = set()
1847        self.runq_tasksrun = set()
1848
1849        self.build_stamps = {}
1850        self.build_stamps2 = []
1851        self.failed_tids = []
1852        self.sq_deferred = {}
1853        self.sq_needed_harddeps = set()
1854        self.sq_harddep_deferred = set()
1855
1856        self.stampcache = {}
1857
1858        self.holdoff_tasks = set()
1859        self.holdoff_need_update = True
1860        self.sqdone = False
1861
1862        self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids))
1863
1864        if self.number_tasks <= 0:
1865             bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks)
1866
1867        lower_limit = 1.0
1868        upper_limit = 1000000.0
1869        if self.max_cpu_pressure:
1870            self.max_cpu_pressure = float(self.max_cpu_pressure)
1871            if self.max_cpu_pressure < lower_limit:
1872                bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit))
1873            if self.max_cpu_pressure > upper_limit:
1874                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure))
1875
1876        if self.max_io_pressure:
1877            self.max_io_pressure = float(self.max_io_pressure)
1878            if self.max_io_pressure < lower_limit:
1879                bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit))
1880            if self.max_io_pressure > upper_limit:
1881                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1882
1883        if self.max_memory_pressure:
1884            self.max_memory_pressure = float(self.max_memory_pressure)
1885            if self.max_memory_pressure < lower_limit:
1886                bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit))
1887            if self.max_memory_pressure > upper_limit:
1888                bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure))
1889
1890        if self.max_loadfactor:
1891            self.max_loadfactor = float(self.max_loadfactor)
1892            if self.max_loadfactor <= 0:
1893                bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor))
1894
1895        # List of setscene tasks which we've covered
1896        self.scenequeue_covered = set()
1897        # List of tasks which are covered (including setscene ones)
1898        self.tasks_covered = set()
1899        self.tasks_scenequeue_done = set()
1900        self.scenequeue_notcovered = set()
1901        self.tasks_notcovered = set()
1902        self.scenequeue_notneeded = set()
1903
1904        schedulers = self.get_schedulers()
1905        for scheduler in schedulers:
1906            if self.scheduler == scheduler.name:
1907                self.sched = scheduler(self, self.rqdata)
1908                logger.debug("Using runqueue scheduler '%s'", scheduler.name)
1909                break
1910        else:
1911            bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
1912                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))
1913
1914        #if self.rqdata.runq_setscene_tids:
1915        self.sqdata = SQData()
1916        build_scenequeue_data(self.sqdata, self.rqdata, self)
1917
1918        update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True)
1919
1920        # Compute a list of 'stale' sstate tasks where the current hash does not match the one
1921        # in any stamp files. Pass the list out to metadata as an event.
1922        found = {}
1923        for tid in self.rqdata.runq_setscene_tids:
1924            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
1925            stamps = bb.build.find_stale_stamps(taskname, taskfn)
1926            if stamps:
1927                if mc not in found:
1928                    found[mc] = {}
1929                found[mc][tid] = stamps
1930        for mc in found:
1931            event = bb.event.StaleSetSceneTasks(found[mc])
1932            bb.event.fire(event, self.cooker.databuilder.mcdata[mc])
1933
1934        self.build_taskdepdata_cache()
1935
1936    def runqueue_process_waitpid(self, task, status, fakerootlog=None):
1937
1938        # self.build_stamps[pid] may not exist when use shared work directory.
1939        if task in self.build_stamps:
1940            self.build_stamps2.remove(self.build_stamps[task])
1941            del self.build_stamps[task]
1942
1943        if task in self.sq_live:
1944            if status != 0:
1945                self.sq_task_fail(task, status)
1946            else:
1947                self.sq_task_complete(task)
1948            self.sq_live.remove(task)
1949            self.stats.updateActiveSetscene(len(self.sq_live))
1950        else:
1951            if status != 0:
1952                self.task_fail(task, status, fakerootlog=fakerootlog)
1953            else:
1954                self.task_complete(task)
1955        return True
1956
1957    def finish_now(self):
1958        for mc in self.rq.worker:
1959            try:
1960                RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow")
1961                self.rq.worker[mc].process.stdin.flush()
1962            except IOError:
1963                # worker must have died?
1964                pass
1965        for mc in self.rq.fakeworker:
1966            try:
1967                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow")
1968                self.rq.fakeworker[mc].process.stdin.flush()
1969            except IOError:
1970                # worker must have died?
1971                pass
1972
1973        if self.failed_tids:
1974            self.rq.state = runQueueFailed
1975            return
1976
1977        self.rq.state = runQueueComplete
1978        return
1979
1980    def finish(self):
1981        self.rq.state = runQueueCleanUp
1982
1983        active = self.stats.active + len(self.sq_live)
1984        if active > 0:
1985            bb.event.fire(runQueueExitWait(active), self.cfgData)
1986            self.rq.read_workers()
1987            return self.rq.active_fds()
1988
1989        if self.failed_tids:
1990            self.rq.state = runQueueFailed
1991            return True
1992
1993        self.rq.state = runQueueComplete
1994        return True
1995
1996    # Used by setscene only
1997    def check_dependencies(self, task, taskdeps):
1998        if not self.rq.depvalidate:
1999            return False
2000
2001        # Must not edit parent data
2002        taskdeps = set(taskdeps)
2003
2004        taskdata = {}
2005        taskdeps.add(task)
2006        for dep in taskdeps:
2007            (mc, fn, taskname, taskfn) = split_tid_mcfn(dep)
2008            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2009            taskdata[dep] = [pn, taskname, fn]
2010        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
2011        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
2012        valid = bb.utils.better_eval(call, locs)
2013        return valid
2014
2015    def can_start_task(self):
2016        active = self.stats.active + len(self.sq_live)
2017        can_start = active < self.number_tasks
2018        return can_start
2019
2020    def get_schedulers(self):
2021        schedulers = set(obj for obj in globals().values()
2022                             if type(obj) is type and
2023                                issubclass(obj, RunQueueScheduler))
2024
2025        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS")
2026        if user_schedulers:
2027            for sched in user_schedulers.split():
2028                if not "." in sched:
2029                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
2030                    continue
2031
2032                modname, name = sched.rsplit(".", 1)
2033                try:
2034                    module = __import__(modname, fromlist=(name,))
2035                except ImportError as exc:
2036                    bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
2037                else:
2038                    schedulers.add(getattr(module, name))
2039        return schedulers
2040
    def setbuildable(self, task):
        """Mark a task as ready to run and notify the scheduler."""
        self.runq_buildable.add(task)
        self.sched.newbuildable(task)
2044
2045    def task_completeoutright(self, task):
2046        """
2047        Mark a task as completed
2048        Look at the reverse dependencies and mark any task with
2049        completed dependencies as buildable
2050        """
2051        self.runq_complete.add(task)
2052        for revdep in self.rqdata.runtaskentries[task].revdeps:
2053            if revdep in self.runq_running:
2054                continue
2055            if revdep in self.runq_buildable:
2056                continue
2057            alldeps = True
2058            for dep in self.rqdata.runtaskentries[revdep].depends:
2059                if dep not in self.runq_complete:
2060                    alldeps = False
2061                    break
2062            if alldeps:
2063                self.setbuildable(revdep)
2064                logger.debug("Marking task %s as buildable", revdep)
2065
2066        found = None
2067        for t in sorted(self.sq_deferred.copy()):
2068            if self.sq_deferred[t] == task:
2069                # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task.
2070                # We shouldn't allow all to run at once as it is prone to races.
2071                if not found:
2072                    bb.debug(1, "Deferred task %s now buildable" % t)
2073                    del self.sq_deferred[t]
2074                    update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2075                    found = t
2076                else:
2077                    bb.debug(1, "Deferring %s after %s" % (t, found))
2078                    self.sq_deferred[t] = found
2079
    def task_complete(self, task):
        """Record a successfully executed task and unblock its reverse dependencies."""
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)
        self.runq_tasksrun.add(task)
2085
2086    def task_fail(self, task, exitcode, fakerootlog=None):
2087        """
2088        Called when a task has failed
2089        Updates the state engine with the failure
2090        """
2091        self.stats.taskFailed()
2092        self.failed_tids.append(task)
2093
2094        fakeroot_log = []
2095        if fakerootlog and os.path.exists(fakerootlog):
2096            with open(fakerootlog) as fakeroot_log_file:
2097                fakeroot_failed = False
2098                for line in reversed(fakeroot_log_file.readlines()):
2099                    for fakeroot_error in ['mismatch', 'error', 'fatal']:
2100                        if fakeroot_error in line.lower():
2101                            fakeroot_failed = True
2102                    if 'doing new pid setup and server start' in line:
2103                        break
2104                    fakeroot_log.append(line)
2105
2106            if not fakeroot_failed:
2107                fakeroot_log = []
2108
2109        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData)
2110
2111        if self.rqdata.taskData[''].halt:
2112            self.rq.state = runQueueCleanUp
2113
    def task_skip(self, task, reason):
        """Mark *task* as skipped for *reason*: fire the skip event and treat
        the task as complete for dependency-propagation purposes."""
        self.runq_running.add(task)
        self.setbuildable(task)
        bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData)
        self.task_completeoutright(task)
        # Skipped tasks count as both skipped and completed in the statistics.
        self.stats.taskSkipped()
        self.stats.taskCompleted()
2121
    def summarise_scenequeue_errors(self):
        """Run end-of-queue sanity checks over the scenequeue state.

        Logs any inconsistencies found and returns True if a state error was
        detected, False otherwise.
        """
        err = False
        if not self.sqdone:
            logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered)))
            completeevent = sceneQueueComplete(self.stats, self.rq)
            bb.event.fire(completeevent, self.cfgData)
        # Any leftover queue state at this point indicates something went wrong.
        if self.sq_deferred:
            logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred))
            err = True
        if self.updated_taskhash_queue:
            logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue))
            err = True
        if self.holdoff_tasks:
            logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks))
            err = True

        for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered):
            # No task should end up in both covered and uncovered, that is a bug.
            # NOTE(review): this case is logged but does not set err — confirm
            # whether that is intentional.
            logger.error("Setscene task %s in both covered and notcovered." % tid)

        # Every setscene tid must have passed through the full lifecycle.
        for tid in self.rqdata.runq_setscene_tids:
            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
                err = True
                logger.error("Setscene Task %s was never marked as covered or not covered" % tid)
            if tid not in self.sq_buildable:
                err = True
                logger.error("Setscene Task %s was never marked as buildable" % tid)
            if tid not in self.sq_running:
                err = True
                logger.error("Setscene Task %s was never marked as running" % tid)

        # Every runqueue task must have been classified by the setscene code.
        for x in self.rqdata.runtaskentries:
            if x not in self.tasks_covered and x not in self.tasks_notcovered:
                logger.error("Task %s was never moved from the setscene queue" % x)
                err = True
            if x not in self.tasks_scenequeue_done:
                logger.error("Task %s was never processed by the setscene code" % x)
                err = True
            if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable:
                logger.error("Task %s was never marked as buildable by the setscene code" % x)
                err = True
        return err
2164
2165
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue

        Called repeatedly from the runqueue main loop. Each call dispatches at
        most one setscene task or one real task to a worker, or performs end
        of queue handling. Returns True while there is (or may be) more work
        to do, or the result of active_fds() while waiting on workers.
        """

        self.rq.read_workers()
        if self.updated_taskhash_queue or self.pending_migrations:
            self.process_possible_migrations()

        if not hasattr(self, "sorted_setscene_tids"):
            # Don't want to sort this set every execution
            self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids)

        task = None
        if not self.sqdone and self.can_start_task():
            # Find the next setscene to run
            for nexttask in self.sorted_setscene_tids:
                if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred:
                    # Skip setscene tasks which are not needed because everything
                    # they cover is already covered elsewhere.
                    if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \
                            nexttask not in self.sq_needed_harddeps and \
                            self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \
                            self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]):
                        if nexttask not in self.rqdata.target_tids:
                            logger.debug2("Skipping setscene for task %s" % nexttask)
                            self.sq_task_skip(nexttask)
                            self.scenequeue_notneeded.add(nexttask)
                            if nexttask in self.sq_deferred:
                                del self.sq_deferred[nexttask]
                            return True
                    # Hard dependencies must be resolved before this setscene can run;
                    # enable them and park this task until they complete.
                    if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                        logger.debug2("Deferring %s due to hard dependencies" % nexttask)
                        updated = False
                        for dep in self.sqdata.sq_harddeps_rev[nexttask]:
                            if dep not in self.sq_needed_harddeps:
                                logger.debug2("Enabling task %s as it is a hard dependency" % dep)
                                self.sq_buildable.add(dep)
                                self.sq_needed_harddeps.add(dep)
                                updated = True
                        self.sq_harddep_deferred.add(nexttask)
                        if updated:
                            return True
                        continue
                    # If covered tasks are running, need to wait for them to complete
                    # NOTE(review): the 'continue' below only advances the inner loop
                    # over 't', so this check appears to have no effect on whether
                    # 'nexttask' is dispatched — confirm intent.
                    for t in self.sqdata.sq_covered_tasks[nexttask]:
                        if t in self.runq_running and t not in self.runq_complete:
                            continue
                    # Deferred tasks only become eligible once the task they were
                    # deferred behind completes, and only if their hashes validate.
                    if nexttask in self.sq_deferred:
                        if self.sq_deferred[nexttask] not in self.runq_complete:
                            continue
                        logger.debug("Task %s no longer deferred" % nexttask)
                        del self.sq_deferred[nexttask]
                        valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False)
                        if not valid:
                            logger.debug("%s didn't become valid, skipping setscene" % nexttask)
                            self.sq_task_failoutright(nexttask)
                            return True
                    if nexttask in self.sqdata.outrightfail:
                        logger.debug2('No package found, so skipping setscene task %s', nexttask)
                        self.sq_task_failoutright(nexttask)
                        return True
                    if nexttask in self.sqdata.unskippable:
                        logger.debug2("Setscene task %s is unskippable" % nexttask)
                    task = nexttask
                    break
        # Dispatch the chosen setscene task (if any) to a worker.
        if task is not None:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
            taskname = taskname + "_setscene"
            if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache):
                logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task)
                self.sq_task_failoutright(task)
                return True

            if self.cooker.configuration.force:
                if task in self.rqdata.target_tids:
                    self.sq_task_failoutright(task)
                    return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task)
                self.sq_task_skip(task)
                return True

            if self.cooker.configuration.skipsetscene:
                logger.debug2('No setscene tasks should be executed. Skipping %s', task)
                self.sq_task_failoutright(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
            # Payload handed to the worker process describing the setscene task.
            runtask = {
                'fn' : taskfn,
                'task' : task,
                'taskname' : taskname,
                'taskhash' : self.rqdata.get_task_hash(task),
                'unihash' : self.rqdata.get_task_unihash(task),
                'quieterrors' : True,
                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
                'taskdepdata' : self.sq_build_taskdepdata(task),
                'dry_run' : False,
                'taskdep': taskdep,
                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
            }

            # fakeroot tasks go to the fakeroot worker (started on demand).
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run:
                if not mc in self.rq.fakeworker:
                    self.rq.start_fakeworker(self, mc)
                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            self.build_stamps2.append(self.build_stamps[task])
            self.sq_running.add(task)
            self.sq_live.add(task)
            self.stats.updateActiveSetscene(len(self.sq_live))
            if self.can_start_task():
                return True

        self.update_holdofftasks()

        # All setscene activity has drained: summarise and decide whether the
        # real task phase can begin (or the build is already complete/failed).
        if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks:
            hashequiv_logger.verbose("Setscene tasks completed")

            err = self.summarise_scenequeue_errors()
            if err:
                self.rq.state = runQueueFailed
                return True

            if self.cooker.configuration.setsceneonly:
                self.rq.state = runQueueComplete
                return True
            self.sqdone = True

            if self.stats.total == 0:
                # nothing to do
                self.rq.state = runQueueComplete
                return True

        # Ask the scheduler for the next real (non-setscene) task to run.
        if self.cooker.configuration.setsceneonly:
            task = None
        else:
            task = self.sched.next()
        if task is not None:
            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)

            if self.rqdata.setscene_ignore_tasks is not None:
                if self.check_setscene_ignore_tasks(task):
                    self.task_fail(task, "setscene ignore_tasks")
                    return True

            if task in self.tasks_covered:
                logger.debug2("Setscene covered task %s", task)
                self.task_skip(task, "covered")
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug2("Stamp current task %s", task)

                self.task_skip(task, "existing")
                self.runq_tasksrun.add(task)
                return True

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            # noexec tasks are completed locally without involving a worker.
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running.add(task)
                self.stats.taskActive()
                if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                    bb.build.make_stamp_mcfn(taskname, taskfn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
            realfn = bb.cache.virtualfn2realfn(taskfn)[0]
            # Payload handed to the worker process describing the real task.
            runtask = {
                'fn' : taskfn,
                'task' : task,
                'taskname' : taskname,
                'taskhash' : self.rqdata.get_task_hash(task),
                'unihash' : self.rqdata.get_task_unihash(task),
                'quieterrors' : False,
                'appends' : self.cooker.collections[mc].get_file_appends(taskfn),
                'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2],
                'taskdepdata' : self.build_taskdepdata(task),
                'dry_run' : self.rqdata.setscene_enforce,
                'taskdep': taskdep,
                'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn],
                'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn],
                'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn]
            }

            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce):
                if not mc in self.rq.fakeworker:
                    try:
                        self.rq.start_fakeworker(self, mc)
                    except OSError as exc:
                        logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc)))
                        self.rq.state = runQueueFailed
                        self.stats.taskFailed()
                        return True
                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask")
                self.rq.fakeworker[mc].process.stdin.flush()
            else:
                RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask")
                self.rq.worker[mc].process.stdin.flush()

            self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
            self.build_stamps2.append(self.build_stamps[task])
            self.runq_running.add(task)
            self.stats.taskActive()
            if self.can_start_task():
                return True

        # Nothing dispatched but work is in flight: wait on the workers.
        if self.stats.active > 0 or self.sq_live:
            self.rq.read_workers()
            return self.rq.active_fds()

        # No more tasks can be run. If we have deferred setscene tasks we should run them.
        if self.sq_deferred:
            deferred_tid = list(self.sq_deferred.keys())[0]
            blocking_tid = self.sq_deferred.pop(deferred_tid)
            logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid))
            return True

        if self.failed_tids:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        err = self.summarise_scenequeue_errors()
        for task in self.rqdata.runtaskentries:
            if task not in self.runq_buildable:
                logger.error("Task %s never buildable!", task)
                err = True
            elif task not in self.runq_running:
                logger.error("Task %s never ran!", task)
                err = True
            elif task not in self.runq_complete:
                logger.error("Task %s never completed!", task)
                err = True

        if err:
            self.rq.state = runQueueFailed
        else:
            self.rq.state = runQueueComplete

        return True
2426
2427    def filtermcdeps(self, task, mc, deps):
2428        ret = set()
2429        for dep in deps:
2430            thismc = mc_from_tid(dep)
2431            if thismc != mc:
2432                continue
2433            ret.add(dep)
2434        return ret
2435
2436    # Build the individual cache entries in advance once to save time
2437    def build_taskdepdata_cache(self):
2438        taskdepdata_cache = {}
2439        for task in self.rqdata.runtaskentries:
2440            (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
2441            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2442            deps = self.rqdata.runtaskentries[task].depends
2443            provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
2444            taskhash = self.rqdata.runtaskentries[task].hash
2445            unihash = self.rqdata.runtaskentries[task].unihash
2446            deps = self.filtermcdeps(task, mc, deps)
2447            hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
2448            taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
2449
2450        self.taskdepdata_cache = taskdepdata_cache
2451
2452    # We filter out multiconfig dependencies from taskdepdata we pass to the tasks
2453    # as most code can't handle them
2454    def build_taskdepdata(self, task):
2455        taskdepdata = {}
2456        mc = mc_from_tid(task)
2457        next = self.rqdata.runtaskentries[task].depends.copy()
2458        next.add(task)
2459        next = self.filtermcdeps(task, mc, next)
2460        while next:
2461            additional = []
2462            for revdep in next:
2463                self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash
2464                taskdepdata[revdep] = self.taskdepdata_cache[revdep]
2465                for revdep2 in self.taskdepdata_cache[revdep][3]:
2466                    if revdep2 not in taskdepdata:
2467                        additional.append(revdep2)
2468            next = additional
2469
2470        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
2471        return taskdepdata
2472
2473    def update_holdofftasks(self):
2474
2475        if not self.holdoff_need_update:
2476            return
2477
2478        notcovered = set(self.scenequeue_notcovered)
2479        notcovered |= self.sqdata.cantskip
2480        for tid in self.scenequeue_notcovered:
2481            notcovered |= self.sqdata.sq_covered_tasks[tid]
2482        notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids)
2483        notcovered.intersection_update(self.tasks_scenequeue_done)
2484
2485        covered = set(self.scenequeue_covered)
2486        for tid in self.scenequeue_covered:
2487            covered |= self.sqdata.sq_covered_tasks[tid]
2488        covered.difference_update(notcovered)
2489        covered.intersection_update(self.tasks_scenequeue_done)
2490
2491        for tid in notcovered | covered:
2492            if not self.rqdata.runtaskentries[tid].depends:
2493                self.setbuildable(tid)
2494            elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete):
2495                 self.setbuildable(tid)
2496
2497        self.tasks_covered = covered
2498        self.tasks_notcovered = notcovered
2499
2500        self.holdoff_tasks = set()
2501
2502        for tid in self.rqdata.runq_setscene_tids:
2503            if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered:
2504                self.holdoff_tasks.add(tid)
2505
2506        for tid in self.holdoff_tasks.copy():
2507            for dep in self.sqdata.sq_covered_tasks[tid]:
2508                if dep not in self.runq_complete:
2509                    self.holdoff_tasks.add(dep)
2510
2511        self.holdoff_need_update = False
2512
2513    def process_possible_migrations(self):
2514
2515        changed = set()
2516        toprocess = set()
2517        for tid, unihash in self.updated_taskhash_queue.copy():
2518            if tid in self.runq_running and tid not in self.runq_complete:
2519                continue
2520
2521            self.updated_taskhash_queue.remove((tid, unihash))
2522
2523            if unihash != self.rqdata.runtaskentries[tid].unihash:
2524                # Make sure we rehash any other tasks with the same task hash that we're deferred against.
2525                torehash = [tid]
2526                for deftid in self.sq_deferred:
2527                    if self.sq_deferred[deftid] == tid:
2528                        torehash.append(deftid)
2529                for hashtid in torehash:
2530                    hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash))
2531                    self.rqdata.runtaskentries[hashtid].unihash = unihash
2532                    bb.parse.siggen.set_unihash(hashtid, unihash)
2533                    toprocess.add(hashtid)
2534                if torehash:
2535                    # Need to save after set_unihash above
2536                    bb.parse.siggen.save_unitaskhashes()
2537
2538        # Work out all tasks which depend upon these
2539        total = set()
2540        next = set()
2541        for p in toprocess:
2542            next |= self.rqdata.runtaskentries[p].revdeps
2543        while next:
2544            current = next.copy()
2545            total = total | next
2546            next = set()
2547            for ntid in current:
2548                next |= self.rqdata.runtaskentries[ntid].revdeps
2549            next.difference_update(total)
2550
2551        # Now iterate those tasks in dependency order to regenerate their taskhash/unihash
2552        next = set()
2553        for p in total:
2554            if not self.rqdata.runtaskentries[p].depends:
2555                next.add(p)
2556            elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
2557                next.add(p)
2558
2559        # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
2560        while next:
2561            current = next.copy()
2562            next = set()
2563            for tid in current:
2564                if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
2565                    continue
2566                orighash = self.rqdata.runtaskentries[tid].hash
2567                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
2568                origuni = self.rqdata.runtaskentries[tid].unihash
2569                newuni = bb.parse.siggen.get_unihash(tid)
2570                # FIXME, need to check it can come from sstate at all for determinism?
2571                remapped = False
2572                if newuni == origuni:
2573                    # Nothing to do, we match, skip code below
2574                    remapped = True
2575                elif tid in self.scenequeue_covered or tid in self.sq_live:
2576                    # Already ran this setscene task or it running. Report the new taskhash
2577                    bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches)
2578                    hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid))
2579                    remapped = True
2580
2581                if not remapped:
2582                    #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni))
2583                    self.rqdata.runtaskentries[tid].hash = newhash
2584                    self.rqdata.runtaskentries[tid].unihash = newuni
2585                    changed.add(tid)
2586
2587                next |= self.rqdata.runtaskentries[tid].revdeps
2588                total.remove(tid)
2589                next.intersection_update(total)
2590
2591        if changed:
2592            for mc in self.rq.worker:
2593                RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2594            for mc in self.rq.fakeworker:
2595                RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes")
2596
2597            hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed)))
2598
2599        for tid in changed:
2600            if tid not in self.rqdata.runq_setscene_tids:
2601                continue
2602            if tid not in self.pending_migrations:
2603                self.pending_migrations.add(tid)
2604
2605        update_tasks = []
2606        for tid in self.pending_migrations.copy():
2607            if tid in self.runq_running or tid in self.sq_live:
2608                # Too late, task already running, not much we can do now
2609                self.pending_migrations.remove(tid)
2610                continue
2611
2612            valid = True
2613            # Check no tasks this covers are running
2614            for dep in self.sqdata.sq_covered_tasks[tid]:
2615                if dep in self.runq_running and dep not in self.runq_complete:
2616                    hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid))
2617                    valid = False
2618                    break
2619            if not valid:
2620                continue
2621
2622            self.pending_migrations.remove(tid)
2623            changed = True
2624
2625            if tid in self.tasks_scenequeue_done:
2626                self.tasks_scenequeue_done.remove(tid)
2627            for dep in self.sqdata.sq_covered_tasks[tid]:
2628                if dep in self.runq_complete and dep not in self.runq_tasksrun:
2629                    bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep)
2630                    self.failed_tids.append(tid)
2631                    self.rq.state = runQueueCleanUp
2632                    return
2633
2634                if dep not in self.runq_complete:
2635                    if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable:
2636                        self.tasks_scenequeue_done.remove(dep)
2637
2638            if tid in self.sq_buildable:
2639                self.sq_buildable.remove(tid)
2640            if tid in self.sq_running:
2641                self.sq_running.remove(tid)
2642            if tid in self.sqdata.outrightfail:
2643                self.sqdata.outrightfail.remove(tid)
2644            if tid in self.scenequeue_notcovered:
2645                self.scenequeue_notcovered.remove(tid)
2646            if tid in self.scenequeue_covered:
2647                self.scenequeue_covered.remove(tid)
2648            if tid in self.scenequeue_notneeded:
2649                self.scenequeue_notneeded.remove(tid)
2650
2651            (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
2652            self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)
2653
2654            if tid in self.stampcache:
2655                del self.stampcache[tid]
2656
2657            if tid in self.build_stamps:
2658                del self.build_stamps[tid]
2659
2660            update_tasks.append(tid)
2661
2662        update_tasks2 = []
2663        for tid in update_tasks:
2664            harddepfail = False
2665            for t in self.sqdata.sq_harddeps_rev[tid]:
2666                if t in self.scenequeue_notcovered:
2667                    harddepfail = True
2668                    break
2669            if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
2670                if tid not in self.sq_buildable:
2671                    self.sq_buildable.add(tid)
2672            if not self.sqdata.sq_revdeps[tid]:
2673                self.sq_buildable.add(tid)
2674
2675            update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid))
2676
2677        if update_tasks2:
2678            self.sqdone = False
2679            for mc in sorted(self.sqdata.multiconfigs):
2680                for tid in sorted([t[0] for t in update_tasks2]):
2681                    if mc_from_tid(tid) != mc:
2682                        continue
2683                    h = pending_hash_index(tid, self.rqdata)
2684                    if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]:
2685                        self.sq_deferred[tid] = self.sqdata.hashes[h]
2686                        bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h]))
2687            update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False)
2688
2689        for (tid, harddepfail, origvalid) in update_tasks2:
2690            if tid in self.sqdata.valid and not origvalid:
2691                hashequiv_logger.verbose("Setscene task %s became valid" % tid)
2692            if harddepfail:
2693                logger.debug2("%s has an unavailable hard dependency so skipping" % (tid))
2694                self.sq_task_failoutright(tid)
2695
2696        if changed:
2697            self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
2698            self.sq_needed_harddeps = set()
2699            self.sq_harddep_deferred = set()
2700            self.holdoff_need_update = True
2701
    def scenequeue_updatecounters(self, task, fail=False):
        """Update scenequeue bookkeeping after setscene *task* finished.

        If *fail* is True, propagate the failure to setscene tasks which have
        a hard dependency on *task*. In all cases, make newly eligible
        setscene tasks buildable, mark the processed portion of the dependency
        graph as done and refresh the covered/notcovered statistics.
        """

        if fail and task in self.sqdata.sq_harddeps:
            for dep in sorted(self.sqdata.sq_harddeps[task]):
                if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered:
                    # dependency could be already processed, e.g. noexec setscene task
                    continue
                noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache)
                if noexec or stamppresent:
                    continue
                logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep))
                self.sq_task_failoutright(dep)
                continue
        # Setscene tasks whose reverse dependencies are all resolved can run now.
        for dep in sorted(self.sqdata.sq_deps[task]):
            if self.sqdata.sq_revdeps[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                if dep not in self.sq_buildable:
                    self.sq_buildable.add(dep)

        next = set([task])
        while next:
            new = set()
            for t in sorted(next):
                self.tasks_scenequeue_done.add(t)
                # Look down the dependency chain for non-setscene things which this task depends on
                # and mark as 'done'
                for dep in self.rqdata.runtaskentries[t].depends:
                    if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done:
                        continue
                    if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done):
                        new.add(dep)
            next = new

        # If this task was one which other setscene tasks have a hard dependency upon, we need
        # to walk through the hard dependencies and allow execution of those which have completed dependencies.
        if task in self.sqdata.sq_harddeps:
            for dep in self.sq_harddep_deferred.copy():
                if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered):
                    self.sq_harddep_deferred.remove(dep)

        self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered))
        self.holdoff_need_update = True
2743
2744    def sq_task_completeoutright(self, task):
2745        """
2746        Mark a task as completed
2747        Look at the reverse dependencies and mark any task with
2748        completed dependencies as buildable
2749        """
2750
2751        logger.debug('Found task %s which could be accelerated', task)
2752        self.scenequeue_covered.add(task)
2753        self.scenequeue_updatecounters(task)
2754
2755    def sq_check_taskfail(self, task):
2756        if self.rqdata.setscene_ignore_tasks is not None:
2757            realtask = task.split('_setscene')[0]
2758            (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask)
2759            pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
2760            if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
2761                logger.error('Task %s.%s failed' % (pn, taskname + "_setscene"))
2762                self.rq.state = runQueueCleanUp
2763
2764    def sq_task_complete(self, task):
2765        bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
2766        self.sq_task_completeoutright(task)
2767
2768    def sq_task_fail(self, task, result):
2769        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
2770        self.scenequeue_notcovered.add(task)
2771        self.scenequeue_updatecounters(task, True)
2772        self.sq_check_taskfail(task)
2773
2774    def sq_task_failoutright(self, task):
2775        self.sq_running.add(task)
2776        self.sq_buildable.add(task)
2777        self.scenequeue_notcovered.add(task)
2778        self.scenequeue_updatecounters(task, True)
2779
2780    def sq_task_skip(self, task):
2781        self.sq_running.add(task)
2782        self.sq_buildable.add(task)
2783        self.sq_task_completeoutright(task)
2784
    def sq_build_taskdepdata(self, task):
        """
        Build the taskdepdata structure for the setscene task 'task',
        covering its transitive inter-setscene ([depends]) dependencies.

        Returns a dict mapping tid ->
        [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn].
        """
        def getsetscenedeps(tid):
            # Resolve the inter-setscene task dependencies ([depends] on the
            # _setscene variant) of tid into a set of real-task tids.
            deps = set()
            (mc, fn, taskname, _) = split_tid_mcfn(tid)
            realtid = tid + "_setscene"
            idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends
            for (depname, idependtask) in idepends:
                if depname not in self.rqdata.taskData[mc].build_targets:
                    continue

                depfn = self.rqdata.taskData[mc].build_targets[depname][0]
                if depfn is None:
                     continue
                deptid = depfn + ":" + idependtask.replace("_setscene", "")
                deps.add(deptid)
            return deps

        # Breadth-first expansion from 'task' over the setscene dependency
        # graph, collecting metadata for each tid the first time it is seen.
        taskdepdata = {}
        next = getsetscenedeps(task)
        next.add(task)
        while next:
            additional = []
            for revdep in next:
                (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
                deps = getsetscenedeps(revdep)
                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
                taskhash = self.rqdata.runtaskentries[revdep].hash
                unihash = self.rqdata.runtaskentries[revdep].unihash
                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
                for revdep2 in deps:
                    if revdep2 not in taskdepdata:
                        additional.append(revdep2)
            next = additional

        #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n"))
        return taskdepdata
2823
    def check_setscene_ignore_tasks(self, tid):
        """
        Check whether a task about to run violates BB_SETSCENE_ENFORCE.

        Returns True (and logs an error) if the task should have been
        satisfied by a setscene task; False when the execution is acceptable
        (covered, stamped, noexec, or on the ignore list).
        """
        # Check task that is going to run against the ignore tasks list
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        # Ignore covered tasks
        if tid in self.tasks_covered:
            return False
        # Ignore stamped tasks
        if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache):
            return False
        # Ignore noexec tasks
        taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn]
        if 'noexec' in taskdep and taskname in taskdep['noexec']:
            return False

        pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
        if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks):
            # Build a diagnostic listing all the setscene tasks that were not
            # covered, to help identify the missing sstate object.
            if tid in self.rqdata.runq_setscene_tids:
                msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)]
            else:
                msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)]
            for t in self.scenequeue_notcovered:
                msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash))
            msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered))
            logger.error("".join(msg))
            return True
        return False
2850
class SQData(object):
    """Holds the state computed for the scenequeue (setscene) task graph."""
    def __init__(self):
        # Forward dependencies between setscene tasks
        self.sq_deps = {}
        # Reverse dependencies between setscene tasks
        self.sq_revdeps = {}
        # Hard inter-setscene dependencies injected via task [depends] flags
        self.sq_harddeps = {}
        # Reverse mapping of the injected hard dependencies
        self.sq_harddeps_rev = {}
        # Stamp file per task, used so duplicates can't run in parallel
        self.stamps = {}
        # Setscene tasks the build depends upon directly (cannot be skipped)
        self.unskippable = set()
        # Setscene tasks known to be unavailable
        self.outrightfail = set()
        # Maps a setscene task to the normal tasks it covers
        self.sq_covered_tasks = {}
2868
def build_scenequeue_data(sqdata, rqdata, sqrq):
    """
    Populate 'sqdata' (an SQData) from the full runqueue data in 'rqdata'.

    Collapses the complete runtask dependency graph into a smaller graph
    containing only setscene tasks, computes which tasks are unskippable,
    resolves inter-setscene hard dependencies and defers setscene tasks
    whose hashes duplicate one already scheduled.

    sqdata: SQData instance to fill in (also gains cantskip, multiconfigs,
            noexec, stamppresent, valid and hashes attributes)
    rqdata: RunQueueData with the full task graph
    sqrq:   scenequeue executor whose buildable/deferred state is seeded
    """

    sq_revdeps = {}
    sq_revdeps_squash = {}
    sq_collated_deps = {}

    # We can't skip specified target tasks which aren't setscene tasks
    sqdata.cantskip = set(rqdata.target_tids)
    sqdata.cantskip.difference_update(rqdata.runq_setscene_tids)
    sqdata.cantskip.intersection_update(rqdata.runtaskentries)

    # We need to construct a dependency graph for the setscene functions. Intermediate
    # dependencies between the setscene tasks only complicate the code. This code
    # therefore aims to collapse the huge runqueue dependency tree into a smaller one
    # only containing the setscene functions.

    rqdata.init_progress_reporter.next_stage()

    # First process the chains up to the first setscene task.
    endpoints = {}
    for tid in rqdata.runtaskentries:
        sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps)
        sq_revdeps_squash[tid] = set()
        if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids:
            #bb.warn("Added endpoint %s" % (tid))
            endpoints[tid] = set()

    rqdata.init_progress_reporter.next_stage()

    # Secondly process the chains between setscene tasks.
    for tid in rqdata.runq_setscene_tids:
        sq_collated_deps[tid] = set()
        #bb.warn("Added endpoint 2 %s" % (tid))
        for dep in rqdata.runtaskentries[tid].depends:
                if tid in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(tid)
                if dep not in endpoints:
                    endpoints[dep] = set()
                #bb.warn("  Added endpoint 3 %s" % (dep))
                endpoints[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    def process_endpoints(endpoints):
        # Walk backwards from each endpoint, pushing the set of setscene
        # tasks that depend on it down the dependency chain. Recurses until
        # no new endpoints are produced.
        newendpoints = {}
        for point, task in endpoints.items():
            tasks = set()
            if task:
                tasks |= task
            if sq_revdeps_squash[point]:
                tasks |= sq_revdeps_squash[point]
            if point not in rqdata.runq_setscene_tids:
                for t in tasks:
                    sq_collated_deps[t].add(point)
            sq_revdeps_squash[point] = set()
            if point in rqdata.runq_setscene_tids:
                sq_revdeps_squash[point] = tasks
                continue
            for dep in rqdata.runtaskentries[point].depends:
                if point in sq_revdeps[dep]:
                    sq_revdeps[dep].remove(point)
                if tasks:
                    sq_revdeps_squash[dep] |= tasks
                if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids:
                    newendpoints[dep] = task
        if newendpoints:
            process_endpoints(newendpoints)

    process_endpoints(endpoints)

    rqdata.init_progress_reporter.next_stage()

    # Build a list of tasks which are "unskippable"
    # These are direct endpoints referenced by the build upto and including setscene tasks
    # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon
    new = True
    for tid in rqdata.runtaskentries:
        if not rqdata.runtaskentries[tid].revdeps:
            sqdata.unskippable.add(tid)
    sqdata.unskippable |= sqdata.cantskip
    while new:
        new = False
        orig = sqdata.unskippable.copy()
        for tid in sorted(orig, reverse=True):
            if tid in rqdata.runq_setscene_tids:
                continue
            if not rqdata.runtaskentries[tid].depends:
                # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable
                sqrq.setbuildable(tid)
            sqdata.unskippable |= rqdata.runtaskentries[tid].depends
            if sqdata.unskippable != orig:
                new = True

    sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids)

    rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries))

    # Sanity check all dependencies could be changed to setscene task references
    for taskcounter, tid in enumerate(rqdata.runtaskentries):
        if tid in rqdata.runq_setscene_tids:
            pass
        elif sq_revdeps_squash[tid]:
            bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.")
        else:
            del sq_revdeps_squash[tid]
        rqdata.init_progress_reporter.update(taskcounter)

    rqdata.init_progress_reporter.next_stage()

    # Resolve setscene inter-task dependencies
    # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
    # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
    for tid in rqdata.runq_setscene_tids:
        (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
        realtid = tid + "_setscene"
        idepends = rqdata.taskData[mc].taskentries[realtid].idepends
        sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False)

        sqdata.sq_harddeps_rev[tid] = set()
        for (depname, idependtask) in idepends:

            if depname not in rqdata.taskData[mc].build_targets:
                continue

            depfn = rqdata.taskData[mc].build_targets[depname][0]
            if depfn is None:
                continue
            deptid = depfn + ":" + idependtask.replace("_setscene", "")
            if deptid not in rqdata.runtaskentries:
                bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask))

            logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid))

            if not deptid in sqdata.sq_harddeps:
                sqdata.sq_harddeps[deptid] = set()
            sqdata.sq_harddeps[deptid].add(tid)
            sqdata.sq_harddeps_rev[tid].add(deptid)

    rqdata.init_progress_reporter.next_stage()

    rqdata.init_progress_reporter.next_stage()

    #for tid in sq_revdeps_squash:
    #    data = ""
    #    for dep in sq_revdeps_squash[tid]:
    #        data = data + "\n   %s" % dep
    #    bb.warn("Task %s_setscene: is %s " % (tid, data))

    sqdata.sq_revdeps = sq_revdeps_squash
    sqdata.sq_covered_tasks = sq_collated_deps

    # Build reverse version of revdeps to populate deps structure
    for tid in sqdata.sq_revdeps:
        sqdata.sq_deps[tid] = set()
    for tid in sqdata.sq_revdeps:
        for dep in sqdata.sq_revdeps[tid]:
            sqdata.sq_deps[dep].add(tid)

    rqdata.init_progress_reporter.next_stage()

    sqdata.multiconfigs = set()
    for tid in sqdata.sq_revdeps:
        sqdata.multiconfigs.add(mc_from_tid(tid))
        if not sqdata.sq_revdeps[tid]:
            # No reverse dependencies, so immediately buildable
            sqrq.sq_buildable.add(tid)

    rqdata.init_progress_reporter.next_stage()

    sqdata.noexec = set()
    sqdata.stamppresent = set()
    sqdata.valid = set()

    # Defer any setscene task whose hash matches one already scheduled so
    # identical tasks (e.g. across multiconfigs) don't run twice.
    sqdata.hashes = {}
    sqrq.sq_deferred = {}
    for mc in sorted(sqdata.multiconfigs):
        for tid in sorted(sqdata.sq_revdeps):
            if mc_from_tid(tid) != mc:
                continue
            h = pending_hash_index(tid, rqdata)
            if h not in sqdata.hashes:
                sqdata.hashes[h] = tid
            else:
                sqrq.sq_deferred[tid] = sqdata.hashes[h]
                bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h]))
3053
def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False):
    """Return (noexec, stamppresent) for the setscene task *tid*.

    noexec is True when the underlying task is flagged noexec (a setscene
    stamp is written as a side effect). stamppresent is True when either
    the setscene stamp or the normal task stamp is already valid.

    NOTE(review): *noexecstamp* is accepted but unused in this revision -
    confirm against callers before removing.
    """
    (mc, fn, taskname, taskfn) = split_tid_mcfn(tid)
    taskdep = rqdata.dataCaches[mc].task_deps[taskfn]

    # noexec tasks never really run; stamp them and report as noexec.
    if 'noexec' in taskdep and taskname in taskdep['noexec']:
        bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn)
        return True, False

    if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache):
        logger.debug2('Setscene stamp current for task %s', tid)
        return False, True

    if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache):
        logger.debug2('Normal stamp current for task %s', tid)
        return False, True

    return False, False
3073
def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True):
    """
    Re-evaluate the given setscene tids: reset any previous classification,
    then re-sort them into noexec, stamppresent, valid (hash available) or
    outrightfail buckets on sqdata, skipping tasks as appropriate on sqrq.

    summary: passed through to rq.validate_hashes() to control reporting.
    """

    tocheck = set()

    for tid in sorted(tids):
        # Drop any stale classification from a previous pass.
        if tid in sqdata.stamppresent:
            sqdata.stamppresent.remove(tid)
        if tid in sqdata.valid:
            sqdata.valid.remove(tid)
        if tid in sqdata.outrightfail:
            sqdata.outrightfail.remove(tid)

        noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True)

        if noexec:
            sqdata.noexec.add(tid)
            sqrq.sq_task_skip(tid)
            logger.debug2("%s is noexec so skipping setscene" % (tid))
            continue

        if stamppresent:
            sqdata.stamppresent.add(tid)
            sqrq.sq_task_skip(tid)
            logger.debug2("%s has a valid stamp, skipping" % (tid))
            continue

        tocheck.add(tid)

    # Ask the hash validation backend which of the remaining tasks have
    # usable sstate objects.
    sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary)

    # Anything not claimed by a bucket above (and not already covered,
    # notcovered or deferred) cannot succeed as a setscene task.
    for tid in tids:
        if tid in sqdata.stamppresent:
            continue
        if tid in sqdata.valid:
            continue
        if tid in sqdata.noexec:
            continue
        if tid in sqrq.scenequeue_covered:
            continue
        if tid in sqrq.scenequeue_notcovered:
            continue
        if tid in sqrq.sq_deferred:
            continue
        # NOTE(review): despite the log wording below, this branch marks the
        # task as an outright failure, not a skip.
        sqdata.outrightfail.add(tid)
        logger.debug2("%s already handled (fallthrough), skipping" % (tid))
3119
class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        # NOTE(review): assigning to Exception.args coerces x to a tuple, so
        # a plain string becomes a tuple of characters - confirm callers
        # always pass a sequence of messages.
        self.args = x
3126
3127
class runQueueExitWait(bb.event.Event):
    """
    Event fired while the runqueue is waiting for its remaining task
    processes to exit.
    """

    def __init__(self, remain):
        # remain: number of active tasks still running
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        super().__init__()
3137
class runQueueEvent(bb.event.Event):
    """
    Base class for runqueue events: captures the task identity, its hash
    and a snapshot of the queue statistics at the time of the event.
    """
    def __init__(self, task, stats, rq):
        self.taskid = task
        self.taskstring = task
        self.taskname = taskname_from_tid(task)
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
        # Copy so later stat updates don't mutate the fired event.
        self.stats = stats.copy()
        super().__init__()
3150
class sceneQueueEvent(runQueueEvent):
    """
    Base class for sceneQueue events: presents the task under its
    "_setscene" name.
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        self.taskstring = task + "_setscene"
        self.taskname = taskname_from_tid(task) + "_setscene"
        self.taskfile = fn_from_tid(task)
        self.taskhash = rq.rqdata.get_task_hash(task)
3161
class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        # noexec: True when the task has nothing to execute
        self.noexec = noexec
3169
class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying a setscene task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        super().__init__(task, stats, rq)
        # noexec: True when the task has nothing to execute
        self.noexec = noexec
3177
class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying that a task failed, carrying its exit code and any
    captured pseudo (fakeroot) log.
    """
    def __init__(self, task, stats, exitcode, rq, fakeroot_log=None):
        super().__init__(task, stats, rq)
        self.exitcode = exitcode
        self.fakeroot_log = fakeroot_log

    def __str__(self):
        if not self.fakeroot_log:
            return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode)
        return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log)
3192
class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying that a setscene task failed; the real task will be run
    in its place.
    """
    def __init__(self, task, stats, exitcode, rq):
        super().__init__(task, stats, rq)
        self.exitcode = exitcode

    def __str__(self):
        return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode)
3203
class sceneQueueComplete(sceneQueueEvent):
    """
    Event when all the sceneQueue tasks are complete
    """
    def __init__(self, stats, rq):
        # Deliberately bypasses sceneQueueEvent.__init__ since there is no
        # single task associated with queue completion.
        self.stats = stats.copy()
        bb.event.Event.__init__(self)
3211
class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """
3216
class sceneQueueTaskCompleted(sceneQueueEvent):
    """
    Event notifying a setscene task completed
    """
3221
class runQueueTaskSkipped(runQueueEvent):
    """
    Event notifying that a task was skipped, recording the reason it was
    skipped.
    """
    def __init__(self, task, stats, rq, reason):
        super().__init__(task, stats, rq)
        self.reason = reason
3229
class taskUniHashUpdate(bb.event.Event):
    """
    Event carrying an updated unihash for a task; consumed by the server
    side (see runQueuePipe.read) to queue taskhash updates.
    """
    def __init__(self, task, unihash):
        self.taskid = task
        self.unihash = unihash
        super().__init__()
3238
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None):
        # Server end only reads; close the worker's write end if given.
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Buffer of partially-received framed messages from the worker.
        self.queue = bytearray()
        self.d = d
        self.rq = rq
        self.rqexec = rqexec
        self.fakerootlogs = fakerootlogs

    def read(self):
        """
        Drain available data from the worker pipe and dispatch any complete
        "<event>...</event>" (pickled event) and "<exitcode>...</exitcode>"
        (pickled (task, status)) frames. Also polls the worker processes and
        shuts the runqueue down if one died unexpectedly.
        Returns True if any new data was read.
        """
        for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]:
            for worker in workers.values():
                worker.process.poll()
                if worker.process.returncode is not None and not self.rq.teardown:
                    bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode)))
                    self.rq.finish_runqueue(True)

        start = len(self.queue)
        try:
            self.queue.extend(self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            # Non-blocking fd; EAGAIN just means no data available yet.
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and self.queue:
            found = False
            index = self.queue.find(b"</event>")
            while index != -1 and self.queue.startswith(b"<event>"):
                try:
                    # Skip the 7-byte "<event>" prefix when unpickling.
                    event = pickle.loads(self.queue[7:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e):
                        # The pickled data could contain "</event>" so search for the next occurance
                        # unpickling again, this should be the only way an unpickle error could occur
                        index = self.queue.find(b"</event>", index + 1)
                        continue
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index]))
                bb.event.fire_from_worker(event, self.d)
                if isinstance(event, taskUniHashUpdate):
                    self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash))
                found = True
                # Drop the consumed frame (8 == len("</event>")).
                self.queue = self.queue[index+8:]
                index = self.queue.find(b"</event>")
            index = self.queue.find(b"</exitcode>")
            while index != -1 and self.queue.startswith(b"<exitcode>"):
                try:
                    # Skip the 10-byte "<exitcode>" prefix when unpickling.
                    task, status = pickle.loads(self.queue[10:index])
                except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e:
                    bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index]))
                (_, _, _, taskfn) = split_tid_mcfn(task)
                fakerootlog = None
                if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs:
                    fakerootlog = self.fakerootlogs[taskfn]
                self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog)
                found = True
                # Drop the consumed frame (11 == len("</exitcode>")).
                self.queue = self.queue[index+11:]
                index = self.queue.find(b"</exitcode>")
        return (end > start)

    def close(self):
        """Drain any remaining messages, then close the read end."""
        while self.read():
            continue
        if self.queue:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()
3311
def get_setscene_enforce_ignore_tasks(d, targets):
    """Return the expanded BB_SETSCENE_ENFORCE_IGNORE_TASKS list, or None
    when BB_SETSCENE_ENFORCE is not enabled.

    Entries of the form "%:<task>" expand to one "<target>:<task>" entry
    per build target in *targets* (tuples of (mc, target, task, fn)).
    """
    if d.getVar('BB_SETSCENE_ENFORCE') != '1':
        return None
    raw = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split()
    outlist = []
    for entry in raw:
        if not entry.startswith('%:'):
            outlist.append(entry)
            continue
        taskname = entry.split(':')[1]
        outlist.extend(target + ':' + taskname for (mc, target, task, fn) in targets)
    return outlist
3324
def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks):
    """Return True if pn:taskname is allowed to execute as a real task
    under BB_SETSCENE_ENFORCE.

    ignore_tasks: list from get_setscene_enforce_ignore_tasks(). None means
    enforcement is disabled (everything allowed); otherwise each entry is an
    fnmatch pattern tested against "pn:taskname".
    """
    import fnmatch
    if ignore_tasks is None:
        return True
    item = '%s:%s' % (pn, taskname)
    # Fix: the loop variable previously shadowed the ignore_tasks parameter
    # ("for ignore_tasks in ignore_tasks"); use a distinct pattern name.
    return any(fnmatch.fnmatch(item, pattern) for pattern in ignore_tasks)
3334