1""" 2BitBake 'RunQueue' implementation 3 4Handles preparation and execution of a queue of tasks 5""" 6 7# Copyright (C) 2006-2007 Richard Purdie 8# 9# SPDX-License-Identifier: GPL-2.0-only 10# 11 12import copy 13import os 14import sys 15import stat 16import errno 17import itertools 18import logging 19import re 20import bb 21from bb import msg, event 22from bb import monitordisk 23import subprocess 24import pickle 25from multiprocessing import Process 26import shlex 27import pprint 28import time 29 30bblogger = logging.getLogger("BitBake") 31logger = logging.getLogger("BitBake.RunQueue") 32hashequiv_logger = logging.getLogger("BitBake.RunQueue.HashEquiv") 33 34__find_sha256__ = re.compile( r'(?i)(?<![a-z0-9])[a-f0-9]{64}(?![a-z0-9])' ) 35 36def fn_from_tid(tid): 37 return tid.rsplit(":", 1)[0] 38 39def taskname_from_tid(tid): 40 return tid.rsplit(":", 1)[1] 41 42def mc_from_tid(tid): 43 if tid.startswith('mc:') and tid.count(':') >= 2: 44 return tid.split(':')[1] 45 return "" 46 47def split_tid(tid): 48 (mc, fn, taskname, _) = split_tid_mcfn(tid) 49 return (mc, fn, taskname) 50 51def split_mc(n): 52 if n.startswith("mc:") and n.count(':') >= 2: 53 _, mc, n = n.split(":", 2) 54 return (mc, n) 55 return ('', n) 56 57def split_tid_mcfn(tid): 58 if tid.startswith('mc:') and tid.count(':') >= 2: 59 elems = tid.split(':') 60 mc = elems[1] 61 fn = ":".join(elems[2:-1]) 62 taskname = elems[-1] 63 mcfn = "mc:" + mc + ":" + fn 64 else: 65 tid = tid.rsplit(":", 1) 66 mc = "" 67 fn = tid[0] 68 taskname = tid[1] 69 mcfn = fn 70 71 return (mc, fn, taskname, mcfn) 72 73def build_tid(mc, fn, taskname): 74 if mc: 75 return "mc:" + mc + ":" + fn + ":" + taskname 76 return fn + ":" + taskname 77 78# Index used to pair up potentially matching multiconfig tasks 79# We match on PN, taskname and hash being equal 80def pending_hash_index(tid, rqdata): 81 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 82 pn = rqdata.dataCaches[mc].pkg_fn[taskfn] 83 h = rqdata.runtaskentries[tid].unihash 84 return pn + ":" + "taskname" + h 85 86class RunQueueStats: 87 """ 88 Holds statistics on the tasks handled by the associated runQueue 89 """ 90 def __init__(self, total, setscene_total): 91 self.completed = 0 92 self.skipped = 0 93 self.failed = 0 94 self.active = 0 95 self.setscene_active = 0 96 self.setscene_covered = 0 97 self.setscene_notcovered = 0 98 self.setscene_total = setscene_total 99 self.total = total 100 101 def copy(self): 102 obj = self.__class__(self.total, self.setscene_total) 103 obj.__dict__.update(self.__dict__) 104 return obj 105 106 def taskFailed(self): 107 self.active = self.active - 1 108 self.failed = self.failed + 1 109 110 def taskCompleted(self): 111 self.active = self.active - 1 112 self.completed = self.completed + 1 113 114 def taskSkipped(self): 115 self.active = self.active + 1 116 self.skipped = self.skipped + 1 117 118 def taskActive(self): 119 self.active = self.active + 1 120 121 def updateCovered(self, covered, notcovered): 122 self.setscene_covered = covered 123 self.setscene_notcovered = notcovered 124 125 def updateActiveSetscene(self, active): 126 self.setscene_active = active 127 128# These values indicate the next step due to be run in the 129# runQueue state machine 130runQueuePrepare = 2 131runQueueSceneInit = 3 132runQueueDumpSigs = 4 133runQueueRunning = 6 134runQueueFailed = 7 135runQueueCleanUp = 8 136runQueueComplete = 9 137 138class RunQueueScheduler(object): 139 """ 140 Control the order tasks are scheduled in. 141 """ 142 name = "basic" 143 144 def __init__(self, runqueue, rqdata): 145 """ 146 The default scheduler just returns the first buildable task (the 147 priority map is sorted by task number) 148 """ 149 self.rq = runqueue 150 self.rqdata = rqdata 151 self.numTasks = len(self.rqdata.runtaskentries) 152 153 self.prio_map = [self.rqdata.runtaskentries.keys()] 154 155 self.buildable = set() 156 self.skip_maxthread = {} 157 self.stamps = {} 158 for tid in self.rqdata.runtaskentries: 159 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 160 self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 161 if tid in self.rq.runq_buildable: 162 self.buildable.add(tid) 163 164 self.rev_prio_map = None 165 self.is_pressure_usable() 166 167 def is_pressure_usable(self): 168 """ 169 If monitoring pressure, return True if pressure files can be open and read. For example 170 openSUSE /proc/pressure/* files have readable file permissions but when read the error EOPNOTSUPP (Operation not supported) 171 is returned. 172 """ 173 if self.rq.max_cpu_pressure or self.rq.max_io_pressure or self.rq.max_memory_pressure: 174 try: 175 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 176 open("/proc/pressure/io") as io_pressure_fds, \ 177 open("/proc/pressure/memory") as memory_pressure_fds: 178 179 self.prev_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 180 self.prev_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 181 self.prev_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 182 self.prev_pressure_time = time.time() 183 self.check_pressure = True 184 except: 185 bb.note("The /proc/pressure files can't be read. Continuing build without monitoring pressure") 186 self.check_pressure = False 187 else: 188 self.check_pressure = False 189 190 def exceeds_max_pressure(self): 191 """ 192 Monitor the difference in total pressure at least once per second, if 193 BB_PRESSURE_MAX_{CPU|IO|MEMORY} are set, return True if above threshold. 194 """ 195 if self.check_pressure: 196 with open("/proc/pressure/cpu") as cpu_pressure_fds, \ 197 open("/proc/pressure/io") as io_pressure_fds, \ 198 open("/proc/pressure/memory") as memory_pressure_fds: 199 # extract "total" from /proc/pressure/{cpu|io} 200 curr_cpu_pressure = cpu_pressure_fds.readline().split()[4].split("=")[1] 201 curr_io_pressure = io_pressure_fds.readline().split()[4].split("=")[1] 202 curr_memory_pressure = memory_pressure_fds.readline().split()[4].split("=")[1] 203 now = time.time() 204 tdiff = now - self.prev_pressure_time 205 psi_accumulation_interval = 1.0 206 cpu_pressure = (float(curr_cpu_pressure) - float(self.prev_cpu_pressure)) / tdiff 207 io_pressure = (float(curr_io_pressure) - float(self.prev_io_pressure)) / tdiff 208 memory_pressure = (float(curr_memory_pressure) - float(self.prev_memory_pressure)) / tdiff 209 exceeds_cpu_pressure = self.rq.max_cpu_pressure and cpu_pressure > self.rq.max_cpu_pressure 210 exceeds_io_pressure = self.rq.max_io_pressure and io_pressure > self.rq.max_io_pressure 211 exceeds_memory_pressure = self.rq.max_memory_pressure and memory_pressure > self.rq.max_memory_pressure 212 213 if tdiff > psi_accumulation_interval: 214 self.prev_cpu_pressure = curr_cpu_pressure 215 self.prev_io_pressure = curr_io_pressure 216 self.prev_memory_pressure = curr_memory_pressure 217 self.prev_pressure_time = now 218 219 pressure_state = (exceeds_cpu_pressure, exceeds_io_pressure, exceeds_memory_pressure) 220 pressure_values = (round(cpu_pressure,1), self.rq.max_cpu_pressure, round(io_pressure,1), self.rq.max_io_pressure, round(memory_pressure,1), self.rq.max_memory_pressure) 221 if hasattr(self, "pressure_state") and pressure_state != self.pressure_state: 222 bb.note("Pressure status changed to CPU: %s, IO: %s, Mem: %s (CPU: %s/%s, IO: %s/%s, Mem: %s/%s) - using %s/%s bitbake threads" % (pressure_state + pressure_values + (len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks))) 223 self.pressure_state = pressure_state 224 return (exceeds_cpu_pressure or exceeds_io_pressure or exceeds_memory_pressure) 225 elif self.rq.max_loadfactor: 226 limit = False 227 loadfactor = float(os.getloadavg()[0]) / os.cpu_count() 228 # bb.warn("Comparing %s to %s" % (loadfactor, self.rq.max_loadfactor)) 229 if loadfactor > self.rq.max_loadfactor: 230 limit = True 231 if hasattr(self, "loadfactor_limit") and limit != self.loadfactor_limit: 232 bb.note("Load average limiting set to %s as load average: %s - using %s/%s bitbake threads" % (limit, loadfactor, len(self.rq.runq_running.difference(self.rq.runq_complete)), self.rq.number_tasks)) 233 self.loadfactor_limit = limit 234 return limit 235 return False 236 237 def next_buildable_task(self): 238 """ 239 Return the id of the first task we find that is buildable 240 """ 241 # Once tasks are running we don't need to worry about them again 242 self.buildable.difference_update(self.rq.runq_running) 243 buildable = set(self.buildable) 244 buildable.difference_update(self.rq.holdoff_tasks) 245 buildable.intersection_update(self.rq.tasks_covered | self.rq.tasks_notcovered) 246 if not buildable: 247 return None 248 249 # Bitbake requires that at least one task be active. Only check for pressure if 250 # this is the case, otherwise the pressure limitation could result in no tasks 251 # being active and no new tasks started thereby, at times, breaking the scheduler. 252 if self.rq.stats.active and self.exceeds_max_pressure(): 253 return None 254 255 # Filter out tasks that have a max number of threads that have been exceeded 256 skip_buildable = {} 257 for running in self.rq.runq_running.difference(self.rq.runq_complete): 258 rtaskname = taskname_from_tid(running) 259 if rtaskname not in self.skip_maxthread: 260 self.skip_maxthread[rtaskname] = self.rq.cfgData.getVarFlag(rtaskname, "number_threads") 261 if not self.skip_maxthread[rtaskname]: 262 continue 263 if rtaskname in skip_buildable: 264 skip_buildable[rtaskname] += 1 265 else: 266 skip_buildable[rtaskname] = 1 267 268 if len(buildable) == 1: 269 tid = buildable.pop() 270 taskname = taskname_from_tid(tid) 271 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 272 return None 273 stamp = self.stamps[tid] 274 if stamp not in self.rq.build_stamps.values(): 275 return tid 276 277 if not self.rev_prio_map: 278 self.rev_prio_map = {} 279 for tid in self.rqdata.runtaskentries: 280 self.rev_prio_map[tid] = self.prio_map.index(tid) 281 282 best = None 283 bestprio = None 284 for tid in buildable: 285 prio = self.rev_prio_map[tid] 286 if bestprio is None or bestprio > prio: 287 taskname = taskname_from_tid(tid) 288 if taskname in skip_buildable and skip_buildable[taskname] >= int(self.skip_maxthread[taskname]): 289 continue 290 stamp = self.stamps[tid] 291 if stamp in self.rq.build_stamps.values(): 292 continue 293 bestprio = prio 294 best = tid 295 296 return best 297 298 def next(self): 299 """ 300 Return the id of the task we should build next 301 """ 302 if self.rq.can_start_task(): 303 return self.next_buildable_task() 304 305 def newbuildable(self, task): 306 self.buildable.add(task) 307 308 def removebuildable(self, task): 309 self.buildable.remove(task) 310 311 def describe_task(self, taskid): 312 result = 'ID %s' % taskid 313 if self.rev_prio_map: 314 result = result + (' pri %d' % self.rev_prio_map[taskid]) 315 return result 316 317 def dump_prio(self, comment): 318 bb.debug(3, '%s (most important first):\n%s' % 319 (comment, 320 '\n'.join(['%d. %s' % (index + 1, self.describe_task(taskid)) for 321 index, taskid in enumerate(self.prio_map)]))) 322 323class RunQueueSchedulerSpeed(RunQueueScheduler): 324 """ 325 A scheduler optimised for speed. The priority map is sorted by task weight, 326 heavier weighted tasks (tasks needed by the most other tasks) are run first. 327 """ 328 name = "speed" 329 330 def __init__(self, runqueue, rqdata): 331 """ 332 The priority map is sorted by task weight. 333 """ 334 RunQueueScheduler.__init__(self, runqueue, rqdata) 335 336 weights = {} 337 for tid in self.rqdata.runtaskentries: 338 weight = self.rqdata.runtaskentries[tid].weight 339 if not weight in weights: 340 weights[weight] = [] 341 weights[weight].append(tid) 342 343 self.prio_map = [] 344 for weight in sorted(weights): 345 for w in weights[weight]: 346 self.prio_map.append(w) 347 348 self.prio_map.reverse() 349 350class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed): 351 """ 352 A scheduler optimised to complete .bb files as quickly as possible. The 353 priority map is sorted by task weight, but then reordered so once a given 354 .bb file starts to build, it's completed as quickly as possible by 355 running all tasks related to the same .bb file one after the after. 356 This works well where disk space is at a premium and classes like OE's 357 rm_work are in force. 358 """ 359 name = "completion" 360 361 def __init__(self, runqueue, rqdata): 362 super(RunQueueSchedulerCompletion, self).__init__(runqueue, rqdata) 363 364 # Extract list of tasks for each recipe, with tasks sorted 365 # ascending from "must run first" (typically do_fetch) to 366 # "runs last" (do_build). The speed scheduler prioritizes 367 # tasks that must run first before the ones that run later; 368 # this is what we depend on here. 369 task_lists = {} 370 for taskid in self.prio_map: 371 fn, taskname = taskid.rsplit(':', 1) 372 task_lists.setdefault(fn, []).append(taskname) 373 374 # Now unify the different task lists. The strategy is that 375 # common tasks get skipped and new ones get inserted after the 376 # preceeding common one(s) as they are found. Because task 377 # lists should differ only by their number of tasks, but not 378 # the ordering of the common tasks, this should result in a 379 # deterministic result that is a superset of the individual 380 # task ordering. 381 all_tasks = [] 382 for recipe, new_tasks in task_lists.items(): 383 index = 0 384 old_task = all_tasks[index] if index < len(all_tasks) else None 385 for new_task in new_tasks: 386 if old_task == new_task: 387 # Common task, skip it. This is the fast-path which 388 # avoids a full search. 389 index += 1 390 old_task = all_tasks[index] if index < len(all_tasks) else None 391 else: 392 try: 393 index = all_tasks.index(new_task) 394 # Already present, just not at the current 395 # place. We re-synchronized by changing the 396 # index so that it matches again. Now 397 # move on to the next existing task. 398 index += 1 399 old_task = all_tasks[index] if index < len(all_tasks) else None 400 except ValueError: 401 # Not present. Insert before old_task, which 402 # remains the same (but gets shifted back). 403 all_tasks.insert(index, new_task) 404 index += 1 405 bb.debug(3, 'merged task list: %s' % all_tasks) 406 407 # Now reverse the order so that tasks that finish the work on one 408 # recipe are considered more imporant (= come first). The ordering 409 # is now so that do_build is most important. 410 all_tasks.reverse() 411 412 # Group tasks of the same kind before tasks of less important 413 # kinds at the head of the queue (because earlier = lower 414 # priority number = runs earlier), while preserving the 415 # ordering by recipe. If recipe foo is more important than 416 # bar, then the goal is to work on foo's do_populate_sysroot 417 # before bar's do_populate_sysroot and on the more important 418 # tasks of foo before any of the less important tasks in any 419 # other recipe (if those other recipes are more important than 420 # foo). 421 # 422 # All of this only applies when tasks are runable. Explicit 423 # dependencies still override this ordering by priority. 424 # 425 # Here's an example why this priority re-ordering helps with 426 # minimizing disk usage. Consider a recipe foo with a higher 427 # priority than bar where foo DEPENDS on bar. Then the 428 # implicit rule (from base.bbclass) is that foo's do_configure 429 # depends on bar's do_populate_sysroot. This ensures that 430 # bar's do_populate_sysroot gets done first. Normally the 431 # tasks from foo would continue to run once that is done, and 432 # bar only gets completed and cleaned up later. By ordering 433 # bar's task that depend on bar's do_populate_sysroot before foo's 434 # do_configure, that problem gets avoided. 435 task_index = 0 436 self.dump_prio('original priorities') 437 for task in all_tasks: 438 for index in range(task_index, self.numTasks): 439 taskid = self.prio_map[index] 440 taskname = taskid.rsplit(':', 1)[1] 441 if taskname == task: 442 del self.prio_map[index] 443 self.prio_map.insert(task_index, taskid) 444 task_index += 1 445 self.dump_prio('completion priorities') 446 447class RunTaskEntry(object): 448 def __init__(self): 449 self.depends = set() 450 self.revdeps = set() 451 self.hash = None 452 self.unihash = None 453 self.task = None 454 self.weight = 1 455 456class RunQueueData: 457 """ 458 BitBake Run Queue implementation 459 """ 460 def __init__(self, rq, cooker, cfgData, dataCaches, taskData, targets): 461 self.cooker = cooker 462 self.dataCaches = dataCaches 463 self.taskData = taskData 464 self.targets = targets 465 self.rq = rq 466 self.warn_multi_bb = False 467 468 self.multi_provider_allowed = (cfgData.getVar("BB_MULTI_PROVIDER_ALLOWED") or "").split() 469 self.setscene_ignore_tasks = get_setscene_enforce_ignore_tasks(cfgData, targets) 470 self.setscene_ignore_tasks_checked = False 471 self.setscene_enforce = (cfgData.getVar('BB_SETSCENE_ENFORCE') == "1") 472 self.init_progress_reporter = bb.progress.DummyMultiStageProcessProgressReporter() 473 474 self.reset() 475 476 def reset(self): 477 self.runtaskentries = {} 478 479 def runq_depends_names(self, ids): 480 import re 481 ret = [] 482 for id in ids: 483 nam = os.path.basename(id) 484 nam = re.sub("_[^,]*,", ",", nam) 485 ret.extend([nam]) 486 return ret 487 488 def get_task_hash(self, tid): 489 return self.runtaskentries[tid].hash 490 491 def get_task_unihash(self, tid): 492 return self.runtaskentries[tid].unihash 493 494 def get_user_idstring(self, tid, task_name_suffix = ""): 495 return tid + task_name_suffix 496 497 def get_short_user_idstring(self, task, task_name_suffix = ""): 498 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 499 pn = self.dataCaches[mc].pkg_fn[taskfn] 500 taskname = taskname_from_tid(task) + task_name_suffix 501 return "%s:%s" % (pn, taskname) 502 503 def circular_depchains_handler(self, tasks): 504 """ 505 Some tasks aren't buildable, likely due to circular dependency issues. 506 Identify the circular dependencies and print them in a user readable format. 507 """ 508 from copy import deepcopy 509 510 valid_chains = [] 511 explored_deps = {} 512 msgs = [] 513 514 class TooManyLoops(Exception): 515 pass 516 517 def chain_reorder(chain): 518 """ 519 Reorder a dependency chain so the lowest task id is first 520 """ 521 lowest = 0 522 new_chain = [] 523 for entry in range(len(chain)): 524 if chain[entry] < chain[lowest]: 525 lowest = entry 526 new_chain.extend(chain[lowest:]) 527 new_chain.extend(chain[:lowest]) 528 return new_chain 529 530 def chain_compare_equal(chain1, chain2): 531 """ 532 Compare two dependency chains and see if they're the same 533 """ 534 if len(chain1) != len(chain2): 535 return False 536 for index in range(len(chain1)): 537 if chain1[index] != chain2[index]: 538 return False 539 return True 540 541 def chain_array_contains(chain, chain_array): 542 """ 543 Return True if chain_array contains chain 544 """ 545 for ch in chain_array: 546 if chain_compare_equal(ch, chain): 547 return True 548 return False 549 550 def find_chains(tid, prev_chain): 551 prev_chain.append(tid) 552 total_deps = [] 553 total_deps.extend(self.runtaskentries[tid].revdeps) 554 for revdep in self.runtaskentries[tid].revdeps: 555 if revdep in prev_chain: 556 idx = prev_chain.index(revdep) 557 # To prevent duplicates, reorder the chain to start with the lowest taskid 558 # and search through an array of those we've already printed 559 chain = prev_chain[idx:] 560 new_chain = chain_reorder(chain) 561 if not chain_array_contains(new_chain, valid_chains): 562 valid_chains.append(new_chain) 563 msgs.append("Dependency loop #%d found:\n" % len(valid_chains)) 564 for dep in new_chain: 565 msgs.append(" Task %s (dependent Tasks %s)\n" % (dep, self.runq_depends_names(self.runtaskentries[dep].depends))) 566 msgs.append("\n") 567 if len(valid_chains) > 10: 568 msgs.append("Halted dependency loops search after 10 matches.\n") 569 raise TooManyLoops 570 continue 571 scan = False 572 if revdep not in explored_deps: 573 scan = True 574 elif revdep in explored_deps[revdep]: 575 scan = True 576 else: 577 for dep in prev_chain: 578 if dep in explored_deps[revdep]: 579 scan = True 580 if scan: 581 find_chains(revdep, copy.deepcopy(prev_chain)) 582 for dep in explored_deps[revdep]: 583 if dep not in total_deps: 584 total_deps.append(dep) 585 586 explored_deps[tid] = total_deps 587 588 try: 589 for task in tasks: 590 find_chains(task, []) 591 except TooManyLoops: 592 pass 593 594 return msgs 595 596 def calculate_task_weights(self, endpoints): 597 """ 598 Calculate a number representing the "weight" of each task. Heavier weighted tasks 599 have more dependencies and hence should be executed sooner for maximum speed. 600 601 This function also sanity checks the task list finding tasks that are not 602 possible to execute due to circular dependencies. 603 """ 604 605 numTasks = len(self.runtaskentries) 606 weight = {} 607 deps_left = {} 608 task_done = {} 609 610 for tid in self.runtaskentries: 611 task_done[tid] = False 612 weight[tid] = 1 613 deps_left[tid] = len(self.runtaskentries[tid].revdeps) 614 615 for tid in endpoints: 616 weight[tid] = 10 617 task_done[tid] = True 618 619 while True: 620 next_points = [] 621 for tid in endpoints: 622 for revdep in self.runtaskentries[tid].depends: 623 weight[revdep] = weight[revdep] + weight[tid] 624 deps_left[revdep] = deps_left[revdep] - 1 625 if deps_left[revdep] == 0: 626 next_points.append(revdep) 627 task_done[revdep] = True 628 endpoints = next_points 629 if not next_points: 630 break 631 632 # Circular dependency sanity check 633 problem_tasks = [] 634 for tid in self.runtaskentries: 635 if task_done[tid] is False or deps_left[tid] != 0: 636 problem_tasks.append(tid) 637 logger.debug2("Task %s is not buildable", tid) 638 logger.debug2("(Complete marker was %s and the remaining dependency count was %s)\n", task_done[tid], deps_left[tid]) 639 self.runtaskentries[tid].weight = weight[tid] 640 641 if problem_tasks: 642 message = "%s unbuildable tasks were found.\n" % len(problem_tasks) 643 message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n" 644 message = message + "Identifying dependency loops (this may take a short while)...\n" 645 logger.error(message) 646 647 msgs = self.circular_depchains_handler(problem_tasks) 648 649 message = "\n" 650 for msg in msgs: 651 message = message + msg 652 bb.msg.fatal("RunQueue", message) 653 654 return weight 655 656 def prepare(self): 657 """ 658 Turn a set of taskData into a RunQueue and compute data needed 659 to optimise the execution order. 660 """ 661 662 runq_build = {} 663 recursivetasks = {} 664 recursiveitasks = {} 665 recursivetasksselfref = set() 666 667 taskData = self.taskData 668 669 found = False 670 for mc in self.taskData: 671 if taskData[mc].taskentries: 672 found = True 673 break 674 if not found: 675 # Nothing to do 676 return 0 677 678 bb.parse.siggen.setup_datacache(self.dataCaches) 679 680 self.init_progress_reporter.start() 681 self.init_progress_reporter.next_stage() 682 bb.event.check_for_interrupts(self.cooker.data) 683 684 # Step A - Work out a list of tasks to run 685 # 686 # Taskdata gives us a list of possible providers for every build and run 687 # target ordered by priority. It also gives information on each of those 688 # providers. 689 # 690 # To create the actual list of tasks to execute we fix the list of 691 # providers and then resolve the dependencies into task IDs. This 692 # process is repeated for each type of dependency (tdepends, deptask, 693 # rdeptast, recrdeptask, idepends). 694 695 def add_build_dependencies(depids, tasknames, depends, mc): 696 for depname in depids: 697 # Won't be in build_targets if ASSUME_PROVIDED 698 if depname not in taskData[mc].build_targets or not taskData[mc].build_targets[depname]: 699 continue 700 depdata = taskData[mc].build_targets[depname][0] 701 if depdata is None: 702 continue 703 for taskname in tasknames: 704 t = depdata + ":" + taskname 705 if t in taskData[mc].taskentries: 706 depends.add(t) 707 708 def add_runtime_dependencies(depids, tasknames, depends, mc): 709 for depname in depids: 710 if depname not in taskData[mc].run_targets or not taskData[mc].run_targets[depname]: 711 continue 712 depdata = taskData[mc].run_targets[depname][0] 713 if depdata is None: 714 continue 715 for taskname in tasknames: 716 t = depdata + ":" + taskname 717 if t in taskData[mc].taskentries: 718 depends.add(t) 719 720 def add_mc_dependencies(mc, tid): 721 mcdeps = taskData[mc].get_mcdepends() 722 for dep in mcdeps: 723 mcdependency = dep.split(':') 724 pn = mcdependency[3] 725 frommc = mcdependency[1] 726 mcdep = mcdependency[2] 727 deptask = mcdependency[4] 728 if mcdep not in taskData: 729 bb.fatal("Multiconfig '%s' is referenced in multiconfig dependency '%s' but not enabled in BBMULTICONFIG?" % (mcdep, dep)) 730 if mc == frommc: 731 fn = taskData[mcdep].build_targets[pn][0] 732 newdep = '%s:%s' % (fn,deptask) 733 taskData[mc].taskentries[tid].tdepends.append(newdep) 734 735 for mc in taskData: 736 for tid in taskData[mc].taskentries: 737 738 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 739 #runtid = build_tid(mc, fn, taskname) 740 741 #logger.debug2("Processing %s,%s:%s", mc, fn, taskname) 742 743 depends = set() 744 task_deps = self.dataCaches[mc].task_deps[taskfn] 745 746 self.runtaskentries[tid] = RunTaskEntry() 747 748 if fn in taskData[mc].failed_fns: 749 continue 750 751 # We add multiconfig dependencies before processing internal task deps (tdepends) 752 if 'mcdepends' in task_deps and taskname in task_deps['mcdepends']: 753 add_mc_dependencies(mc, tid) 754 755 # Resolve task internal dependencies 756 # 757 # e.g. addtask before X after Y 758 for t in taskData[mc].taskentries[tid].tdepends: 759 (depmc, depfn, deptaskname, _) = split_tid_mcfn(t) 760 depends.add(build_tid(depmc, depfn, deptaskname)) 761 762 # Resolve 'deptask' dependencies 763 # 764 # e.g. do_sometask[deptask] = "do_someothertask" 765 # (makes sure sometask runs after someothertask of all DEPENDS) 766 if 'deptask' in task_deps and taskname in task_deps['deptask']: 767 tasknames = task_deps['deptask'][taskname].split() 768 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 769 770 # Resolve 'rdeptask' dependencies 771 # 772 # e.g. do_sometask[rdeptask] = "do_someothertask" 773 # (makes sure sometask runs after someothertask of all RDEPENDS) 774 if 'rdeptask' in task_deps and taskname in task_deps['rdeptask']: 775 tasknames = task_deps['rdeptask'][taskname].split() 776 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 777 778 # Resolve inter-task dependencies 779 # 780 # e.g. do_sometask[depends] = "targetname:do_someothertask" 781 # (makes sure sometask runs after targetname's someothertask) 782 idepends = taskData[mc].taskentries[tid].idepends 783 for (depname, idependtask) in idepends: 784 if depname in taskData[mc].build_targets and taskData[mc].build_targets[depname] and not depname in taskData[mc].failed_deps: 785 # Won't be in build_targets if ASSUME_PROVIDED 786 depdata = taskData[mc].build_targets[depname][0] 787 if depdata is not None: 788 t = depdata + ":" + idependtask 789 depends.add(t) 790 if t not in taskData[mc].taskentries: 791 bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 792 irdepends = taskData[mc].taskentries[tid].irdepends 793 for (depname, idependtask) in irdepends: 794 if depname in taskData[mc].run_targets: 795 # Won't be in run_targets if ASSUME_PROVIDED 796 if not taskData[mc].run_targets[depname]: 797 continue 798 depdata = taskData[mc].run_targets[depname][0] 799 if depdata is not None: 800 t = depdata + ":" + idependtask 801 depends.add(t) 802 if t not in taskData[mc].taskentries: 803 bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskname, fn, idependtask, depdata)) 804 805 # Resolve recursive 'recrdeptask' dependencies (Part A) 806 # 807 # e.g. do_sometask[recrdeptask] = "do_someothertask" 808 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 809 # We cover the recursive part of the dependencies below 810 if 'recrdeptask' in task_deps and taskname in task_deps['recrdeptask']: 811 tasknames = task_deps['recrdeptask'][taskname].split() 812 recursivetasks[tid] = tasknames 813 add_build_dependencies(taskData[mc].depids[taskfn], tasknames, depends, mc) 814 add_runtime_dependencies(taskData[mc].rdepids[taskfn], tasknames, depends, mc) 815 if taskname in tasknames: 816 recursivetasksselfref.add(tid) 817 818 if 'recideptask' in task_deps and taskname in task_deps['recideptask']: 819 recursiveitasks[tid] = [] 820 for t in task_deps['recideptask'][taskname].split(): 821 newdep = build_tid(mc, fn, t) 822 recursiveitasks[tid].append(newdep) 823 824 self.runtaskentries[tid].depends = depends 825 # Remove all self references 826 self.runtaskentries[tid].depends.discard(tid) 827 828 #self.dump_data() 829 830 self.init_progress_reporter.next_stage() 831 bb.event.check_for_interrupts(self.cooker.data) 832 833 # Resolve recursive 'recrdeptask' dependencies (Part B) 834 # 835 # e.g. do_sometask[recrdeptask] = "do_someothertask" 836 # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively) 837 # We need to do this separately since we need all of runtaskentries[*].depends to be complete before this is processed 838 839 # Generating/interating recursive lists of dependencies is painful and potentially slow 840 # Precompute recursive task dependencies here by: 841 # a) create a temp list of reverse dependencies (revdeps) 842 # b) walk up the ends of the chains (when a given task no longer has dependencies i.e. len(deps) == 0) 843 # c) combine the total list of dependencies in cumulativedeps 844 # d) optimise by pre-truncating 'task' off the items in cumulativedeps (keeps items in sets lower) 845 846 847 revdeps = {} 848 deps = {} 849 cumulativedeps = {} 850 for tid in self.runtaskentries: 851 deps[tid] = set(self.runtaskentries[tid].depends) 852 revdeps[tid] = set() 853 cumulativedeps[tid] = set() 854 # Generate a temp list of reverse dependencies 855 for tid in self.runtaskentries: 856 for dep in self.runtaskentries[tid].depends: 857 revdeps[dep].add(tid) 858 # Find the dependency chain endpoints 859 endpoints = set() 860 for tid in self.runtaskentries: 861 if not deps[tid]: 862 endpoints.add(tid) 863 # Iterate the chains collating dependencies 864 while endpoints: 865 next = set() 866 for tid in endpoints: 867 for dep in revdeps[tid]: 868 cumulativedeps[dep].add(fn_from_tid(tid)) 869 cumulativedeps[dep].update(cumulativedeps[tid]) 870 if tid in deps[dep]: 871 deps[dep].remove(tid) 872 if not deps[dep]: 873 next.add(dep) 874 endpoints = next 875 #for tid in deps: 876 # if deps[tid]: 877 # bb.warn("Sanity test failure, dependencies left for %s (%s)" % (tid, deps[tid])) 878 879 # Loop here since recrdeptasks can depend upon other recrdeptasks and we have to 880 # resolve these recursively until we aren't adding any further extra dependencies 881 extradeps = True 882 while extradeps: 883 extradeps = 0 884 for tid in recursivetasks: 885 tasknames = recursivetasks[tid] 886 887 totaldeps = set(self.runtaskentries[tid].depends) 888 if tid in recursiveitasks: 889 totaldeps.update(recursiveitasks[tid]) 890 for dep in recursiveitasks[tid]: 891 if dep not in self.runtaskentries: 892 continue 893 totaldeps.update(self.runtaskentries[dep].depends) 894 895 deps = set() 896 for dep in totaldeps: 897 if dep in cumulativedeps: 898 deps.update(cumulativedeps[dep]) 899 900 for t in deps: 901 for taskname in tasknames: 902 newtid = t + ":" + taskname 903 if newtid == tid: 904 continue 905 if newtid in self.runtaskentries and newtid not in self.runtaskentries[tid].depends: 906 extradeps += 1 907 self.runtaskentries[tid].depends.add(newtid) 908 909 # Handle recursive tasks which depend upon other recursive tasks 910 deps = set() 911 for dep in self.runtaskentries[tid].depends.intersection(recursivetasks): 912 deps.update(self.runtaskentries[dep].depends.difference(self.runtaskentries[tid].depends)) 913 for newtid in deps: 914 for taskname in tasknames: 915 if not newtid.endswith(":" + taskname): 916 continue 917 if newtid in self.runtaskentries: 918 extradeps += 1 919 self.runtaskentries[tid].depends.add(newtid) 920 921 bb.debug(1, "Added %s recursive dependencies in this loop" % extradeps) 922 923 # Remove recrdeptask circular references so that do_a[recrdeptask] = "do_a do_b" can work 924 for tid in recursivetasksselfref: 925 self.runtaskentries[tid].depends.difference_update(recursivetasksselfref) 926 927 self.init_progress_reporter.next_stage() 928 bb.event.check_for_interrupts(self.cooker.data) 929 930 #self.dump_data() 931 932 # Step B - Mark all active tasks 933 # 934 # Start with the tasks we were asked to run and mark all dependencies 935 # as active too. If the task is to be 'forced', clear its stamp. Once 936 # all active tasks are marked, prune the ones we don't need. 937 938 logger.verbose("Marking Active Tasks") 939 940 def mark_active(tid, depth): 941 """ 942 Mark an item as active along with its depends 943 (calls itself recursively) 944 """ 945 946 if tid in runq_build: 947 return 948 949 runq_build[tid] = 1 950 951 depends = self.runtaskentries[tid].depends 952 for depend in depends: 953 mark_active(depend, depth+1) 954 955 def invalidate_task(tid, error_nostamp): 956 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 957 taskdep = self.dataCaches[mc].task_deps[taskfn] 958 if fn + ":" + taskname not in taskData[mc].taskentries: 959 logger.warning("Task %s does not exist, invalidating this task will have no effect" % taskname) 960 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 961 if error_nostamp: 962 bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname) 963 else: 964 bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname) 965 else: 966 logger.verbose("Invalidate task %s, %s", taskname, fn) 967 bb.parse.siggen.invalidate_task(taskname, taskfn) 968 969 self.target_tids = [] 970 for (mc, target, task, fn) in self.targets: 971 972 if target not in taskData[mc].build_targets or not taskData[mc].build_targets[target]: 973 continue 974 975 if target in taskData[mc].failed_deps: 976 continue 977 978 parents = False 979 if task.endswith('-'): 980 parents = True 981 task = task[:-1] 982 983 if fn in taskData[mc].failed_fns: 984 continue 985 986 # fn already has mc prefix 987 tid = fn + ":" + task 988 self.target_tids.append(tid) 989 if tid not in taskData[mc].taskentries: 990 import difflib 991 tasks = [] 992 for x in taskData[mc].taskentries: 993 if x.startswith(fn + ":"): 994 tasks.append(taskname_from_tid(x)) 995 close_matches = difflib.get_close_matches(task, tasks, cutoff=0.7) 996 if close_matches: 997 extra = ". Close matches:\n %s" % "\n ".join(close_matches) 998 else: 999 extra = "" 1000 bb.msg.fatal("RunQueue", "Task %s does not exist for target %s (%s)%s" % (task, target, tid, extra)) 1001 1002 # For tasks called "XXXX-", ony run their dependencies 1003 if parents: 1004 for i in self.runtaskentries[tid].depends: 1005 mark_active(i, 1) 1006 else: 1007 mark_active(tid, 1) 1008 1009 self.init_progress_reporter.next_stage() 1010 bb.event.check_for_interrupts(self.cooker.data) 1011 1012 # Step C - Prune all inactive tasks 1013 # 1014 # Once all active tasks are marked, prune the ones we don't need. 1015 1016 # Handle --runall 1017 if self.cooker.configuration.runall: 1018 # re-run the mark_active and then drop unused tasks from new list 1019 1020 runall_tids = set() 1021 added = True 1022 while added: 1023 reduced_tasklist = set(self.runtaskentries.keys()) 1024 for tid in list(self.runtaskentries.keys()): 1025 if tid not in runq_build: 1026 reduced_tasklist.remove(tid) 1027 runq_build = {} 1028 1029 orig = runall_tids 1030 runall_tids = set() 1031 for task in self.cooker.configuration.runall: 1032 if not task.startswith("do_"): 1033 task = "do_{0}".format(task) 1034 for tid in reduced_tasklist: 1035 wanttid = "{0}:{1}".format(fn_from_tid(tid), task) 1036 if wanttid in self.runtaskentries: 1037 runall_tids.add(wanttid) 1038 1039 for tid in list(runall_tids): 1040 mark_active(tid, 1) 1041 self.target_tids.append(tid) 1042 if self.cooker.configuration.force: 1043 invalidate_task(tid, False) 1044 added = runall_tids - orig 1045 1046 delcount = set() 1047 for tid in list(self.runtaskentries.keys()): 1048 if tid not in runq_build: 1049 delcount.add(tid) 1050 del self.runtaskentries[tid] 1051 1052 if self.cooker.configuration.runall: 1053 if not self.runtaskentries: 1054 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the recipes of the taskgraphs of the targets %s" % (str(self.cooker.configuration.runall), str(self.targets))) 1055 1056 self.init_progress_reporter.next_stage() 1057 bb.event.check_for_interrupts(self.cooker.data) 1058 1059 # Handle runonly 1060 if self.cooker.configuration.runonly: 1061 # re-run the mark_active and then drop unused tasks from new list 1062 runq_build = {} 1063 1064 for task in self.cooker.configuration.runonly: 1065 if not task.startswith("do_"): 1066 task = "do_{0}".format(task) 1067 runonly_tids = [k for k in self.runtaskentries.keys() if taskname_from_tid(k) == task] 1068 1069 for tid in runonly_tids: 1070 mark_active(tid, 1) 1071 if self.cooker.configuration.force: 1072 invalidate_task(tid, False) 1073 1074 for tid in list(self.runtaskentries.keys()): 1075 if tid not in runq_build: 1076 delcount.add(tid) 1077 del self.runtaskentries[tid] 1078 1079 if not self.runtaskentries: 1080 bb.msg.fatal("RunQueue", "Could not find any tasks with the tasknames %s to run within the taskgraphs of the targets %s" % (str(self.cooker.configuration.runonly), str(self.targets))) 1081 1082 # 1083 # Step D - Sanity checks and computation 1084 # 1085 1086 # Check to make sure we still have tasks to run 1087 if not self.runtaskentries: 1088 if not taskData[''].halt: 1089 bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.") 1090 else: 1091 bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.") 1092 1093 logger.verbose("Pruned %s inactive tasks, %s left", len(delcount), len(self.runtaskentries)) 1094 1095 logger.verbose("Assign Weightings") 1096 1097 self.init_progress_reporter.next_stage() 1098 bb.event.check_for_interrupts(self.cooker.data) 1099 1100 # Generate a list of reverse dependencies to ease future calculations 1101 for tid in self.runtaskentries: 1102 for dep in self.runtaskentries[tid].depends: 1103 self.runtaskentries[dep].revdeps.add(tid) 1104 1105 self.init_progress_reporter.next_stage() 1106 bb.event.check_for_interrupts(self.cooker.data) 1107 1108 # Identify tasks at the end of dependency chains 1109 # Error on circular dependency loops (length two) 1110 endpoints = [] 1111 for tid in self.runtaskentries: 1112 revdeps = self.runtaskentries[tid].revdeps 1113 if not revdeps: 1114 endpoints.append(tid) 1115 for dep in revdeps: 1116 if dep in self.runtaskentries[tid].depends: 1117 bb.msg.fatal("RunQueue", "Task %s has circular dependency on %s" % (tid, dep)) 1118 1119 1120 logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints)) 1121 1122 self.init_progress_reporter.next_stage() 1123 bb.event.check_for_interrupts(self.cooker.data) 1124 1125 # Calculate task weights 1126 # Check of higher length circular dependencies 1127 self.runq_weight = self.calculate_task_weights(endpoints) 1128 1129 self.init_progress_reporter.next_stage() 1130 bb.event.check_for_interrupts(self.cooker.data) 1131 1132 # Sanity Check - Check for multiple tasks building the same provider 1133 for mc in self.dataCaches: 1134 prov_list = {} 1135 seen_fn = [] 1136 for tid in self.runtaskentries: 1137 (tidmc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1138 if taskfn in seen_fn: 1139 continue 1140 if mc != tidmc: 1141 continue 1142 seen_fn.append(taskfn) 1143 for prov in self.dataCaches[mc].fn_provides[taskfn]: 1144 if prov not in prov_list: 1145 prov_list[prov] = [taskfn] 1146 elif taskfn not in prov_list[prov]: 1147 prov_list[prov].append(taskfn) 1148 for prov in prov_list: 1149 if len(prov_list[prov]) < 2: 1150 continue 1151 if prov in self.multi_provider_allowed: 1152 continue 1153 seen_pn = [] 1154 # If two versions of the same PN are being built its fatal, we don't support it. 1155 for fn in prov_list[prov]: 1156 pn = self.dataCaches[mc].pkg_fn[fn] 1157 if pn not in seen_pn: 1158 seen_pn.append(pn) 1159 else: 1160 bb.fatal("Multiple versions of %s are due to be built (%s). Only one version of a given PN should be built in any given build. You likely need to set PREFERRED_VERSION_%s to select the correct version or don't depend on multiple versions." % (pn, " ".join(prov_list[prov]), pn)) 1161 msgs = ["Multiple .bb files are due to be built which each provide %s:\n %s" % (prov, "\n ".join(prov_list[prov]))] 1162 # 1163 # Construct a list of things which uniquely depend on each provider 1164 # since this may help the user figure out which dependency is triggering this warning 1165 # 1166 msgs.append("\nA list of tasks depending on these providers is shown and may help explain where the dependency comes from.") 1167 deplist = {} 1168 commondeps = None 1169 for provfn in prov_list[prov]: 1170 deps = set() 1171 for tid in self.runtaskentries: 1172 fn = fn_from_tid(tid) 1173 if fn != provfn: 1174 continue 1175 for dep in self.runtaskentries[tid].revdeps: 1176 fn = fn_from_tid(dep) 1177 if fn == provfn: 1178 continue 1179 deps.add(dep) 1180 if not commondeps: 1181 commondeps = set(deps) 1182 else: 1183 commondeps &= deps 1184 deplist[provfn] = deps 1185 for provfn in deplist: 1186 msgs.append("\n%s has unique dependees:\n %s" % (provfn, "\n ".join(deplist[provfn] - commondeps))) 1187 # 1188 # Construct a list of provides and runtime providers for each recipe 1189 # (rprovides has to cover RPROVIDES, PACKAGES, PACKAGES_DYNAMIC) 1190 # 1191 msgs.append("\nIt could be that one recipe provides something the other doesn't and should. The following provider and runtime provider differences may be helpful.") 1192 provide_results = {} 1193 rprovide_results = {} 1194 commonprovs = None 1195 commonrprovs = None 1196 for provfn in prov_list[prov]: 1197 provides = set(self.dataCaches[mc].fn_provides[provfn]) 1198 rprovides = set() 1199 for rprovide in self.dataCaches[mc].rproviders: 1200 if provfn in self.dataCaches[mc].rproviders[rprovide]: 1201 rprovides.add(rprovide) 1202 for package in self.dataCaches[mc].packages: 1203 if provfn in self.dataCaches[mc].packages[package]: 1204 rprovides.add(package) 1205 for package in self.dataCaches[mc].packages_dynamic: 1206 if provfn in self.dataCaches[mc].packages_dynamic[package]: 1207 rprovides.add(package) 1208 if not commonprovs: 1209 commonprovs = set(provides) 1210 else: 1211 commonprovs &= provides 1212 provide_results[provfn] = provides 1213 if not commonrprovs: 1214 commonrprovs = set(rprovides) 1215 else: 1216 commonrprovs &= rprovides 1217 rprovide_results[provfn] = rprovides 1218 #msgs.append("\nCommon provides:\n %s" % ("\n ".join(commonprovs))) 1219 #msgs.append("\nCommon rprovides:\n %s" % ("\n ".join(commonrprovs))) 1220 for provfn in prov_list[prov]: 1221 msgs.append("\n%s has unique provides:\n %s" % (provfn, "\n ".join(provide_results[provfn] - commonprovs))) 1222 msgs.append("\n%s has unique rprovides:\n %s" % (provfn, "\n ".join(rprovide_results[provfn] - commonrprovs))) 1223 1224 if self.warn_multi_bb: 1225 logger.verbnote("".join(msgs)) 1226 else: 1227 logger.error("".join(msgs)) 1228 1229 self.init_progress_reporter.next_stage() 1230 self.init_progress_reporter.next_stage() 1231 bb.event.check_for_interrupts(self.cooker.data) 1232 1233 # Iterate over the task list looking for tasks with a 'setscene' function 1234 self.runq_setscene_tids = set() 1235 if not self.cooker.configuration.nosetscene: 1236 for tid in self.runtaskentries: 1237 (mc, fn, taskname, _) = split_tid_mcfn(tid) 1238 setscenetid = tid + "_setscene" 1239 if setscenetid not in taskData[mc].taskentries: 1240 continue 1241 self.runq_setscene_tids.add(tid) 1242 1243 self.init_progress_reporter.next_stage() 1244 bb.event.check_for_interrupts(self.cooker.data) 1245 1246 # Invalidate task if force mode active 1247 if self.cooker.configuration.force: 1248 for tid in self.target_tids: 1249 invalidate_task(tid, False) 1250 1251 # Invalidate task if invalidate mode active 1252 if self.cooker.configuration.invalidate_stamp: 1253 for tid in self.target_tids: 1254 fn = fn_from_tid(tid) 1255 for st in self.cooker.configuration.invalidate_stamp.split(','): 1256 if not st.startswith("do_"): 1257 st = "do_%s" % st 1258 invalidate_task(fn + ":" + st, True) 1259 1260 self.init_progress_reporter.next_stage() 1261 bb.event.check_for_interrupts(self.cooker.data) 1262 1263 # Create and print to the logs a virtual/xxxx -> PN (fn) table 1264 for mc in taskData: 1265 virtmap = taskData[mc].get_providermap(prefix="virtual/") 1266 virtpnmap = {} 1267 for v in virtmap: 1268 virtpnmap[v] = self.dataCaches[mc].pkg_fn[virtmap[v]] 1269 bb.debug(2, "%s resolved to: %s (%s)" % (v, virtpnmap[v], virtmap[v])) 1270 if hasattr(bb.parse.siggen, "tasks_resolved"): 1271 bb.parse.siggen.tasks_resolved(virtmap, virtpnmap, self.dataCaches[mc]) 1272 1273 self.init_progress_reporter.next_stage() 1274 bb.event.check_for_interrupts(self.cooker.data) 1275 1276 bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) 1277 1278 starttime = time.time() 1279 lasttime = starttime 1280 1281 # Iterate over the task list and call into the siggen code 1282 dealtwith = set() 1283 todeal = set(self.runtaskentries) 1284 while todeal: 1285 ready = set() 1286 for tid in todeal.copy(): 1287 if not (self.runtaskentries[tid].depends - dealtwith): 1288 self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1289 # get_taskhash for a given tid *must* be called before get_unihash* below 1290 self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches) 1291 ready.add(tid) 1292 unihashes = bb.parse.siggen.get_unihashes(ready) 1293 for tid in ready: 1294 dealtwith.add(tid) 1295 todeal.remove(tid) 1296 self.runtaskentries[tid].unihash = unihashes[tid] 1297 1298 bb.event.check_for_interrupts(self.cooker.data) 1299 1300 if time.time() > (lasttime + 30): 1301 lasttime = time.time() 1302 hashequiv_logger.verbose("Initial setup loop progress: %s of %s in %s" % (len(todeal), len(self.runtaskentries), lasttime - starttime)) 1303 1304 endtime = time.time() 1305 if (endtime-starttime > 60): 1306 hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime)) 1307 1308 bb.parse.siggen.writeout_file_checksum_cache() 1309 1310 #self.dump_data() 1311 return len(self.runtaskentries) 1312 1313 def dump_data(self): 1314 """ 1315 Dump some debug information on the internal data structures 1316 """ 1317 logger.debug3("run_tasks:") 1318 for tid in self.runtaskentries: 1319 logger.debug3(" %s: %s Deps %s RevDeps %s", tid, 1320 self.runtaskentries[tid].weight, 1321 self.runtaskentries[tid].depends, 1322 self.runtaskentries[tid].revdeps) 1323 1324class RunQueueWorker(): 1325 def __init__(self, process, pipe): 1326 self.process = process 1327 self.pipe = pipe 1328 1329class RunQueue: 1330 def __init__(self, cooker, cfgData, dataCaches, taskData, targets): 1331 1332 self.cooker = cooker 1333 self.cfgData = cfgData 1334 self.rqdata = RunQueueData(self, cooker, cfgData, dataCaches, taskData, targets) 1335 1336 self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION") or None 1337 self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID") or None 1338 1339 self.state = runQueuePrepare 1340 1341 # For disk space monitor 1342 # Invoked at regular time intervals via the bitbake heartbeat event 1343 # while the build is running. We generate a unique name for the handler 1344 # here, just in case that there ever is more than one RunQueue instance, 1345 # start the handler when reaching runQueueSceneInit, and stop it when 1346 # done with the build. 1347 self.dm = monitordisk.diskMonitor(cfgData) 1348 self.dm_event_handler_name = '_bb_diskmonitor_' + str(id(self)) 1349 self.dm_event_handler_registered = False 1350 self.rqexe = None 1351 self.worker = {} 1352 self.fakeworker = {} 1353 1354 @staticmethod 1355 def send_pickled_data(worker, data, name): 1356 msg = bytearray() 1357 msg.extend(b"<" + name.encode() + b">") 1358 pickled_data = pickle.dumps(data) 1359 msg.extend(len(pickled_data).to_bytes(4, 'big')) 1360 msg.extend(pickled_data) 1361 msg.extend(b"</" + name.encode() + b">") 1362 worker.stdin.write(msg) 1363 1364 def _start_worker(self, mc, fakeroot = False, rqexec = None): 1365 logger.debug("Starting bitbake-worker") 1366 magic = "decafbad" 1367 if self.cooker.configuration.profile: 1368 magic = "decafbadbad" 1369 fakerootlogs = None 1370 1371 workerscript = os.path.realpath(os.path.dirname(__file__) + "/../../bin/bitbake-worker") 1372 if fakeroot: 1373 magic = magic + "beef" 1374 mcdata = self.cooker.databuilder.mcdata[mc] 1375 fakerootcmd = shlex.split(mcdata.getVar("FAKEROOTCMD")) 1376 fakerootenv = (mcdata.getVar("FAKEROOTBASEENV") or "").split() 1377 env = os.environ.copy() 1378 for key, value in (var.split('=',1) for var in fakerootenv): 1379 env[key] = value 1380 worker = subprocess.Popen(fakerootcmd + [sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env) 1381 fakerootlogs = self.rqdata.dataCaches[mc].fakerootlogs 1382 else: 1383 worker = subprocess.Popen([sys.executable, workerscript, magic], stdout=subprocess.PIPE, stdin=subprocess.PIPE) 1384 bb.utils.nonblockingfd(worker.stdout) 1385 workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, self, rqexec, fakerootlogs=fakerootlogs) 1386 1387 workerdata = { 1388 "sigdata" : bb.parse.siggen.get_taskdata(), 1389 "logdefaultlevel" : bb.msg.loggerDefaultLogLevel, 1390 "build_verbose_shell" : self.cooker.configuration.build_verbose_shell, 1391 "build_verbose_stdout" : self.cooker.configuration.build_verbose_stdout, 1392 "logdefaultdomain" : bb.msg.loggerDefaultDomains, 1393 "prhost" : self.cooker.prhost, 1394 "buildname" : self.cfgData.getVar("BUILDNAME"), 1395 "date" : self.cfgData.getVar("DATE"), 1396 "time" : self.cfgData.getVar("TIME"), 1397 "hashservaddr" : self.cooker.hashservaddr, 1398 "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), 1399 } 1400 1401 RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") 1402 RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") 1403 RunQueue.send_pickled_data(worker, workerdata, "workerdata") 1404 worker.stdin.flush() 1405 1406 return RunQueueWorker(worker, workerpipe) 1407 1408 def _teardown_worker(self, worker): 1409 if not worker: 1410 return 1411 logger.debug("Teardown for bitbake-worker") 1412 try: 1413 RunQueue.send_pickled_data(worker.process, b"", "quit") 1414 worker.process.stdin.flush() 1415 worker.process.stdin.close() 1416 except IOError: 1417 pass 1418 while worker.process.returncode is None: 1419 worker.pipe.read() 1420 worker.process.poll() 1421 while worker.pipe.read(): 1422 continue 1423 worker.pipe.close() 1424 1425 def start_worker(self, rqexec): 1426 if self.worker: 1427 self.teardown_workers() 1428 self.teardown = False 1429 for mc in self.rqdata.dataCaches: 1430 self.worker[mc] = self._start_worker(mc, False, rqexec) 1431 1432 def start_fakeworker(self, rqexec, mc): 1433 if not mc in self.fakeworker: 1434 self.fakeworker[mc] = self._start_worker(mc, True, rqexec) 1435 1436 def teardown_workers(self): 1437 self.teardown = True 1438 for mc in self.worker: 1439 self._teardown_worker(self.worker[mc]) 1440 self.worker = {} 1441 for mc in self.fakeworker: 1442 self._teardown_worker(self.fakeworker[mc]) 1443 self.fakeworker = {} 1444 1445 def read_workers(self): 1446 for mc in self.worker: 1447 self.worker[mc].pipe.read() 1448 for mc in self.fakeworker: 1449 self.fakeworker[mc].pipe.read() 1450 1451 def active_fds(self): 1452 fds = [] 1453 for mc in self.worker: 1454 fds.append(self.worker[mc].pipe.input) 1455 for mc in self.fakeworker: 1456 fds.append(self.fakeworker[mc].pipe.input) 1457 return fds 1458 1459 def check_stamp_task(self, tid, taskname = None, recurse = False, cache = None): 1460 def get_timestamp(f): 1461 try: 1462 if not os.access(f, os.F_OK): 1463 return None 1464 return os.stat(f)[stat.ST_MTIME] 1465 except: 1466 return None 1467 1468 (mc, fn, tn, taskfn) = split_tid_mcfn(tid) 1469 if taskname is None: 1470 taskname = tn 1471 1472 stampfile = bb.parse.siggen.stampfile_mcfn(taskname, taskfn) 1473 1474 # If the stamp is missing, it's not current 1475 if not os.access(stampfile, os.F_OK): 1476 logger.debug2("Stampfile %s not available", stampfile) 1477 return False 1478 # If it's a 'nostamp' task, it's not current 1479 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1480 if 'nostamp' in taskdep and taskname in taskdep['nostamp']: 1481 logger.debug2("%s.%s is nostamp\n", fn, taskname) 1482 return False 1483 1484 if taskname.endswith("_setscene"): 1485 return True 1486 1487 if cache is None: 1488 cache = {} 1489 1490 iscurrent = True 1491 t1 = get_timestamp(stampfile) 1492 for dep in self.rqdata.runtaskentries[tid].depends: 1493 if iscurrent: 1494 (mc2, fn2, taskname2, taskfn2) = split_tid_mcfn(dep) 1495 stampfile2 = bb.parse.siggen.stampfile_mcfn(taskname2, taskfn2) 1496 stampfile3 = bb.parse.siggen.stampfile_mcfn(taskname2 + "_setscene", taskfn2) 1497 t2 = get_timestamp(stampfile2) 1498 t3 = get_timestamp(stampfile3) 1499 if t3 and not t2: 1500 continue 1501 if t3 and t3 > t2: 1502 continue 1503 if fn == fn2: 1504 if not t2: 1505 logger.debug2('Stampfile %s does not exist', stampfile2) 1506 iscurrent = False 1507 break 1508 if t1 < t2: 1509 logger.debug2('Stampfile %s < %s', stampfile, stampfile2) 1510 iscurrent = False 1511 break 1512 if recurse and iscurrent: 1513 if dep in cache: 1514 iscurrent = cache[dep] 1515 if not iscurrent: 1516 logger.debug2('Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2)) 1517 else: 1518 iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache) 1519 cache[dep] = iscurrent 1520 if recurse: 1521 cache[tid] = iscurrent 1522 return iscurrent 1523 1524 def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False, summary=True): 1525 valid = set() 1526 if self.hashvalidate: 1527 sq_data = {} 1528 sq_data['hash'] = {} 1529 sq_data['hashfn'] = {} 1530 sq_data['unihash'] = {} 1531 for tid in tocheck: 1532 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1533 sq_data['hash'][tid] = self.rqdata.runtaskentries[tid].hash 1534 sq_data['hashfn'][tid] = self.rqdata.dataCaches[mc].hashfn[taskfn] 1535 sq_data['unihash'][tid] = self.rqdata.runtaskentries[tid].unihash 1536 1537 valid = self.validate_hash(sq_data, data, siginfo, currentcount, summary) 1538 1539 return valid 1540 1541 def validate_hash(self, sq_data, d, siginfo, currentcount, summary): 1542 locs = {"sq_data" : sq_data, "d" : d, "siginfo" : siginfo, "currentcount" : currentcount, "summary" : summary} 1543 1544 # Metadata has **kwargs so args can be added, sq_data can also gain new fields 1545 call = self.hashvalidate + "(sq_data, d, siginfo=siginfo, currentcount=currentcount, summary=summary)" 1546 1547 return bb.utils.better_eval(call, locs) 1548 1549 def _execute_runqueue(self): 1550 """ 1551 Run the tasks in a queue prepared by rqdata.prepare() 1552 Upon failure, optionally try to recover the build using any alternate providers 1553 (if the halt on failure configuration option isn't set) 1554 """ 1555 1556 retval = True 1557 bb.event.check_for_interrupts(self.cooker.data) 1558 1559 if self.state is runQueuePrepare: 1560 # NOTE: if you add, remove or significantly refactor the stages of this 1561 # process then you should recalculate the weightings here. This is quite 1562 # easy to do - just change the next line temporarily to pass debug=True as 1563 # the last parameter and you'll get a printout of the weightings as well 1564 # as a map to the lines where next_stage() was called. Of course this isn't 1565 # critical, but it helps to keep the progress reporting accurate. 1566 self.rqdata.init_progress_reporter = bb.progress.MultiStageProcessProgressReporter(self.cooker.data, 1567 "Initialising tasks", 1568 [43, 967, 4, 3, 1, 5, 3, 7, 13, 1, 2, 1, 1, 246, 35, 1, 38, 1, 35, 2, 338, 204, 142, 3, 3, 37, 244]) 1569 if self.rqdata.prepare() == 0: 1570 self.state = runQueueComplete 1571 else: 1572 self.state = runQueueSceneInit 1573 bb.parse.siggen.save_unitaskhashes() 1574 1575 if self.state is runQueueSceneInit: 1576 self.rqdata.init_progress_reporter.next_stage() 1577 1578 # we are ready to run, emit dependency info to any UI or class which 1579 # needs it 1580 depgraph = self.cooker.buildDependTree(self, self.rqdata.taskData) 1581 self.rqdata.init_progress_reporter.next_stage() 1582 bb.event.fire(bb.event.DepTreeGenerated(depgraph), self.cooker.data) 1583 1584 if not self.dm_event_handler_registered: 1585 res = bb.event.register(self.dm_event_handler_name, 1586 lambda x, y: self.dm.check(self) if self.state in [runQueueRunning, runQueueCleanUp] else False, 1587 ('bb.event.HeartbeatEvent',), data=self.cfgData) 1588 self.dm_event_handler_registered = True 1589 1590 self.rqdata.init_progress_reporter.next_stage() 1591 self.rqexe = RunQueueExecute(self) 1592 1593 dumpsigs = self.cooker.configuration.dump_signatures 1594 if dumpsigs: 1595 self.rqdata.init_progress_reporter.finish() 1596 if 'printdiff' in dumpsigs: 1597 self.invalidtasks_dump = self.print_diffscenetasks() 1598 self.state = runQueueDumpSigs 1599 1600 if self.state is runQueueDumpSigs: 1601 dumpsigs = self.cooker.configuration.dump_signatures 1602 retval = self.dump_signatures(dumpsigs) 1603 if retval is False: 1604 if 'printdiff' in dumpsigs: 1605 self.write_diffscenetasks(self.invalidtasks_dump) 1606 self.state = runQueueComplete 1607 1608 if self.state is runQueueSceneInit: 1609 self.start_worker(self.rqexe) 1610 self.rqdata.init_progress_reporter.finish() 1611 1612 # If we don't have any setscene functions, skip execution 1613 if not self.rqdata.runq_setscene_tids: 1614 logger.info('No setscene tasks') 1615 for tid in self.rqdata.runtaskentries: 1616 if not self.rqdata.runtaskentries[tid].depends: 1617 self.rqexe.setbuildable(tid) 1618 self.rqexe.tasks_notcovered.add(tid) 1619 self.rqexe.sqdone = True 1620 logger.info('Executing Tasks') 1621 self.state = runQueueRunning 1622 1623 if self.state is runQueueRunning: 1624 retval = self.rqexe.execute() 1625 1626 if self.state is runQueueCleanUp: 1627 retval = self.rqexe.finish() 1628 1629 build_done = self.state is runQueueComplete or self.state is runQueueFailed 1630 1631 if build_done and self.dm_event_handler_registered: 1632 bb.event.remove(self.dm_event_handler_name, None, data=self.cfgData) 1633 self.dm_event_handler_registered = False 1634 1635 if build_done and self.rqexe: 1636 bb.parse.siggen.save_unitaskhashes() 1637 self.teardown_workers() 1638 if self.rqexe: 1639 if self.rqexe.stats.failed: 1640 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed) 1641 else: 1642 # Let's avoid the word "failed" if nothing actually did 1643 logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped) 1644 1645 if self.state is runQueueFailed: 1646 raise bb.runqueue.TaskFailure(self.rqexe.failed_tids) 1647 1648 if self.state is runQueueComplete: 1649 # All done 1650 return False 1651 1652 # Loop 1653 return retval 1654 1655 def execute_runqueue(self): 1656 # Catch unexpected exceptions and ensure we exit when an error occurs, not loop. 1657 try: 1658 return self._execute_runqueue() 1659 except bb.runqueue.TaskFailure: 1660 raise 1661 except SystemExit: 1662 raise 1663 except bb.BBHandledException: 1664 try: 1665 self.teardown_workers() 1666 except: 1667 pass 1668 self.state = runQueueComplete 1669 raise 1670 except Exception as err: 1671 logger.exception("An uncaught exception occurred in runqueue") 1672 try: 1673 self.teardown_workers() 1674 except: 1675 pass 1676 self.state = runQueueComplete 1677 raise 1678 1679 def finish_runqueue(self, now = False): 1680 if not self.rqexe: 1681 self.state = runQueueComplete 1682 return 1683 1684 if now: 1685 self.rqexe.finish_now() 1686 else: 1687 self.rqexe.finish() 1688 1689 def _rq_dump_sigtid(self, tids): 1690 for tid in tids: 1691 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1692 dataCaches = self.rqdata.dataCaches 1693 bb.parse.siggen.dump_sigtask(taskfn, taskname, dataCaches[mc].stamp[taskfn], True) 1694 1695 def dump_signatures(self, options): 1696 if not hasattr(self, "dumpsigs_launched"): 1697 if bb.cooker.CookerFeatures.RECIPE_SIGGEN_INFO not in self.cooker.featureset: 1698 bb.fatal("The dump signatures functionality needs the RECIPE_SIGGEN_INFO feature enabled") 1699 1700 bb.note("Writing task signature files") 1701 1702 max_process = int(self.cfgData.getVar("BB_NUMBER_PARSE_THREADS") or os.cpu_count() or 1) 1703 def chunkify(l, n): 1704 return [l[i::n] for i in range(n)] 1705 dumpsigs_tids = chunkify(list(self.rqdata.runtaskentries), max_process) 1706 1707 # We cannot use the real multiprocessing.Pool easily due to some local data 1708 # that can't be pickled. This is a cheap multi-process solution. 1709 self.dumpsigs_launched = [] 1710 1711 for tids in dumpsigs_tids: 1712 p = Process(target=self._rq_dump_sigtid, args=(tids, )) 1713 p.start() 1714 self.dumpsigs_launched.append(p) 1715 1716 return 1.0 1717 1718 for q in self.dumpsigs_launched: 1719 # The finished processes are joined when calling is_alive() 1720 if not q.is_alive(): 1721 self.dumpsigs_launched.remove(q) 1722 1723 if self.dumpsigs_launched: 1724 return 1.0 1725 1726 for p in self.dumpsigs_launched: 1727 p.join() 1728 1729 bb.parse.siggen.dump_sigs(self.rqdata.dataCaches, options) 1730 1731 return False 1732 1733 def print_diffscenetasks(self): 1734 def get_root_invalid_tasks(task, taskdepends, valid, noexec, visited_invalid): 1735 invalidtasks = [] 1736 for t in taskdepends[task].depends: 1737 if t not in valid and t not in visited_invalid: 1738 invalidtasks.extend(get_root_invalid_tasks(t, taskdepends, valid, noexec, visited_invalid)) 1739 visited_invalid.add(t) 1740 1741 direct_invalid = [t for t in taskdepends[task].depends if t not in valid] 1742 if not direct_invalid and task not in noexec: 1743 invalidtasks = [task] 1744 return invalidtasks 1745 1746 noexec = [] 1747 tocheck = set() 1748 1749 for tid in self.rqdata.runtaskentries: 1750 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1751 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 1752 1753 if 'noexec' in taskdep and taskname in taskdep['noexec']: 1754 noexec.append(tid) 1755 continue 1756 1757 tocheck.add(tid) 1758 1759 valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True, summary=False) 1760 1761 # Tasks which are both setscene and noexec never care about dependencies 1762 # We therefore find tasks which are setscene and noexec and mark their 1763 # unique dependencies as valid. 1764 for tid in noexec: 1765 if tid not in self.rqdata.runq_setscene_tids: 1766 continue 1767 for dep in self.rqdata.runtaskentries[tid].depends: 1768 hasnoexecparents = True 1769 for dep2 in self.rqdata.runtaskentries[dep].revdeps: 1770 if dep2 in self.rqdata.runq_setscene_tids and dep2 in noexec: 1771 continue 1772 hasnoexecparents = False 1773 break 1774 if hasnoexecparents: 1775 valid_new.add(dep) 1776 1777 invalidtasks = set() 1778 1779 toptasks = set(["{}:{}".format(t[3], t[2]) for t in self.rqdata.targets]) 1780 for tid in toptasks: 1781 toprocess = set([tid]) 1782 while toprocess: 1783 next = set() 1784 visited_invalid = set() 1785 for t in toprocess: 1786 if t not in valid_new and t not in noexec: 1787 invalidtasks.update(get_root_invalid_tasks(t, self.rqdata.runtaskentries, valid_new, noexec, visited_invalid)) 1788 continue 1789 if t in self.rqdata.runq_setscene_tids: 1790 for dep in self.rqexe.sqdata.sq_deps[t]: 1791 next.add(dep) 1792 continue 1793 1794 for dep in self.rqdata.runtaskentries[t].depends: 1795 next.add(dep) 1796 1797 toprocess = next 1798 1799 tasklist = [] 1800 for tid in invalidtasks: 1801 tasklist.append(tid) 1802 1803 if tasklist: 1804 bb.plain("The differences between the current build and any cached tasks start at the following tasks:\n" + "\n".join(tasklist)) 1805 1806 return invalidtasks 1807 1808 def write_diffscenetasks(self, invalidtasks): 1809 bb.siggen.check_siggen_version(bb.siggen) 1810 1811 # Define recursion callback 1812 def recursecb(key, hash1, hash2): 1813 hashes = [hash1, hash2] 1814 bb.debug(1, "Recursively looking for recipe {} hashes {}".format(key, hashes)) 1815 hashfiles = bb.siggen.find_siginfo(key, None, hashes, self.cfgData) 1816 bb.debug(1, "Found hashfiles:\n{}".format(hashfiles)) 1817 1818 recout = [] 1819 if len(hashfiles) == 2: 1820 out2 = bb.siggen.compare_sigfiles(hashfiles[hash1]['path'], hashfiles[hash2]['path'], recursecb) 1821 recout.extend(list(' ' + l for l in out2)) 1822 else: 1823 recout.append("Unable to find matching sigdata for %s with hashes %s or %s" % (key, hash1, hash2)) 1824 1825 return recout 1826 1827 1828 for tid in invalidtasks: 1829 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1830 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 1831 h = self.rqdata.runtaskentries[tid].unihash 1832 bb.debug(1, "Looking for recipe {} task {}".format(pn, taskname)) 1833 matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) 1834 bb.debug(1, "Found hashfiles:\n{}".format(matches)) 1835 match = None 1836 for m in matches.values(): 1837 if h in m['path']: 1838 match = m['path'] 1839 if match is None: 1840 bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" % (h, tid)) 1841 matches = {k : v for k, v in iter(matches.items()) if h not in k} 1842 matches_local = {k : v for k, v in iter(matches.items()) if h not in k and not v['sstate']} 1843 if matches_local: 1844 matches = matches_local 1845 if matches: 1846 latestmatch = matches[sorted(matches.keys(), key=lambda h: matches[h]['time'])[-1]]['path'] 1847 prevh = __find_sha256__.search(latestmatch).group(0) 1848 output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) 1849 bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) 1850 1851 1852class RunQueueExecute: 1853 1854 def __init__(self, rq): 1855 self.rq = rq 1856 self.cooker = rq.cooker 1857 self.cfgData = rq.cfgData 1858 self.rqdata = rq.rqdata 1859 1860 self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS") or 1) 1861 self.scheduler = self.cfgData.getVar("BB_SCHEDULER") or "speed" 1862 self.max_cpu_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_CPU") 1863 self.max_io_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_IO") 1864 self.max_memory_pressure = self.cfgData.getVar("BB_PRESSURE_MAX_MEMORY") 1865 self.max_loadfactor = self.cfgData.getVar("BB_LOADFACTOR_MAX") 1866 1867 self.sq_buildable = set() 1868 self.sq_running = set() 1869 self.sq_live = set() 1870 1871 self.updated_taskhash_queue = [] 1872 self.pending_migrations = set() 1873 1874 self.runq_buildable = set() 1875 self.runq_running = set() 1876 self.runq_complete = set() 1877 self.runq_tasksrun = set() 1878 1879 self.build_stamps = {} 1880 self.build_stamps2 = [] 1881 self.failed_tids = [] 1882 self.sq_deferred = {} 1883 self.sq_needed_harddeps = set() 1884 self.sq_harddep_deferred = set() 1885 1886 self.stampcache = {} 1887 1888 self.holdoff_tasks = set() 1889 self.holdoff_need_update = True 1890 self.sqdone = False 1891 1892 self.stats = RunQueueStats(len(self.rqdata.runtaskentries), len(self.rqdata.runq_setscene_tids)) 1893 1894 if self.number_tasks <= 0: 1895 bb.fatal("Invalid BB_NUMBER_THREADS %s" % self.number_tasks) 1896 1897 lower_limit = 1.0 1898 upper_limit = 1000000.0 1899 if self.max_cpu_pressure: 1900 self.max_cpu_pressure = float(self.max_cpu_pressure) 1901 if self.max_cpu_pressure < lower_limit: 1902 bb.fatal("Invalid BB_PRESSURE_MAX_CPU %s, minimum value is %s." % (self.max_cpu_pressure, lower_limit)) 1903 if self.max_cpu_pressure > upper_limit: 1904 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_CPU is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_cpu_pressure)) 1905 1906 if self.max_io_pressure: 1907 self.max_io_pressure = float(self.max_io_pressure) 1908 if self.max_io_pressure < lower_limit: 1909 bb.fatal("Invalid BB_PRESSURE_MAX_IO %s, minimum value is %s." % (self.max_io_pressure, lower_limit)) 1910 if self.max_io_pressure > upper_limit: 1911 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_IO is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1912 1913 if self.max_memory_pressure: 1914 self.max_memory_pressure = float(self.max_memory_pressure) 1915 if self.max_memory_pressure < lower_limit: 1916 bb.fatal("Invalid BB_PRESSURE_MAX_MEMORY %s, minimum value is %s." % (self.max_memory_pressure, lower_limit)) 1917 if self.max_memory_pressure > upper_limit: 1918 bb.warn("Your build will be largely unregulated since BB_PRESSURE_MAX_MEMORY is set to %s. It is very unlikely that such high pressure will be experienced." % (self.max_io_pressure)) 1919 1920 if self.max_loadfactor: 1921 self.max_loadfactor = float(self.max_loadfactor) 1922 if self.max_loadfactor <= 0: 1923 bb.fatal("Invalid BB_LOADFACTOR_MAX %s, needs to be greater than zero." % (self.max_loadfactor)) 1924 1925 # List of setscene tasks which we've covered 1926 self.scenequeue_covered = set() 1927 # List of tasks which are covered (including setscene ones) 1928 self.tasks_covered = set() 1929 self.tasks_scenequeue_done = set() 1930 self.scenequeue_notcovered = set() 1931 self.tasks_notcovered = set() 1932 self.scenequeue_notneeded = set() 1933 1934 schedulers = self.get_schedulers() 1935 for scheduler in schedulers: 1936 if self.scheduler == scheduler.name: 1937 self.sched = scheduler(self, self.rqdata) 1938 logger.debug("Using runqueue scheduler '%s'", scheduler.name) 1939 break 1940 else: 1941 bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" % 1942 (self.scheduler, ", ".join(obj.name for obj in schedulers))) 1943 1944 #if self.rqdata.runq_setscene_tids: 1945 self.sqdata = SQData() 1946 build_scenequeue_data(self.sqdata, self.rqdata, self) 1947 1948 update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True) 1949 1950 # Compute a list of 'stale' sstate tasks where the current hash does not match the one 1951 # in any stamp files. Pass the list out to metadata as an event. 1952 found = {} 1953 for tid in self.rqdata.runq_setscene_tids: 1954 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 1955 stamps = bb.build.find_stale_stamps(taskname, taskfn) 1956 if stamps: 1957 if mc not in found: 1958 found[mc] = {} 1959 found[mc][tid] = stamps 1960 for mc in found: 1961 event = bb.event.StaleSetSceneTasks(found[mc]) 1962 bb.event.fire(event, self.cooker.databuilder.mcdata[mc]) 1963 1964 self.build_taskdepdata_cache() 1965 1966 def runqueue_process_waitpid(self, task, status, fakerootlog=None): 1967 1968 # self.build_stamps[pid] may not exist when use shared work directory. 1969 if task in self.build_stamps: 1970 self.build_stamps2.remove(self.build_stamps[task]) 1971 del self.build_stamps[task] 1972 1973 if task in self.sq_live: 1974 if status != 0: 1975 self.sq_task_fail(task, status) 1976 else: 1977 self.sq_task_complete(task) 1978 self.sq_live.remove(task) 1979 self.stats.updateActiveSetscene(len(self.sq_live)) 1980 else: 1981 if status != 0: 1982 self.task_fail(task, status, fakerootlog=fakerootlog) 1983 else: 1984 self.task_complete(task) 1985 return True 1986 1987 def finish_now(self): 1988 for mc in self.rq.worker: 1989 try: 1990 RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") 1991 self.rq.worker[mc].process.stdin.flush() 1992 except IOError: 1993 # worker must have died? 1994 pass 1995 for mc in self.rq.fakeworker: 1996 try: 1997 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") 1998 self.rq.fakeworker[mc].process.stdin.flush() 1999 except IOError: 2000 # worker must have died? 2001 pass 2002 2003 if self.failed_tids: 2004 self.rq.state = runQueueFailed 2005 return 2006 2007 self.rq.state = runQueueComplete 2008 return 2009 2010 def finish(self): 2011 self.rq.state = runQueueCleanUp 2012 2013 active = self.stats.active + len(self.sq_live) 2014 if active > 0: 2015 bb.event.fire(runQueueExitWait(active), self.cfgData) 2016 self.rq.read_workers() 2017 return self.rq.active_fds() 2018 2019 if self.failed_tids: 2020 self.rq.state = runQueueFailed 2021 return True 2022 2023 self.rq.state = runQueueComplete 2024 return True 2025 2026 # Used by setscene only 2027 def check_dependencies(self, task, taskdeps): 2028 if not self.rq.depvalidate: 2029 return False 2030 2031 # Must not edit parent data 2032 taskdeps = set(taskdeps) 2033 2034 taskdata = {} 2035 taskdeps.add(task) 2036 for dep in taskdeps: 2037 (mc, fn, taskname, taskfn) = split_tid_mcfn(dep) 2038 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2039 taskdata[dep] = [pn, taskname, fn] 2040 call = self.rq.depvalidate + "(task, taskdata, notneeded, d)" 2041 locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data } 2042 valid = bb.utils.better_eval(call, locs) 2043 return valid 2044 2045 def can_start_task(self): 2046 active = self.stats.active + len(self.sq_live) 2047 can_start = active < self.number_tasks 2048 return can_start 2049 2050 def get_schedulers(self): 2051 schedulers = set(obj for obj in globals().values() 2052 if type(obj) is type and 2053 issubclass(obj, RunQueueScheduler)) 2054 2055 user_schedulers = self.cfgData.getVar("BB_SCHEDULERS") 2056 if user_schedulers: 2057 for sched in user_schedulers.split(): 2058 if not "." in sched: 2059 bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched) 2060 continue 2061 2062 modname, name = sched.rsplit(".", 1) 2063 try: 2064 module = __import__(modname, fromlist=(name,)) 2065 except ImportError as exc: 2066 bb.fatal("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc)) 2067 else: 2068 schedulers.add(getattr(module, name)) 2069 return schedulers 2070 2071 def setbuildable(self, task): 2072 self.runq_buildable.add(task) 2073 self.sched.newbuildable(task) 2074 2075 def task_completeoutright(self, task): 2076 """ 2077 Mark a task as completed 2078 Look at the reverse dependencies and mark any task with 2079 completed dependencies as buildable 2080 """ 2081 self.runq_complete.add(task) 2082 for revdep in self.rqdata.runtaskentries[task].revdeps: 2083 if revdep in self.runq_running: 2084 continue 2085 if revdep in self.runq_buildable: 2086 continue 2087 alldeps = True 2088 for dep in self.rqdata.runtaskentries[revdep].depends: 2089 if dep not in self.runq_complete: 2090 alldeps = False 2091 break 2092 if alldeps: 2093 self.setbuildable(revdep) 2094 logger.debug("Marking task %s as buildable", revdep) 2095 2096 found = None 2097 for t in sorted(self.sq_deferred.copy()): 2098 if self.sq_deferred[t] == task: 2099 # Allow the next deferred task to run. Any other deferred tasks should be deferred after that task. 2100 # We shouldn't allow all to run at once as it is prone to races. 2101 if not found: 2102 bb.debug(1, "Deferred task %s now buildable" % t) 2103 del self.sq_deferred[t] 2104 update_scenequeue_data([t], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2105 found = t 2106 else: 2107 bb.debug(1, "Deferring %s after %s" % (t, found)) 2108 self.sq_deferred[t] = found 2109 2110 def task_complete(self, task): 2111 self.stats.taskCompleted() 2112 bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2113 self.task_completeoutright(task) 2114 self.runq_tasksrun.add(task) 2115 2116 def task_fail(self, task, exitcode, fakerootlog=None): 2117 """ 2118 Called when a task has failed 2119 Updates the state engine with the failure 2120 """ 2121 self.stats.taskFailed() 2122 self.failed_tids.append(task) 2123 2124 fakeroot_log = [] 2125 if fakerootlog and os.path.exists(fakerootlog): 2126 with open(fakerootlog) as fakeroot_log_file: 2127 fakeroot_failed = False 2128 for line in reversed(fakeroot_log_file.readlines()): 2129 for fakeroot_error in ['mismatch', 'error', 'fatal']: 2130 if fakeroot_error in line.lower(): 2131 fakeroot_failed = True 2132 if 'doing new pid setup and server start' in line: 2133 break 2134 fakeroot_log.append(line) 2135 2136 if not fakeroot_failed: 2137 fakeroot_log = [] 2138 2139 bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq, fakeroot_log=("".join(fakeroot_log) or None)), self.cfgData) 2140 2141 if self.rqdata.taskData[''].halt: 2142 self.rq.state = runQueueCleanUp 2143 2144 def task_skip(self, task, reason): 2145 self.runq_running.add(task) 2146 self.setbuildable(task) 2147 bb.event.fire(runQueueTaskSkipped(task, self.stats, self.rq, reason), self.cfgData) 2148 self.task_completeoutright(task) 2149 self.stats.taskSkipped() 2150 self.stats.taskCompleted() 2151 2152 def summarise_scenequeue_errors(self): 2153 err = False 2154 if not self.sqdone: 2155 logger.debug('We could skip tasks %s', "\n".join(sorted(self.scenequeue_covered))) 2156 completeevent = sceneQueueComplete(self.stats, self.rq) 2157 bb.event.fire(completeevent, self.cfgData) 2158 if self.sq_deferred: 2159 logger.error("Scenequeue had deferred entries: %s" % pprint.pformat(self.sq_deferred)) 2160 err = True 2161 if self.updated_taskhash_queue: 2162 logger.error("Scenequeue had unprocessed changed taskhash entries: %s" % pprint.pformat(self.updated_taskhash_queue)) 2163 err = True 2164 if self.holdoff_tasks: 2165 logger.error("Scenequeue had holdoff tasks: %s" % pprint.pformat(self.holdoff_tasks)) 2166 err = True 2167 2168 for tid in self.scenequeue_covered.intersection(self.scenequeue_notcovered): 2169 # No task should end up in both covered and uncovered, that is a bug. 2170 logger.error("Setscene task %s in both covered and notcovered." % tid) 2171 2172 for tid in self.rqdata.runq_setscene_tids: 2173 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2174 err = True 2175 logger.error("Setscene Task %s was never marked as covered or not covered" % tid) 2176 if tid not in self.sq_buildable: 2177 err = True 2178 logger.error("Setscene Task %s was never marked as buildable" % tid) 2179 if tid not in self.sq_running: 2180 err = True 2181 logger.error("Setscene Task %s was never marked as running" % tid) 2182 2183 for x in self.rqdata.runtaskentries: 2184 if x not in self.tasks_covered and x not in self.tasks_notcovered: 2185 logger.error("Task %s was never moved from the setscene queue" % x) 2186 err = True 2187 if x not in self.tasks_scenequeue_done: 2188 logger.error("Task %s was never processed by the setscene code" % x) 2189 err = True 2190 if not self.rqdata.runtaskentries[x].depends and x not in self.runq_buildable: 2191 logger.error("Task %s was never marked as buildable by the setscene code" % x) 2192 err = True 2193 return err 2194 2195 2196 def execute(self): 2197 """ 2198 Run the tasks in a queue prepared by prepare_runqueue 2199 """ 2200 2201 self.rq.read_workers() 2202 if self.updated_taskhash_queue or self.pending_migrations: 2203 self.process_possible_migrations() 2204 2205 if not hasattr(self, "sorted_setscene_tids"): 2206 # Don't want to sort this set every execution 2207 self.sorted_setscene_tids = sorted(self.rqdata.runq_setscene_tids) 2208 # Resume looping where we left off when we returned to feed the mainloop 2209 self.setscene_tids_generator = itertools.cycle(self.rqdata.runq_setscene_tids) 2210 2211 task = None 2212 if not self.sqdone and self.can_start_task(): 2213 loopcount = 0 2214 # Find the next setscene to run, exit the loop when we've processed all tids or found something to execute 2215 while loopcount < len(self.rqdata.runq_setscene_tids): 2216 loopcount += 1 2217 nexttask = next(self.setscene_tids_generator) 2218 if nexttask in self.sq_buildable and nexttask not in self.sq_running and self.sqdata.stamps[nexttask] not in self.build_stamps.values() and nexttask not in self.sq_harddep_deferred: 2219 if nexttask in self.sq_deferred and self.sq_deferred[nexttask] not in self.runq_complete: 2220 # Skip deferred tasks quickly before the 'expensive' tests below - this is key to performant multiconfig builds 2221 continue 2222 if nexttask not in self.sqdata.unskippable and self.sqdata.sq_revdeps[nexttask] and \ 2223 nexttask not in self.sq_needed_harddeps and \ 2224 self.sqdata.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and \ 2225 self.check_dependencies(nexttask, self.sqdata.sq_revdeps[nexttask]): 2226 if nexttask not in self.rqdata.target_tids: 2227 logger.debug2("Skipping setscene for task %s" % nexttask) 2228 self.sq_task_skip(nexttask) 2229 self.scenequeue_notneeded.add(nexttask) 2230 if nexttask in self.sq_deferred: 2231 del self.sq_deferred[nexttask] 2232 return True 2233 if nexttask in self.sqdata.sq_harddeps_rev and not self.sqdata.sq_harddeps_rev[nexttask].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2234 logger.debug2("Deferring %s due to hard dependencies" % nexttask) 2235 updated = False 2236 for dep in self.sqdata.sq_harddeps_rev[nexttask]: 2237 if dep not in self.sq_needed_harddeps: 2238 logger.debug2("Enabling task %s as it is a hard dependency" % dep) 2239 self.sq_buildable.add(dep) 2240 self.sq_needed_harddeps.add(dep) 2241 updated = True 2242 self.sq_harddep_deferred.add(nexttask) 2243 if updated: 2244 return True 2245 continue 2246 # If covered tasks are running, need to wait for them to complete 2247 for t in self.sqdata.sq_covered_tasks[nexttask]: 2248 if t in self.runq_running and t not in self.runq_complete: 2249 continue 2250 if nexttask in self.sq_deferred: 2251 # Deferred tasks that were still deferred were skipped above so we now need to process 2252 logger.debug("Task %s no longer deferred" % nexttask) 2253 del self.sq_deferred[nexttask] 2254 valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False, summary=False) 2255 if not valid: 2256 logger.debug("%s didn't become valid, skipping setscene" % nexttask) 2257 self.sq_task_failoutright(nexttask) 2258 return True 2259 if nexttask in self.sqdata.outrightfail: 2260 logger.debug2('No package found, so skipping setscene task %s', nexttask) 2261 self.sq_task_failoutright(nexttask) 2262 return True 2263 if nexttask in self.sqdata.unskippable: 2264 logger.debug2("Setscene task %s is unskippable" % nexttask) 2265 task = nexttask 2266 break 2267 if task is not None: 2268 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2269 taskname = taskname + "_setscene" 2270 if self.rq.check_stamp_task(task, taskname_from_tid(task), recurse = True, cache=self.stampcache): 2271 logger.debug2('Stamp for underlying task %s is current, so skipping setscene variant', task) 2272 self.sq_task_failoutright(task) 2273 return True 2274 2275 if self.cooker.configuration.force: 2276 if task in self.rqdata.target_tids: 2277 self.sq_task_failoutright(task) 2278 return True 2279 2280 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2281 logger.debug2('Setscene stamp current task %s, so skip it and its dependencies', task) 2282 self.sq_task_skip(task) 2283 return True 2284 2285 if self.cooker.configuration.skipsetscene: 2286 logger.debug2('No setscene tasks should be executed. Skipping %s', task) 2287 self.sq_task_failoutright(task) 2288 return True 2289 2290 startevent = sceneQueueTaskStarted(task, self.stats, self.rq) 2291 bb.event.fire(startevent, self.cfgData) 2292 2293 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2294 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2295 runtask = { 2296 'fn' : taskfn, 2297 'task' : task, 2298 'taskname' : taskname, 2299 'taskhash' : self.rqdata.get_task_hash(task), 2300 'unihash' : self.rqdata.get_task_unihash(task), 2301 'quieterrors' : True, 2302 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2303 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2304 'taskdepdata' : self.sq_build_taskdepdata(task), 2305 'dry_run' : False, 2306 'taskdep': taskdep, 2307 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2308 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2309 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2310 } 2311 2312 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: 2313 if not mc in self.rq.fakeworker: 2314 self.rq.start_fakeworker(self, mc) 2315 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2316 self.rq.fakeworker[mc].process.stdin.flush() 2317 else: 2318 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2319 self.rq.worker[mc].process.stdin.flush() 2320 2321 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2322 self.build_stamps2.append(self.build_stamps[task]) 2323 self.sq_running.add(task) 2324 self.sq_live.add(task) 2325 self.stats.updateActiveSetscene(len(self.sq_live)) 2326 if self.can_start_task(): 2327 return True 2328 2329 self.update_holdofftasks() 2330 2331 if not self.sq_live and not self.sqdone and not self.sq_deferred and not self.updated_taskhash_queue and not self.holdoff_tasks: 2332 hashequiv_logger.verbose("Setscene tasks completed") 2333 2334 err = self.summarise_scenequeue_errors() 2335 if err: 2336 self.rq.state = runQueueFailed 2337 return True 2338 2339 if self.cooker.configuration.setsceneonly: 2340 self.rq.state = runQueueComplete 2341 return True 2342 self.sqdone = True 2343 2344 if self.stats.total == 0: 2345 # nothing to do 2346 self.rq.state = runQueueComplete 2347 return True 2348 2349 if self.cooker.configuration.setsceneonly: 2350 task = None 2351 else: 2352 task = self.sched.next() 2353 if task is not None: 2354 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2355 2356 if self.rqdata.setscene_ignore_tasks is not None: 2357 if self.check_setscene_ignore_tasks(task): 2358 self.task_fail(task, "setscene ignore_tasks") 2359 return True 2360 2361 if task in self.tasks_covered: 2362 logger.debug2("Setscene covered task %s", task) 2363 self.task_skip(task, "covered") 2364 return True 2365 2366 if self.rq.check_stamp_task(task, taskname, cache=self.stampcache): 2367 logger.debug2("Stamp current task %s", task) 2368 2369 self.task_skip(task, "existing") 2370 self.runq_tasksrun.add(task) 2371 return True 2372 2373 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2374 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2375 startevent = runQueueTaskStarted(task, self.stats, self.rq, 2376 noexec=True) 2377 bb.event.fire(startevent, self.cfgData) 2378 self.runq_running.add(task) 2379 self.stats.taskActive() 2380 if not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2381 bb.build.make_stamp_mcfn(taskname, taskfn) 2382 self.task_complete(task) 2383 return True 2384 else: 2385 startevent = runQueueTaskStarted(task, self.stats, self.rq) 2386 bb.event.fire(startevent, self.cfgData) 2387 2388 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2389 realfn = bb.cache.virtualfn2realfn(taskfn)[0] 2390 runtask = { 2391 'fn' : taskfn, 2392 'task' : task, 2393 'taskname' : taskname, 2394 'taskhash' : self.rqdata.get_task_hash(task), 2395 'unihash' : self.rqdata.get_task_unihash(task), 2396 'quieterrors' : False, 2397 'appends' : self.cooker.collections[mc].get_file_appends(taskfn), 2398 'layername' : self.cooker.collections[mc].calc_bbfile_priority(realfn)[2], 2399 'taskdepdata' : self.build_taskdepdata(task), 2400 'dry_run' : self.rqdata.setscene_enforce, 2401 'taskdep': taskdep, 2402 'fakerootenv' : self.rqdata.dataCaches[mc].fakerootenv[taskfn], 2403 'fakerootdirs' : self.rqdata.dataCaches[mc].fakerootdirs[taskfn], 2404 'fakerootnoenv' : self.rqdata.dataCaches[mc].fakerootnoenv[taskfn] 2405 } 2406 2407 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not (self.cooker.configuration.dry_run or self.rqdata.setscene_enforce): 2408 if not mc in self.rq.fakeworker: 2409 try: 2410 self.rq.start_fakeworker(self, mc) 2411 except OSError as exc: 2412 logger.critical("Failed to spawn fakeroot worker to run %s: %s" % (task, str(exc))) 2413 self.rq.state = runQueueFailed 2414 self.stats.taskFailed() 2415 return True 2416 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") 2417 self.rq.fakeworker[mc].process.stdin.flush() 2418 else: 2419 RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") 2420 self.rq.worker[mc].process.stdin.flush() 2421 2422 self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2423 self.build_stamps2.append(self.build_stamps[task]) 2424 self.runq_running.add(task) 2425 self.stats.taskActive() 2426 if self.can_start_task(): 2427 return True 2428 2429 if self.stats.active > 0 or self.sq_live: 2430 self.rq.read_workers() 2431 return self.rq.active_fds() 2432 2433 # No more tasks can be run. If we have deferred setscene tasks we should run them. 2434 if self.sq_deferred: 2435 deferred_tid = list(self.sq_deferred.keys())[0] 2436 blocking_tid = self.sq_deferred.pop(deferred_tid) 2437 logger.warning("Runqueue deadlocked on deferred tasks, forcing task %s blocked by %s" % (deferred_tid, blocking_tid)) 2438 return True 2439 2440 if self.failed_tids: 2441 self.rq.state = runQueueFailed 2442 return True 2443 2444 # Sanity Checks 2445 err = self.summarise_scenequeue_errors() 2446 for task in self.rqdata.runtaskentries: 2447 if task not in self.runq_buildable: 2448 logger.error("Task %s never buildable!", task) 2449 err = True 2450 elif task not in self.runq_running: 2451 logger.error("Task %s never ran!", task) 2452 err = True 2453 elif task not in self.runq_complete: 2454 logger.error("Task %s never completed!", task) 2455 err = True 2456 2457 if err: 2458 self.rq.state = runQueueFailed 2459 else: 2460 self.rq.state = runQueueComplete 2461 2462 return True 2463 2464 def filtermcdeps(self, task, mc, deps): 2465 ret = set() 2466 for dep in deps: 2467 thismc = mc_from_tid(dep) 2468 if thismc != mc: 2469 continue 2470 ret.add(dep) 2471 return ret 2472 2473 # Build the individual cache entries in advance once to save time 2474 def build_taskdepdata_cache(self): 2475 taskdepdata_cache = {} 2476 for task in self.rqdata.runtaskentries: 2477 (mc, fn, taskname, taskfn) = split_tid_mcfn(task) 2478 taskdepdata_cache[task] = bb.TaskData( 2479 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn], 2480 taskname = taskname, 2481 fn = fn, 2482 deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends), 2483 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn], 2484 taskhash = self.rqdata.runtaskentries[task].hash, 2485 unihash = self.rqdata.runtaskentries[task].unihash, 2486 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn], 2487 taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps, 2488 ) 2489 2490 self.taskdepdata_cache = taskdepdata_cache 2491 2492 # We filter out multiconfig dependencies from taskdepdata we pass to the tasks 2493 # as most code can't handle them 2494 def build_taskdepdata(self, task): 2495 taskdepdata = {} 2496 mc = mc_from_tid(task) 2497 next = self.rqdata.runtaskentries[task].depends.copy() 2498 next.add(task) 2499 next = self.filtermcdeps(task, mc, next) 2500 while next: 2501 additional = [] 2502 for revdep in next: 2503 self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace( 2504 unihash=self.rqdata.runtaskentries[revdep].unihash 2505 ) 2506 taskdepdata[revdep] = self.taskdepdata_cache[revdep] 2507 for revdep2 in self.taskdepdata_cache[revdep].deps: 2508 if revdep2 not in taskdepdata: 2509 additional.append(revdep2) 2510 next = additional 2511 2512 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2513 return taskdepdata 2514 2515 def update_holdofftasks(self): 2516 2517 if not self.holdoff_need_update: 2518 return 2519 2520 notcovered = set(self.scenequeue_notcovered) 2521 notcovered |= self.sqdata.cantskip 2522 for tid in self.scenequeue_notcovered: 2523 notcovered |= self.sqdata.sq_covered_tasks[tid] 2524 notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) 2525 notcovered.intersection_update(self.tasks_scenequeue_done) 2526 2527 covered = set(self.scenequeue_covered) 2528 for tid in self.scenequeue_covered: 2529 covered |= self.sqdata.sq_covered_tasks[tid] 2530 covered.difference_update(notcovered) 2531 covered.intersection_update(self.tasks_scenequeue_done) 2532 2533 for tid in notcovered | covered: 2534 if not self.rqdata.runtaskentries[tid].depends: 2535 self.setbuildable(tid) 2536 elif self.rqdata.runtaskentries[tid].depends.issubset(self.runq_complete): 2537 self.setbuildable(tid) 2538 2539 self.tasks_covered = covered 2540 self.tasks_notcovered = notcovered 2541 2542 self.holdoff_tasks = set() 2543 2544 for tid in self.rqdata.runq_setscene_tids: 2545 if tid not in self.scenequeue_covered and tid not in self.scenequeue_notcovered: 2546 self.holdoff_tasks.add(tid) 2547 2548 for tid in self.holdoff_tasks.copy(): 2549 for dep in self.sqdata.sq_covered_tasks[tid]: 2550 if dep not in self.runq_complete: 2551 self.holdoff_tasks.add(dep) 2552 2553 self.holdoff_need_update = False 2554 2555 def process_possible_migrations(self): 2556 2557 changed = set() 2558 toprocess = set() 2559 for tid, unihash in self.updated_taskhash_queue.copy(): 2560 if tid in self.runq_running and tid not in self.runq_complete: 2561 continue 2562 2563 self.updated_taskhash_queue.remove((tid, unihash)) 2564 2565 if unihash != self.rqdata.runtaskentries[tid].unihash: 2566 # Make sure we rehash any other tasks with the same task hash that we're deferred against. 2567 torehash = [tid] 2568 for deftid in self.sq_deferred: 2569 if self.sq_deferred[deftid] == tid: 2570 torehash.append(deftid) 2571 for hashtid in torehash: 2572 hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) 2573 self.rqdata.runtaskentries[hashtid].unihash = unihash 2574 bb.parse.siggen.set_unihash(hashtid, unihash) 2575 toprocess.add(hashtid) 2576 2577 # Work out all tasks which depend upon these 2578 total = set() 2579 next = set() 2580 for p in toprocess: 2581 next |= self.rqdata.runtaskentries[p].revdeps 2582 while next: 2583 current = next.copy() 2584 total = total | next 2585 next = set() 2586 for ntid in current: 2587 next |= self.rqdata.runtaskentries[ntid].revdeps 2588 next.difference_update(total) 2589 2590 # Now iterate those tasks in dependency order to regenerate their taskhash/unihash 2591 next = set() 2592 for p in total: 2593 if not self.rqdata.runtaskentries[p].depends: 2594 next.add(p) 2595 elif self.rqdata.runtaskentries[p].depends.isdisjoint(total): 2596 next.add(p) 2597 2598 starttime = time.time() 2599 lasttime = starttime 2600 2601 # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled 2602 while next: 2603 current = next.copy() 2604 next = set() 2605 ready = {} 2606 for tid in current: 2607 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total): 2608 continue 2609 # get_taskhash for a given tid *must* be called before get_unihash* below 2610 ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches) 2611 2612 unihashes = bb.parse.siggen.get_unihashes(ready.keys()) 2613 2614 for tid in ready: 2615 orighash = self.rqdata.runtaskentries[tid].hash 2616 newhash = ready[tid] 2617 origuni = self.rqdata.runtaskentries[tid].unihash 2618 newuni = unihashes[tid] 2619 2620 # FIXME, need to check it can come from sstate at all for determinism? 2621 remapped = False 2622 if newuni == origuni: 2623 # Nothing to do, we match, skip code below 2624 remapped = True 2625 elif tid in self.scenequeue_covered or tid in self.sq_live: 2626 # Already ran this setscene task or it running. Report the new taskhash 2627 bb.parse.siggen.report_unihash_equiv(tid, newhash, origuni, newuni, self.rqdata.dataCaches) 2628 hashequiv_logger.verbose("Already covered setscene for %s so ignoring rehash (remap)" % (tid)) 2629 remapped = True 2630 2631 if not remapped: 2632 #logger.debug("Task %s hash changes: %s->%s %s->%s" % (tid, orighash, newhash, origuni, newuni)) 2633 self.rqdata.runtaskentries[tid].hash = newhash 2634 self.rqdata.runtaskentries[tid].unihash = newuni 2635 changed.add(tid) 2636 2637 next |= self.rqdata.runtaskentries[tid].revdeps 2638 total.remove(tid) 2639 next.intersection_update(total) 2640 bb.event.check_for_interrupts(self.cooker.data) 2641 2642 if time.time() > (lasttime + 30): 2643 lasttime = time.time() 2644 hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime)) 2645 2646 endtime = time.time() 2647 if (endtime-starttime > 60): 2648 hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime)) 2649 2650 if changed: 2651 for mc in self.rq.worker: 2652 RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2653 for mc in self.rq.fakeworker: 2654 RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") 2655 2656 hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) 2657 2658 for tid in changed: 2659 if tid not in self.rqdata.runq_setscene_tids: 2660 continue 2661 if tid not in self.pending_migrations: 2662 self.pending_migrations.add(tid) 2663 2664 update_tasks = [] 2665 for tid in self.pending_migrations.copy(): 2666 if tid in self.runq_running or tid in self.sq_live: 2667 # Too late, task already running, not much we can do now 2668 self.pending_migrations.remove(tid) 2669 continue 2670 2671 valid = True 2672 # Check no tasks this covers are running 2673 for dep in self.sqdata.sq_covered_tasks[tid]: 2674 if dep in self.runq_running and dep not in self.runq_complete: 2675 hashequiv_logger.debug2("Task %s is running which blocks setscene for %s from running" % (dep, tid)) 2676 valid = False 2677 break 2678 if not valid: 2679 continue 2680 2681 self.pending_migrations.remove(tid) 2682 changed = True 2683 2684 if tid in self.tasks_scenequeue_done: 2685 self.tasks_scenequeue_done.remove(tid) 2686 for dep in self.sqdata.sq_covered_tasks[tid]: 2687 if dep in self.runq_complete and dep not in self.runq_tasksrun: 2688 bb.error("Task %s marked as completed but now needing to rerun? Halting build." % dep) 2689 self.failed_tids.append(tid) 2690 self.rq.state = runQueueCleanUp 2691 return 2692 2693 if dep not in self.runq_complete: 2694 if dep in self.tasks_scenequeue_done and dep not in self.sqdata.unskippable: 2695 self.tasks_scenequeue_done.remove(dep) 2696 2697 if tid in self.sq_buildable: 2698 self.sq_buildable.remove(tid) 2699 if tid in self.sq_running: 2700 self.sq_running.remove(tid) 2701 if tid in self.sqdata.outrightfail: 2702 self.sqdata.outrightfail.remove(tid) 2703 if tid in self.scenequeue_notcovered: 2704 self.scenequeue_notcovered.remove(tid) 2705 if tid in self.scenequeue_covered: 2706 self.scenequeue_covered.remove(tid) 2707 if tid in self.scenequeue_notneeded: 2708 self.scenequeue_notneeded.remove(tid) 2709 2710 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2711 self.sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 2712 2713 if tid in self.stampcache: 2714 del self.stampcache[tid] 2715 2716 if tid in self.build_stamps: 2717 del self.build_stamps[tid] 2718 2719 update_tasks.append(tid) 2720 2721 update_tasks2 = [] 2722 for tid in update_tasks: 2723 harddepfail = False 2724 for t in self.sqdata.sq_harddeps_rev[tid]: 2725 if t in self.scenequeue_notcovered: 2726 harddepfail = True 2727 break 2728 if not harddepfail and self.sqdata.sq_revdeps[tid].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2729 if tid not in self.sq_buildable: 2730 self.sq_buildable.add(tid) 2731 if not self.sqdata.sq_revdeps[tid]: 2732 self.sq_buildable.add(tid) 2733 2734 update_tasks2.append((tid, harddepfail, tid in self.sqdata.valid)) 2735 2736 if update_tasks2: 2737 self.sqdone = False 2738 for mc in sorted(self.sqdata.multiconfigs): 2739 for tid in sorted([t[0] for t in update_tasks2]): 2740 if mc_from_tid(tid) != mc: 2741 continue 2742 h = pending_hash_index(tid, self.rqdata) 2743 if h in self.sqdata.hashes and tid != self.sqdata.hashes[h]: 2744 self.sq_deferred[tid] = self.sqdata.hashes[h] 2745 bb.note("Deferring %s after %s" % (tid, self.sqdata.hashes[h])) 2746 update_scenequeue_data([t[0] for t in update_tasks2], self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=False) 2747 2748 for (tid, harddepfail, origvalid) in update_tasks2: 2749 if tid in self.sqdata.valid and not origvalid: 2750 hashequiv_logger.verbose("Setscene task %s became valid" % tid) 2751 if harddepfail: 2752 logger.debug2("%s has an unavailable hard dependency so skipping" % (tid)) 2753 self.sq_task_failoutright(tid) 2754 2755 if changed: 2756 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2757 self.sq_needed_harddeps = set() 2758 self.sq_harddep_deferred = set() 2759 self.holdoff_need_update = True 2760 2761 def scenequeue_updatecounters(self, task, fail=False): 2762 2763 if fail and task in self.sqdata.sq_harddeps: 2764 for dep in sorted(self.sqdata.sq_harddeps[task]): 2765 if dep in self.scenequeue_covered or dep in self.scenequeue_notcovered: 2766 # dependency could be already processed, e.g. noexec setscene task 2767 continue 2768 noexec, stamppresent = check_setscene_stamps(dep, self.rqdata, self.rq, self.stampcache) 2769 if noexec or stamppresent: 2770 continue 2771 logger.debug2("%s was unavailable and is a hard dependency of %s so skipping" % (task, dep)) 2772 self.sq_task_failoutright(dep) 2773 continue 2774 2775 # For performance, only compute allcovered once if needed 2776 if self.sqdata.sq_deps[task]: 2777 allcovered = self.scenequeue_covered | self.scenequeue_notcovered 2778 for dep in sorted(self.sqdata.sq_deps[task]): 2779 if self.sqdata.sq_revdeps[dep].issubset(allcovered): 2780 if dep not in self.sq_buildable: 2781 self.sq_buildable.add(dep) 2782 2783 next = set([task]) 2784 while next: 2785 new = set() 2786 for t in sorted(next): 2787 self.tasks_scenequeue_done.add(t) 2788 # Look down the dependency chain for non-setscene things which this task depends on 2789 # and mark as 'done' 2790 for dep in self.rqdata.runtaskentries[t].depends: 2791 if dep in self.rqdata.runq_setscene_tids or dep in self.tasks_scenequeue_done: 2792 continue 2793 if self.rqdata.runtaskentries[dep].revdeps.issubset(self.tasks_scenequeue_done): 2794 new.add(dep) 2795 next = new 2796 2797 # If this task was one which other setscene tasks have a hard dependency upon, we need 2798 # to walk through the hard dependencies and allow execution of those which have completed dependencies. 2799 if task in self.sqdata.sq_harddeps: 2800 for dep in self.sq_harddep_deferred.copy(): 2801 if self.sqdata.sq_harddeps_rev[dep].issubset(self.scenequeue_covered | self.scenequeue_notcovered): 2802 self.sq_harddep_deferred.remove(dep) 2803 2804 self.stats.updateCovered(len(self.scenequeue_covered), len(self.scenequeue_notcovered)) 2805 self.holdoff_need_update = True 2806 2807 def sq_task_completeoutright(self, task): 2808 """ 2809 Mark a task as completed 2810 Look at the reverse dependencies and mark any task with 2811 completed dependencies as buildable 2812 """ 2813 2814 logger.debug('Found task %s which could be accelerated', task) 2815 self.scenequeue_covered.add(task) 2816 self.scenequeue_updatecounters(task) 2817 2818 def sq_check_taskfail(self, task): 2819 if self.rqdata.setscene_ignore_tasks is not None: 2820 realtask = task.split('_setscene')[0] 2821 (mc, fn, taskname, taskfn) = split_tid_mcfn(realtask) 2822 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2823 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2824 logger.error('Task %s.%s failed' % (pn, taskname + "_setscene")) 2825 self.rq.state = runQueueCleanUp 2826 2827 def sq_task_complete(self, task): 2828 bb.event.fire(sceneQueueTaskCompleted(task, self.stats, self.rq), self.cfgData) 2829 self.sq_task_completeoutright(task) 2830 2831 def sq_task_fail(self, task, result): 2832 bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData) 2833 self.scenequeue_notcovered.add(task) 2834 self.scenequeue_updatecounters(task, True) 2835 self.sq_check_taskfail(task) 2836 2837 def sq_task_failoutright(self, task): 2838 self.sq_running.add(task) 2839 self.sq_buildable.add(task) 2840 self.scenequeue_notcovered.add(task) 2841 self.scenequeue_updatecounters(task, True) 2842 2843 def sq_task_skip(self, task): 2844 self.sq_running.add(task) 2845 self.sq_buildable.add(task) 2846 self.sq_task_completeoutright(task) 2847 2848 def sq_build_taskdepdata(self, task): 2849 def getsetscenedeps(tid): 2850 deps = set() 2851 (mc, fn, taskname, _) = split_tid_mcfn(tid) 2852 realtid = tid + "_setscene" 2853 idepends = self.rqdata.taskData[mc].taskentries[realtid].idepends 2854 for (depname, idependtask) in idepends: 2855 if depname not in self.rqdata.taskData[mc].build_targets: 2856 continue 2857 2858 depfn = self.rqdata.taskData[mc].build_targets[depname][0] 2859 if depfn is None: 2860 continue 2861 deptid = depfn + ":" + idependtask.replace("_setscene", "") 2862 deps.add(deptid) 2863 return deps 2864 2865 taskdepdata = {} 2866 next = getsetscenedeps(task) 2867 next.add(task) 2868 while next: 2869 additional = [] 2870 for revdep in next: 2871 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep) 2872 deps = getsetscenedeps(revdep) 2873 2874 taskdepdata[revdep] = bb.TaskData( 2875 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn], 2876 taskname = taskname, 2877 fn = fn, 2878 deps = deps, 2879 provides = self.rqdata.dataCaches[mc].fn_provides[taskfn], 2880 taskhash = self.rqdata.runtaskentries[revdep].hash, 2881 unihash = self.rqdata.runtaskentries[revdep].unihash, 2882 hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn], 2883 taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps, 2884 ) 2885 for revdep2 in deps: 2886 if revdep2 not in taskdepdata: 2887 additional.append(revdep2) 2888 next = additional 2889 2890 #bb.note("Task %s: " % task + str(taskdepdata).replace("], ", "],\n")) 2891 return taskdepdata 2892 2893 def check_setscene_ignore_tasks(self, tid): 2894 # Check task that is going to run against the ignore tasks list 2895 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 2896 # Ignore covered tasks 2897 if tid in self.tasks_covered: 2898 return False 2899 # Ignore stamped tasks 2900 if self.rq.check_stamp_task(tid, taskname, cache=self.stampcache): 2901 return False 2902 # Ignore noexec tasks 2903 taskdep = self.rqdata.dataCaches[mc].task_deps[taskfn] 2904 if 'noexec' in taskdep and taskname in taskdep['noexec']: 2905 return False 2906 2907 pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] 2908 if not check_setscene_enforce_ignore_tasks(pn, taskname, self.rqdata.setscene_ignore_tasks): 2909 if tid in self.rqdata.runq_setscene_tids: 2910 msg = ['Task %s.%s attempted to execute unexpectedly and should have been setscened' % (pn, taskname)] 2911 else: 2912 msg = ['Task %s.%s attempted to execute unexpectedly' % (pn, taskname)] 2913 for t in self.scenequeue_notcovered: 2914 msg.append("\nTask %s, unihash %s, taskhash %s" % (t, self.rqdata.runtaskentries[t].unihash, self.rqdata.runtaskentries[t].hash)) 2915 msg.append('\nThis is usually due to missing setscene tasks. Those missing in this build were: %s' % pprint.pformat(self.scenequeue_notcovered)) 2916 logger.error("".join(msg)) 2917 return True 2918 return False 2919 2920class SQData(object): 2921 def __init__(self): 2922 # SceneQueue dependencies 2923 self.sq_deps = {} 2924 # SceneQueue reverse dependencies 2925 self.sq_revdeps = {} 2926 # Injected inter-setscene task dependencies 2927 self.sq_harddeps = {} 2928 self.sq_harddeps_rev = {} 2929 # Cache of stamp files so duplicates can't run in parallel 2930 self.stamps = {} 2931 # Setscene tasks directly depended upon by the build 2932 self.unskippable = set() 2933 # List of setscene tasks which aren't present 2934 self.outrightfail = set() 2935 # A list of normal tasks a setscene task covers 2936 self.sq_covered_tasks = {} 2937 2938def build_scenequeue_data(sqdata, rqdata, sqrq): 2939 2940 sq_revdeps = {} 2941 sq_revdeps_squash = {} 2942 sq_collated_deps = {} 2943 2944 # We can't skip specified target tasks which aren't setscene tasks 2945 sqdata.cantskip = set(rqdata.target_tids) 2946 sqdata.cantskip.difference_update(rqdata.runq_setscene_tids) 2947 sqdata.cantskip.intersection_update(rqdata.runtaskentries) 2948 2949 # We need to construct a dependency graph for the setscene functions. Intermediate 2950 # dependencies between the setscene tasks only complicate the code. This code 2951 # therefore aims to collapse the huge runqueue dependency tree into a smaller one 2952 # only containing the setscene functions. 2953 2954 rqdata.init_progress_reporter.next_stage() 2955 2956 # First process the chains up to the first setscene task. 2957 endpoints = {} 2958 for tid in rqdata.runtaskentries: 2959 sq_revdeps[tid] = copy.copy(rqdata.runtaskentries[tid].revdeps) 2960 sq_revdeps_squash[tid] = set() 2961 if not sq_revdeps[tid] and tid not in rqdata.runq_setscene_tids: 2962 #bb.warn("Added endpoint %s" % (tid)) 2963 endpoints[tid] = set() 2964 2965 rqdata.init_progress_reporter.next_stage() 2966 2967 # Secondly process the chains between setscene tasks. 2968 for tid in rqdata.runq_setscene_tids: 2969 sq_collated_deps[tid] = set() 2970 #bb.warn("Added endpoint 2 %s" % (tid)) 2971 for dep in rqdata.runtaskentries[tid].depends: 2972 if tid in sq_revdeps[dep]: 2973 sq_revdeps[dep].remove(tid) 2974 if dep not in endpoints: 2975 endpoints[dep] = set() 2976 #bb.warn(" Added endpoint 3 %s" % (dep)) 2977 endpoints[dep].add(tid) 2978 2979 rqdata.init_progress_reporter.next_stage() 2980 2981 def process_endpoints(endpoints): 2982 newendpoints = {} 2983 for point, task in endpoints.items(): 2984 tasks = set() 2985 if task: 2986 tasks |= task 2987 if sq_revdeps_squash[point]: 2988 tasks |= sq_revdeps_squash[point] 2989 if point not in rqdata.runq_setscene_tids: 2990 for t in tasks: 2991 sq_collated_deps[t].add(point) 2992 sq_revdeps_squash[point] = set() 2993 if point in rqdata.runq_setscene_tids: 2994 sq_revdeps_squash[point] = tasks 2995 continue 2996 for dep in rqdata.runtaskentries[point].depends: 2997 if point in sq_revdeps[dep]: 2998 sq_revdeps[dep].remove(point) 2999 if tasks: 3000 sq_revdeps_squash[dep] |= tasks 3001 if not sq_revdeps[dep] and dep not in rqdata.runq_setscene_tids: 3002 newendpoints[dep] = task 3003 if newendpoints: 3004 process_endpoints(newendpoints) 3005 3006 process_endpoints(endpoints) 3007 3008 rqdata.init_progress_reporter.next_stage() 3009 3010 # Build a list of tasks which are "unskippable" 3011 # These are direct endpoints referenced by the build upto and including setscene tasks 3012 # Take the build endpoints (no revdeps) and find the sstate tasks they depend upon 3013 new = True 3014 for tid in rqdata.runtaskentries: 3015 if not rqdata.runtaskentries[tid].revdeps: 3016 sqdata.unskippable.add(tid) 3017 sqdata.unskippable |= sqdata.cantskip 3018 while new: 3019 new = False 3020 orig = sqdata.unskippable.copy() 3021 for tid in sorted(orig, reverse=True): 3022 if tid in rqdata.runq_setscene_tids: 3023 continue 3024 if not rqdata.runtaskentries[tid].depends: 3025 # These are tasks which have no setscene tasks in their chain, need to mark as directly buildable 3026 sqrq.setbuildable(tid) 3027 sqdata.unskippable |= rqdata.runtaskentries[tid].depends 3028 if sqdata.unskippable != orig: 3029 new = True 3030 3031 sqrq.tasks_scenequeue_done |= sqdata.unskippable.difference(rqdata.runq_setscene_tids) 3032 3033 rqdata.init_progress_reporter.next_stage(len(rqdata.runtaskentries)) 3034 3035 # Sanity check all dependencies could be changed to setscene task references 3036 for taskcounter, tid in enumerate(rqdata.runtaskentries): 3037 if tid in rqdata.runq_setscene_tids: 3038 pass 3039 elif sq_revdeps_squash[tid]: 3040 bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, halting. Please report this problem.") 3041 else: 3042 del sq_revdeps_squash[tid] 3043 rqdata.init_progress_reporter.update(taskcounter) 3044 3045 rqdata.init_progress_reporter.next_stage() 3046 3047 # Resolve setscene inter-task dependencies 3048 # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene" 3049 # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies 3050 for tid in rqdata.runq_setscene_tids: 3051 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 3052 realtid = tid + "_setscene" 3053 idepends = rqdata.taskData[mc].taskentries[realtid].idepends 3054 sqdata.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) 3055 3056 sqdata.sq_harddeps_rev[tid] = set() 3057 for (depname, idependtask) in idepends: 3058 3059 if depname not in rqdata.taskData[mc].build_targets: 3060 continue 3061 3062 depfn = rqdata.taskData[mc].build_targets[depname][0] 3063 if depfn is None: 3064 continue 3065 deptid = depfn + ":" + idependtask.replace("_setscene", "") 3066 if deptid not in rqdata.runtaskentries: 3067 bb.msg.fatal("RunQueue", "Task %s depends upon non-existent task %s:%s" % (realtid, depfn, idependtask)) 3068 3069 logger.debug2("Adding hard setscene dependency %s for %s" % (deptid, tid)) 3070 3071 if not deptid in sqdata.sq_harddeps: 3072 sqdata.sq_harddeps[deptid] = set() 3073 sqdata.sq_harddeps[deptid].add(tid) 3074 sqdata.sq_harddeps_rev[tid].add(deptid) 3075 3076 rqdata.init_progress_reporter.next_stage() 3077 3078 rqdata.init_progress_reporter.next_stage() 3079 3080 #for tid in sq_revdeps_squash: 3081 # data = "" 3082 # for dep in sq_revdeps_squash[tid]: 3083 # data = data + "\n %s" % dep 3084 # bb.warn("Task %s_setscene: is %s " % (tid, data)) 3085 3086 sqdata.sq_revdeps = sq_revdeps_squash 3087 sqdata.sq_covered_tasks = sq_collated_deps 3088 3089 # Build reverse version of revdeps to populate deps structure 3090 for tid in sqdata.sq_revdeps: 3091 sqdata.sq_deps[tid] = set() 3092 for tid in sqdata.sq_revdeps: 3093 for dep in sqdata.sq_revdeps[tid]: 3094 sqdata.sq_deps[dep].add(tid) 3095 3096 rqdata.init_progress_reporter.next_stage() 3097 3098 sqdata.multiconfigs = set() 3099 for tid in sqdata.sq_revdeps: 3100 sqdata.multiconfigs.add(mc_from_tid(tid)) 3101 if not sqdata.sq_revdeps[tid]: 3102 sqrq.sq_buildable.add(tid) 3103 3104 rqdata.init_progress_reporter.next_stage() 3105 3106 sqdata.noexec = set() 3107 sqdata.stamppresent = set() 3108 sqdata.valid = set() 3109 3110 sqdata.hashes = {} 3111 sqrq.sq_deferred = {} 3112 for mc in sorted(sqdata.multiconfigs): 3113 for tid in sorted(sqdata.sq_revdeps): 3114 if mc_from_tid(tid) != mc: 3115 continue 3116 h = pending_hash_index(tid, rqdata) 3117 if h not in sqdata.hashes: 3118 sqdata.hashes[h] = tid 3119 else: 3120 sqrq.sq_deferred[tid] = sqdata.hashes[h] 3121 bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) 3122 3123def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): 3124 3125 (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) 3126 3127 taskdep = rqdata.dataCaches[mc].task_deps[taskfn] 3128 3129 if 'noexec' in taskdep and taskname in taskdep['noexec']: 3130 bb.build.make_stamp_mcfn(taskname + "_setscene", taskfn) 3131 return True, False 3132 3133 if rq.check_stamp_task(tid, taskname + "_setscene", cache=stampcache): 3134 logger.debug2('Setscene stamp current for task %s', tid) 3135 return False, True 3136 3137 if rq.check_stamp_task(tid, taskname, recurse = True, cache=stampcache): 3138 logger.debug2('Normal stamp current for task %s', tid) 3139 return False, True 3140 3141 return False, False 3142 3143def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True): 3144 3145 tocheck = set() 3146 3147 for tid in sorted(tids): 3148 if tid in sqdata.stamppresent: 3149 sqdata.stamppresent.remove(tid) 3150 if tid in sqdata.valid: 3151 sqdata.valid.remove(tid) 3152 if tid in sqdata.outrightfail: 3153 sqdata.outrightfail.remove(tid) 3154 3155 noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) 3156 3157 if noexec: 3158 sqdata.noexec.add(tid) 3159 sqrq.sq_task_skip(tid) 3160 logger.debug2("%s is noexec so skipping setscene" % (tid)) 3161 continue 3162 3163 if stamppresent: 3164 sqdata.stamppresent.add(tid) 3165 sqrq.sq_task_skip(tid) 3166 logger.debug2("%s has a valid stamp, skipping" % (tid)) 3167 continue 3168 3169 tocheck.add(tid) 3170 3171 sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) 3172 3173 for tid in tids: 3174 if tid in sqdata.stamppresent: 3175 continue 3176 if tid in sqdata.valid: 3177 continue 3178 if tid in sqdata.noexec: 3179 continue 3180 if tid in sqrq.scenequeue_covered: 3181 continue 3182 if tid in sqrq.scenequeue_notcovered: 3183 continue 3184 if tid in sqrq.sq_deferred: 3185 continue 3186 sqdata.outrightfail.add(tid) 3187 logger.debug2("%s already handled (fallthrough), skipping" % (tid)) 3188 3189class TaskFailure(Exception): 3190 """ 3191 Exception raised when a task in a runqueue fails 3192 """ 3193 def __init__(self, x): 3194 self.args = x 3195 3196 3197class runQueueExitWait(bb.event.Event): 3198 """ 3199 Event when waiting for task processes to exit 3200 """ 3201 3202 def __init__(self, remain): 3203 self.remain = remain 3204 self.message = "Waiting for %s active tasks to finish" % remain 3205 bb.event.Event.__init__(self) 3206 3207class runQueueEvent(bb.event.Event): 3208 """ 3209 Base runQueue event class 3210 """ 3211 def __init__(self, task, stats, rq): 3212 self.taskid = task 3213 self.taskstring = task 3214 self.taskname = taskname_from_tid(task) 3215 self.taskfile = fn_from_tid(task) 3216 self.taskhash = rq.rqdata.get_task_hash(task) 3217 self.stats = stats.copy() 3218 bb.event.Event.__init__(self) 3219 3220class sceneQueueEvent(runQueueEvent): 3221 """ 3222 Base sceneQueue event class 3223 """ 3224 def __init__(self, task, stats, rq, noexec=False): 3225 runQueueEvent.__init__(self, task, stats, rq) 3226 self.taskstring = task + "_setscene" 3227 self.taskname = taskname_from_tid(task) + "_setscene" 3228 self.taskfile = fn_from_tid(task) 3229 self.taskhash = rq.rqdata.get_task_hash(task) 3230 3231class runQueueTaskStarted(runQueueEvent): 3232 """ 3233 Event notifying a task was started 3234 """ 3235 def __init__(self, task, stats, rq, noexec=False): 3236 runQueueEvent.__init__(self, task, stats, rq) 3237 self.noexec = noexec 3238 3239class sceneQueueTaskStarted(sceneQueueEvent): 3240 """ 3241 Event notifying a setscene task was started 3242 """ 3243 def __init__(self, task, stats, rq, noexec=False): 3244 sceneQueueEvent.__init__(self, task, stats, rq) 3245 self.noexec = noexec 3246 3247class runQueueTaskFailed(runQueueEvent): 3248 """ 3249 Event notifying a task failed 3250 """ 3251 def __init__(self, task, stats, exitcode, rq, fakeroot_log=None): 3252 runQueueEvent.__init__(self, task, stats, rq) 3253 self.exitcode = exitcode 3254 self.fakeroot_log = fakeroot_log 3255 3256 def __str__(self): 3257 if self.fakeroot_log: 3258 return "Task (%s) failed with exit code '%s' \nPseudo log:\n%s" % (self.taskstring, self.exitcode, self.fakeroot_log) 3259 else: 3260 return "Task (%s) failed with exit code '%s'" % (self.taskstring, self.exitcode) 3261 3262class sceneQueueTaskFailed(sceneQueueEvent): 3263 """ 3264 Event notifying a setscene task failed 3265 """ 3266 def __init__(self, task, stats, exitcode, rq): 3267 sceneQueueEvent.__init__(self, task, stats, rq) 3268 self.exitcode = exitcode 3269 3270 def __str__(self): 3271 return "Setscene task (%s) failed with exit code '%s' - real task will be run instead" % (self.taskstring, self.exitcode) 3272 3273class sceneQueueComplete(sceneQueueEvent): 3274 """ 3275 Event when all the sceneQueue tasks are complete 3276 """ 3277 def __init__(self, stats, rq): 3278 self.stats = stats.copy() 3279 bb.event.Event.__init__(self) 3280 3281class runQueueTaskCompleted(runQueueEvent): 3282 """ 3283 Event notifying a task completed 3284 """ 3285 3286class sceneQueueTaskCompleted(sceneQueueEvent): 3287 """ 3288 Event notifying a setscene task completed 3289 """ 3290 3291class runQueueTaskSkipped(runQueueEvent): 3292 """ 3293 Event notifying a task was skipped 3294 """ 3295 def __init__(self, task, stats, rq, reason): 3296 runQueueEvent.__init__(self, task, stats, rq) 3297 self.reason = reason 3298 3299class taskUniHashUpdate(bb.event.Event): 3300 """ 3301 Base runQueue event class 3302 """ 3303 def __init__(self, task, unihash): 3304 self.taskid = task 3305 self.unihash = unihash 3306 bb.event.Event.__init__(self) 3307 3308class runQueuePipe(): 3309 """ 3310 Abstraction for a pipe between a worker thread and the server 3311 """ 3312 def __init__(self, pipein, pipeout, d, rq, rqexec, fakerootlogs=None): 3313 self.input = pipein 3314 if pipeout: 3315 pipeout.close() 3316 bb.utils.nonblockingfd(self.input) 3317 self.queue = bytearray() 3318 self.d = d 3319 self.rq = rq 3320 self.rqexec = rqexec 3321 self.fakerootlogs = fakerootlogs 3322 3323 def read(self): 3324 for workers, name in [(self.rq.worker, "Worker"), (self.rq.fakeworker, "Fakeroot")]: 3325 for worker in workers.values(): 3326 worker.process.poll() 3327 if worker.process.returncode is not None and not self.rq.teardown: 3328 bb.error("%s process (%s) exited unexpectedly (%s), shutting down..." % (name, worker.process.pid, str(worker.process.returncode))) 3329 self.rq.finish_runqueue(True) 3330 3331 start = len(self.queue) 3332 try: 3333 self.queue.extend(self.input.read(512 * 1024) or b"") 3334 except (OSError, IOError) as e: 3335 if e.errno != errno.EAGAIN: 3336 raise 3337 end = len(self.queue) 3338 found = True 3339 while found and self.queue: 3340 found = False 3341 index = self.queue.find(b"</event>") 3342 while index != -1 and self.queue.startswith(b"<event>"): 3343 try: 3344 event = pickle.loads(self.queue[7:index]) 3345 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3346 if isinstance(e, pickle.UnpicklingError) and "truncated" in str(e): 3347 # The pickled data could contain "</event>" so search for the next occurance 3348 # unpickling again, this should be the only way an unpickle error could occur 3349 index = self.queue.find(b"</event>", index + 1) 3350 continue 3351 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[7:index])) 3352 bb.event.fire_from_worker(event, self.d) 3353 if isinstance(event, taskUniHashUpdate): 3354 self.rqexec.updated_taskhash_queue.append((event.taskid, event.unihash)) 3355 found = True 3356 self.queue = self.queue[index+8:] 3357 index = self.queue.find(b"</event>") 3358 index = self.queue.find(b"</exitcode>") 3359 while index != -1 and self.queue.startswith(b"<exitcode>"): 3360 try: 3361 task, status = pickle.loads(self.queue[10:index]) 3362 except (ValueError, pickle.UnpicklingError, AttributeError, IndexError) as e: 3363 bb.msg.fatal("RunQueue", "failed load pickle '%s': '%s'" % (e, self.queue[10:index])) 3364 (_, _, _, taskfn) = split_tid_mcfn(task) 3365 fakerootlog = None 3366 if self.fakerootlogs and taskfn and taskfn in self.fakerootlogs: 3367 fakerootlog = self.fakerootlogs[taskfn] 3368 self.rqexec.runqueue_process_waitpid(task, status, fakerootlog=fakerootlog) 3369 found = True 3370 self.queue = self.queue[index+11:] 3371 index = self.queue.find(b"</exitcode>") 3372 return (end > start) 3373 3374 def close(self): 3375 while self.read(): 3376 continue 3377 if self.queue: 3378 print("Warning, worker left partial message: %s" % self.queue) 3379 self.input.close() 3380 3381def get_setscene_enforce_ignore_tasks(d, targets): 3382 if d.getVar('BB_SETSCENE_ENFORCE') != '1': 3383 return None 3384 ignore_tasks = (d.getVar("BB_SETSCENE_ENFORCE_IGNORE_TASKS") or "").split() 3385 outlist = [] 3386 for item in ignore_tasks[:]: 3387 if item.startswith('%:'): 3388 for (mc, target, task, fn) in targets: 3389 outlist.append(target + ':' + item.split(':')[1]) 3390 else: 3391 outlist.append(item) 3392 return outlist 3393 3394def check_setscene_enforce_ignore_tasks(pn, taskname, ignore_tasks): 3395 import fnmatch 3396 if ignore_tasks is not None: 3397 item = '%s:%s' % (pn, taskname) 3398 for ignore_tasks in ignore_tasks: 3399 if fnmatch.fnmatch(item, ignore_tasks): 3400 return True 3401 return False 3402 return True 3403